LBFGS方法推导

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: LBFGS方法推导

BFGS方法推导

可以构造一个二次函数来开始,假设x_k第k次迭代为:

    \[    m_k(p)=f_k + \nabla f_k^T p+\frac{1}{2} p^T B_k p      \qquad\qquad\qquad Equitation 1 \]

注意B_k为n x n对称正定矩阵,可以在每次迭代中更新。注意到这个构造函数在p=0的时候,f_k\nabla f_k和原函数是相同的。对其求导,得到

    \[    p_k= - B_k^{-1} \nabla f_k      \qquad\qquad\qquad Equitation 2 \]

公式2即为搜索方向向量,由此x_{k+1}第k+1次迭代为(其中\alpha_k可以用强Wolfe准则来计算):

    \[    x_{k+1}= x_k + \alpha_k p_k      \qquad\qquad\qquad Equitation 3 \]

为了不每次都计算B_k(计算太复杂),Davidon提出用最近迭代的几步来计算它。
我们已经计算出x_{k+1},现在构造其在k+1次迭代的二次函数:

    \[    m_{k+1}(p)=f_{k+1} + \nabla f_{k+1}^T p+\frac{1}{2} p^T B_{k+1} p      \qquad\qquad\qquad Equitation 4 \]

怎么求B_{k+1}呢?函数m_{k+1}(p)应该在x_k处和x_{k+1}处和目标函数f_x导数一致。对其求导,并令p = - \alpha_k p_k,即x_k处得:(x_{k+1}处自然相等,\nabla m_{k+1}(0) = \nabla f_{k+1}^T

    \[    \nabla m_{k+1}( - \alpha_k p_k )= \nabla f_{k+1} - \alpha_k B_{k+1} p_k = \nabla f_k     \qquad\qquad\qquad Equitation 5 \]

调整方程得:

    \[    B_{k+1} \alpha_k p_k = \nabla f_{k+1} - \nabla f_k   \qquad\qquad\qquad Equitation 6 \]

s_k = x_{k+1} - x_k = \alpha_k p_k, \qquad y_k = \nabla f_{k+1} - \nabla f_k,方程6为:

    \[    B_{k+1} s_k = y_k   \qquad\qquad\qquad Equitation 7 \]

H_{k+1} = B_{k+1}^{-1},得到另一个形式:

    \[    H_{k+1} y_k = s_k   \qquad\qquad\qquad Equitation 8 \]

由此,方程2可以写为:

    \[    p_k= - H_k \nabla f_k      \qquad\qquad\qquad Equitation 9 \]

由于我们需要一个比较简单的更新B_k的方法,令:

    \[    B_{k+1}= B_k + E_k , H_{k+1}= H_k + D_k      \qquad\qquad\qquad Equitation 10 \]

其中E_kD_k为秩1或秩2的矩阵。
E_k为:

    \[    E_k = \alpha u_k u_k^T + \beta v_k v_k^T      \qquad\qquad\qquad Equitation 11 \]

联立方程7,方程10和方程11得:

    \[    ( B_k + \alpha u_k u_k^T + \beta v_k v_k^T )s_k = y_k      \qquad\qquad\qquad Equitation 12 \]

等价于:

    \[    \alpha ( u_k^T s_k ) u_k  + \beta ( v_k^T s_k ) v_k = y_k - B_k s_k      \qquad\qquad\qquad Equitation 13 \]

通过方程13发现,u_kv_k不唯一,令u_kv_k分别平行于B_k s_ky_k,即u_k = \gamma B_k s_kv_k = \theta y_k,带入方程11得:

    \[    E_k = \alpha \gamma^2 B_k s_k s_k^T B_k + \beta \theta^2 y_k y_k^T      \qquad\qquad\qquad Equitation 14 \]

u_kv_k带入方程13得:

    \[    \alpha ( (\gamma B_k s_k)^T s_k ) (\gamma B_k s_k)  + \beta ( (\theta y_k)^T s_k ) (\theta y_k) = y_k - B_k s_k      \qquad\qquad\qquad Equitation 15 \]

整理后得到:

    \[    [ \alpha \gamma^2 (s_k^T B_k s_k) + 1 ]B_k s_k + [\beta \theta^2 (y_k^T s) - 1 ]y_k = 0      \qquad\qquad\qquad Equitation 16 \]

所以\alpha \gamma^2 (s_k^T B_k s_k) + 1 = 0\beta \theta^2 (y_k^T s) - 1 = 0
\alpha \gamma^2  = - \frac{1}{ (s_k^T B_k s_k) }\beta \theta^2 = \frac{1}{ (y_k^T s) }
最后联立方程14,代入到方程10得:

    \[    B_{k+1} = B_k - \frac{B_k s_k s_k^T B_k}{s_k^T B_k s_k} + \frac{y_k y_k^T}{y_k^T s_k}      \qquad\qquad\qquad Equitation 17 \]

用同样的方式可以得到:

    \[    B_{k+1}^{-1} = (B_k - \frac{B_k s_k s_k^T B_k}{s_k^T B_k s_k} + \frac{y_k y_k^T}{y_k^T s_k})^{-1}      \qquad\qquad\qquad Equitation 18 \]

Sherman Morison Woodbury公式:

    \[    (A + UCV)^{-1} = A^{-1} - \frac{A^{-1}UVA^{-1}}{C^{-1} + VA^{-1}U}   \qquad\qquad\qquad Sherman Morison Woodbury Equitation \]

下面对方程18应用Sherman Morison Woodbury公式:(注意推导中的H是方程中的B_k)(参考文档):

现在我们得到结果:

    \[    B_{k+1}^{-1} = (I - \frac{s_k y_k^T}{y_k^T s_k}) B_k^{-1} (I - \frac{y_k s_k^T}{y_k^T s_k}) + \frac{s_k s_k^T}{y_k^T s_k}     \qquad\qquad\qquad Equitation 19 \]

\rho = \frac{1}{y_k^T s_k},方程19可得BFGS方法的迭代方程:

    \[    H_{k+1} = (I - \rho s_k y_k^T }) H_k (I - \rho y_k s_k^T}) + \rho s_k s_k^T     \qquad\qquad\qquad BFGS Equitation \]

LBFGS方法推导

V_k = I - \rho y_k s_k^T,则BFGS方程可写成:

    \[    H_{k+1} = V_k^T H_k V_k + \rho s_k s_k^T     \qquad\qquad\qquad Equitation 20 \]

参考文档
则从k=0开始有:

    \begin{align*} H_1 & = V_0^T H_0 V_0 + \rho_0 s_0 s_0^T \\ H_2 & = V_1^T H_1 V_1 + \rho_1 s_1 s_1^T \\     & = V_1^T (V_0^T H_0 V_0 + \rho_0 s_0 s_0^T) V_1 + \rho_1 s_1 s_1^T \\     & = V_1^T V_0^T H_0 V_0 V_1 + V_1^T \rho_0 s_0 s_0^T V_1 + \rho_1 s_1 s_1^T \\ H_3 & = V_2^T H_2 V_2 + \rho_2 s_2 s_2^T \\     & = V_2^T (V_1^T V_0^T H_0 V_0 V_1 + V_1^T \rho_0 s_0 s_0^T V_1 + \rho_1 s_1 s_1^T) V_2 + \rho_2 s_2 s_2^T \\     & = V_2^T V_1^T V_0^T H_0 V_0 V_1 V_2 + V_2^T V_1^T \rho_0 s_0 s_0^T V_1 V_2 + V_2^T \rho_1 s_1 s_1^T V_2 + \rho_2 s_2 s_2^T \\ ...... \\ ...... \\ H_{k+1} & = (V_k^T v_{k-1}^T ... V_1^T V_0^T) H_0 (V_0 V_1 ... V_{k-1} V_k ) \\         & + (V_k^T v_{k-1}^T ... V_2^T V_1^T) (\rho_0 s_0 s_0^T) (V_1 V_2 ... V_{k-1} V_k ) \\         & + (V_k^T v_{k-1}^T ... V_3^T V_2^T) (\rho_1 s_1 s_1^T) (V_2 V_3 ... V_{k-1} V_k ) \\         & + ...... \\         & + (V_k^T v_{k-1}^T) (\rho_{k-2} s_{k-2} s_{k-2}^T) (V_{k-1} V_k ) \\         & + V_k^T (\rho_{k-1} s_{k-1} s_{k-1}^T) V_k \\         & + \rho_k s_k s_k^T    \qquad\qquad\qquad Equitation 21 \end{align}

由上面可见,计算H_{k+1}需要用到\left\{s_i , y_i\right\}_{i=0}^k组数据,数据量非常大,LBFGS算法就采取只去最近的m组数据来运算,即可以构造近似计算公式:

    \begin{align*} H_k & = (V_{k-1}^T ... V_{k-m}^T ) H_k^0 (V_{k-m} ... V_{k-1} ) \\         & + \rho_{k-m} (V_{k-1}^T ... V_{k-m+1}^T) s_{k-m} s_{k-m}^T (V_{k-m+1} ... V_{k-1}) \\         & + \rho_{k-m+1} (V_{k-1}^T ... V_{k-m+2}^T) s_{k-m+1} s_{k-m+1}^T (V_{k-m+2} ... V_{k-1}) \\         & + ...... \\         & + \rho_{k-2} V_{k-1}^T s_{k-2} s_{k-2}^T V_{k-1} \\         & + \rho_{k-1} s_{k-1} s_{k-1}^T    \qquad\qquad\qquad Equitation 22 \end{align}

最后Numerial Optimization 这本书给出了LBFGS的双向循环算法:

具体实现代码可以参见Breeze LBFGS

最后验证算法对公式22的正确性:参考文档
q_0 = \nabla f(x_k):

    \begin{align*} q_i & = q_{i-1} - \rho_{k-i} y_{k-i} s_{k-i}^T q_{i-1}  \\     & = ( I - \rho_{k-i} y_{k-i} s_{k-i}^T ) q_{i-1}   \\     & = V_{k-i} q_{i-1}  \\     & = V_{k-i} V_{k-i+1} q_{i-2} \\     & = ......  \\     & = V_{k-i} V_{k-i+1} ... V_{k-1} q_0  \\     & = V_{k-i} V_{k-i+1} ... V_{k-1} \nabla f(x_k) \\ \alpha_i & = \rho_{k-i} s_{k-i}^T q_{i-1}  \\          & = \rho_{k-i} s_{k-i}^T V_{k-i+1} V_{k-i+2} ... V_{k-1} \nabla f(x_k)  \end{align}

r_{m+1} = H_k^0 q = H_k^0 V_{k-i} V_{k-i+1} ... V_{k-1} \nabla f(x_k),这里的m指的是从现在到历史记录m次的后一次,因为LBFGS只记录m次历史:

    \begin{align*} r_i & = r_{i+1} + s_{k-i} ( \alpha_i - \beta ) \\     & = r_{i+1} + s_{k-i} ( \alpha_i - \rho_{k-i} y_{k-i}^T r_{i+1})   \\     & = s_{k-i} \alpha_i + ( I- s_{k-i} \rho_{k-i} y_{k-i}^T) r_{i+1}    \\     & = s_{k-i} \alpha_{i} + V_{k-i}^T r_{i+1} \\     \\ r_1 & = s_{k-1} \alpha_1 + V_{k-1}^T r_2     \\     & = s_{k-1} \rho_{k-1} s_{k-1}^T \nabla f(x_k) + V_{k-1}^T r_2    \\     & = s_{k-1} \rho_{k-1} s_{k-1}^T \nabla f(x_k) + V_{k-1}^T ( s_{k-2} \alpha_2 + V_{k-2}^T r_3)    \\     & = s_{k-1} \rho_{k-1} s_{k-1}^T \nabla f(x_k) + V_{k-1}^T s_{k-2} \rho_{k-2} s_{k-2}^T V_{k-1} \nabla f(x_k)+ V_{k-1}^T V_{k-2}^T r_3    \\     & = ......    \\     & = s_{k-1} \rho_{k-1} s_{k-1}^T \nabla f(x_k) \\     & + V_{k-1}^T s_{k-2} \rho_{k-2} s_{k-2}^T V_{k-1}  \nabla f(x_k) \\     & + ......    \\     & + (V_{k-1}^T V_{k-2}^T ... V_{k-m+2}^T ) S_{k-m+1} \rho_{k-m+1} S_{k-m+1}^T (V_{k-m+1} ... V_{k-2} V_{k-1} ) \nabla f(x_k)  \\     & + (V_{k-1}^T V_{k-2}^T ... V_{k-m+1}^T ) S_{k-m} \rho_{k-m} S_{k-m}^T (V_{k-m} ... V_{k-2} V_{k-1} ) \nabla f(x_k)    \\     & + (V_{k-1}^T V_{k-2}^T ... V_{k-m}^T ) H_k^0 (V_{k-m} ... V_{k-2} V_{k-1}) \nabla f(x_k)  \end{align}

本作品采用知识共享署名 4.0 国际许可协议进行许可。

Logistic Regression(逻辑回归) in ML

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Logistic Regression(逻辑回归) in ML

概览

Code in org.apache.spark.ml.classification.LogisticRegressionSuite.scala

class LogisticRegressionSuite
  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
  import testImplicits._ //Mark A
 
  private val seed = 42
  @transient var smallBinaryDataset: Dataset[_] = _  //Mark B
 
  override def beforeAll(): Unit = {
    super.beforeAll()
 
    smallBinaryDataset = generateLogisticInput(1.0, 1.0, nPoints = 100, seed = seed).toDF()  //Mark C
 
......
 
  test("logistic regression: default params") {
    val lr = new LogisticRegression
    assert(lr.getLabelCol === "label")
    assert(lr.getFeaturesCol === "features")
    assert(lr.getPredictionCol === "prediction")
    assert(lr.getRawPredictionCol === "rawPrediction")
    assert(lr.getProbabilityCol === "probability")
    assert(lr.getFamily === "auto")
    assert(!lr.isDefined(lr.weightCol))
    assert(lr.getFitIntercept)
    assert(lr.getStandardization)
    val model = lr.fit(smallBinaryDataset)
    model.transform(smallBinaryDataset)
      .select("label", "probability", "prediction", "rawPrediction")
      .collect()
    assert(model.getThreshold === 0.5)
    assert(model.getFeaturesCol === "features")
    assert(model.getPredictionCol === "prediction")
    assert(model.getRawPredictionCol === "rawPrediction")
    assert(model.getProbabilityCol === "probability")
    assert(model.intercept !== 0.0)
    assert(model.hasParent)
 
    // copied model must have the same parent.
    MLTestingUtils.checkCopy(model)
    assert(model.hasSummary)
    val copiedModel = model.copy(ParamMap.empty)
    assert(copiedModel.hasSummary)
    model.setSummary(None)
    assert(!model.hasSummary)
  }
 
......
 
object LogisticRegressionSuite {
 
......
 
  // Generate input of the form Y = logistic(offset + scale*X)
  def generateLogisticInput(
      offset: Double,
      scale: Double,
      nPoints: Int,
      seed: Int): Seq[LabeledPoint] = {
    val rnd = new Random(seed)
    val x1 = Array.fill[Double](nPoints)(rnd.nextGaussian())
 
    val y = (0 until nPoints).map { i =>
      val p = 1.0 / (1.0 + math.exp(-(offset + scale * x1(i))))
      if (rnd.nextDouble() < p) 1.0 else 0.0 } val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
    testData
  }
......

注意到Mark A, Mark B and Mark C,方法”generateLogisticInput”返回Seq结果,但是”smallBinaryDataset”是Dataset类型。 注意到在Mark A处,”localSeqToDatasetHolder”方法将把Seq隐式转换为DatasetHolder,而DatasetHolder有一个toDF()方法可以把Dateset转换为DataFrame。

abstract class SQLImplicits {
......
  /**
   * Creates a [[Dataset]] from a local Seq.
   * @since 1.6.0
   */
  implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
    DatasetHolder(_sqlContext.createDataset(s))
  }
......
 
case class DatasetHolder[T] private[sql](private val ds: Dataset[T]) {
 
  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDS("1")` as invoking this toDS and then apply on the returned Dataset.
  def toDS(): Dataset[T] = ds
 
  // This is declared with parentheses to prevent the Scala compiler from treating
  // `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
  def toDF(): DataFrame = ds.toDF()
 
  def toDF(colNames: String*): DataFrame = ds.toDF(colNames : _*)
}
构造Model

用LogisticRegression来构建model, 方法”val model = lr.fit(smallBinaryDataset)”。
继续阅读“Logistic Regression(逻辑回归) in ML”本作品采用知识共享署名 4.0 国际许可协议进行许可。

Using ES6 with gulp

转载:Using ES6 with gulp

Nov 2nd, 2015

  • As of node 4, we’re now able to use ES2015 without the need for Babel. However modules are not currently supported so you’ll need to use `require()` still. Checkout the node docs for more info on what’s supported. If you’d like module support and to utilise Babel, read on.
  • Post updated to use Babel 6.

With gulp 3.9, we are now able to use ES6 (or ES2015 as it’s now named) in our gulpfile—thanks to the awesome Babel transpiler.

Firstly make sure you have at least version 3.9 of both the CLI and local version of gulp. To check which version you have, open up terminal and type:

gulp -v

This should return:

CLI version 3.9.0
Local version 3.9.0

If you get any versions lower than 3.9, update gulp in your package.json file, and run the following to update both versions:

npm install gulp && npm install gulp -g

Creating an ES6 gulpfile

To leverage ES6 you will need to install Babel (make sure you have Babel 6) as a dependency to your project, along with the es2015 plugin preset:

npm install babel-core babel-preset-es2015 --save-dev

Once this has finished, we need to create a .babelrc config file to enable the es2015 preset:

touch .babelrc

And add the following to the file:

{
  "presets": ["es2015"]
}

We then need to instruct gulp to use Babel. To do this, we need to rename the gulpfile.js to gulpfile.babel.js:

mv "gulpfile.js" "gulpfile.babel.js"

We can now use ES6 via Babel! An example of a typical gulp task using new ES6 features:

'use strict';

import gulp from 'gulp';
import sass from 'gulp-sass';
import autoprefixer from 'gulp-autoprefixer';
import sourcemaps from 'gulp-sourcemaps';

const dirs = {
  src: 'src',
  dest: 'build'
};

const sassPaths = {
  src: `${dirs.src}/app.scss`,
  dest: `${dirs.dest}/styles/`
};

gulp.task('styles', () => {
  return gulp.src(paths.src)
    .pipe(sourcemaps.init())
    .pipe(sass.sync().on('error', plugins.sass.logError))
    .pipe(autoprefixer())
    .pipe(sourcemaps.write('.'))
    .pipe(gulp.dest(paths.dest));
});

Here we have utilised ES6 import/modules, arrow functions, template strings and constants. If you’d like to check out more ES6 features, es6-features.org is a handy resource.

Spark Planner for Converting Logical Plan to Spark Plan

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Spark Planner for Converting Logical Plan to Spark Plan

The Spark Planner is the bridge from Logic Plan to Spark Plan which will convert Logic Plan to Spark Plan.
The SparkPlanner extends from SparkStrategies(Code in SparkStrategies.scala).
It have several strategies:

  def strategies: Seq[Strategy] =
      extraStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy ::
      DDLStrategy ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)

Let me introduce several here:

SpecialLimits

The Logical Plan Limit will be converted to Spark Plan TakeOrderedAndProjectExec here.

 /**
   * Plans special cases of limit operators.
   */
  object SpecialLimits extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ReturnAnswer(rootPlan) => rootPlan match {
        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
        case logical.Limit(
            IntegerLiteral(limit),
            logical.Project(projectList, logical.Sort(order, true, child))) =>
          execution.TakeOrderedAndProjectExec(
            limit, order, projectList, planLater(child)) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
          execution.CollectLimitExec(limit, planLater(child)) :: Nil
        case other => planLater(other) :: Nil
      }
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, projectList, planLater(child)) :: Nil
      case _ => Nil
    }
  }
Aggregation

The Logical Plan Aggregate will be convert to bellow Spark Plan based on conditions:
SortAggregateExec
HashAggregateExec
ObjectHashAggregateExec

 /**
   * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface.
   */
  object Aggregation extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case PhysicalAggregation(
          groupingExpressions, aggregateExpressions, resultExpressions, child) =>
 
        val (functionsWithDistinct, functionsWithoutDistinct) =
          aggregateExpressions.partition(_.isDistinct)
......

Let me use SQL “SELECT x.str, COUNT(*) FROM df x JOIN df y ON x.str = y.str GROUP BY x.str” for example.
And at here, “COUNT(*)” is an aggregate expression.

继续阅读“Spark Planner for Converting Logical Plan to Spark Plan”本作品采用知识共享署名 4.0 国际许可协议进行许可。

Logical Plan

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Logical Plan

LogicalPlan classes hierarchy
LogicalPlan 
    InsertIntoTable : Insert data to a table[Logical Plan].
    LeafNode : The node don't have children.
        Command : The Logical Plan don't have output.
            CreateTable 
            RunnableCommand 
                ExplainCommand 
                DropDatabaseCommand 
                AlterTableSetPropertiesCommand 
                AlterTableUnsetPropertiesCommand 
                SetCommand 
                RefreshResource 
                ShowTablesCommand 
                CacheTableCommand 
                ShowDatabasesCommand 
                AddFileCommand 
                ListFilesCommand 
                RefreshTable 
                DescribeDatabaseCommand 
                CreateHiveTableAsSelectCommand 
                AddJarCommand 
                DropTableCommand 
                CreateDatabaseCommand 
                CreateDataSourceTableCommand 
                CreateTableCommand 
                AnalyzeColumnCommand 
                CreateViewCommand 
                AnalyzeTableCommand 
                SetDatabaseCommand 
                UncacheTableCommand 
                ClearCacheCommand$ 
                DropFunctionCommand 
                AlterTableAddPartitionCommand 
                CreateDataSourceTableAsSelectCommand 
                ShowFunctionsCommand 
                DescribeTableCommand 
                ShowPartitionsCommand 
                LoadDataCommand 
                ShowColumnsCommand 
                ListJarsCommand 
                ShowCreateTableCommand 
                AlterTableDropPartitionCommand 
                AlterTableRenameCommand 
                InsertIntoDataSourceCommand 
                CreateTableLikeCommand 
                AlterTableSetLocationCommand 
                AlterDatabasePropertiesCommand 
                AlterTableRecoverPartitionsCommand 
                DescribeFunctionCommand 
                TruncateTableCommand 
                AlterViewAsCommand 
                ShowTablePropertiesCommand 
                CreateFunctionCommand 
                InsertIntoHadoopFsRelationCommand 
                AlterTableSerDePropertiesCommand 
                ResetCommand$ 
                CreateTempViewUsing 
                AlterTableRenamePartitionCommand 
        StreamingRelation 
        StreamingExecutionRelation 
        Range 
        ExternalRDD 
        UnresolvedTableValuedFunction 
        MetastoreRelation 
        MemoryPlan 
        SQLTable in SQLBuilder 
        UnresolvedRelation 
        InMemoryRelation 
        SimpleCatalogRelation 
        LogicalRelation 
        UnresolvedInlineTable 
        LogicalRDD 
        OneRowRelation$ 
    Union 
    EventTimeWatermark 
    UnaryNode : Only have one child Logical Plan
        DeserializeToObject 
        Filter 
        ScriptTransformation 
        Distinct 
        BroadcastHint 
        Project 
        With 
        Pivot 
        MapGroups 
        Window 
        GlobalLimit 
        WithWindowDefinition 
        ReturnAnswer 
        ObjectConsumer 
            AppendColumnsWithObject 
            SerializeFromObject 
            MapElements 
            MapPartitions 
            MapPartitionsInR 
        Aggregate 
        Generate 
        FlatMapGroupsInR 
        TypedFilter 
        GroupingSets 
        RedistributeData 
            RepartitionByExpression 
            SortPartitions 
        SubqueryAlias 
        Expand 
        LocalLimit 
        Sort 
        Repartition 
        Sample 
        AppendColumns 
    BinaryNode : Have two children Logical Plan
        CoGroup 
        SetOperation 
            Except 
            Intersect 
        Join 
    ObjectProducer 
        DeserializeToObject 
        MapGroups 
        CoGroup 
        ExternalRDD 
        FlatMapGroupsInR 
        MapElements 
        MapPartitions 
        MapPartitionsInR

本作品采用知识共享署名 4.0 国际许可协议进行许可。