ALS-WR(协同过滤推荐算法) in ML

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: ALS-WR(协同过滤推荐算法) in ML

简介

ALS-WR算法,简单地说就是:
(数据格式为:userId, itemId, rating, timestamp )
1 对每个userId随机初始化N(10)个factor值,由这些值影响userId的权重。
2 对每个itemId也随机初始化N(10)个factor值。
3 固定userId,从userFactors矩阵和rating矩阵中分解出itemFactors矩阵。即[Item Factors Matrix] = [User Factors Matrix]^-1 * [Rating Matrix].
4 固定itemId,从itemFactors矩阵和rating矩阵中分解出userFactors矩阵。即[User Factors Matrix] = [Item Factors Matrix]^-1 * [Rating Matrix].
5 重复迭代第3,第4步,最后可以收敛到稳定的userFactors和itemFactors。
6 对itemId进行推断就为userFactors * itemId = rating value;对userId进行推断就为itemFactors * userId = rating value。

理论推导可见Parallel-ALS推荐算法(factorize-movielens-1M)

继续阅读“ALS-WR(协同过滤推荐算法) in ML”本作品采用知识共享署名 4.0 国际许可协议进行许可。

KMeans(K均值)集群

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: KMeans(K均值)集群

引言

KMeans(K均值)集群算是集群算法中比较简单实现且又比较高效的算法了。
算法接受参数K即分类数,然后将输入数据划分为K个聚类使得:同一聚类中的对象相似度较高,而不同聚类中的对象相似度较小。
它一般的步骤为:
(1) 选择K个初始中心点,例如 C[0] = data[0], C[k-1] = data[k-1]。一般采用随机抽样的方法。
(2) 对于data[0] … data[n], 分别与C[0] … C[k-1]比较,假定与C[i]差值最少,就标记为i。计算差值有多种方式,如向量的距离计算方式。
(3) 对于所有标记为i点,重新计算C[i]={ 所有标记为i的data[j]之和 } / 标记为i的个数。
(4) 重复(2)(3),直到所有C[i]值的变化小于给定阈值或达到输入的最大迭代次数。
以前说过Mahout中的实现kmeans集群算法(cluster-reuters)Fuzzykmeans集群算法(cluster-reuters),本文介绍Spark ML中的实现方式。

代码分析

继续阅读“KMeans(K均值)集群”本作品采用知识共享署名 4.0 国际许可协议进行许可。

Multiple Layer Perceptron Classifier(多层神经网络分类) in ML

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Multiple Layer Perceptron Classifier(多层神经网络分类) in ML

引言

Logistic Regression(逻辑回归) in ML的基础上,现在来介绍多层神经网络分类方法。
神经网络分类就是模拟人类大脑认识物体的方式对输入进行分类。一般为了提高精确性和鲁棒性,都会设置多层进行分析,其中第一层为输入层,一般进行数据初分类,修剪等,最后一层为输出层,中间的为隐藏层。
每一层有多个神经元节点组成,它们都有各自的weight权值,越高越能决定输出值。本文根据Spark中的代码进行介绍,理论知识可参考《数字图像处理与机器视觉 Visual C++与Matlab实现》第12章介绍。
和逻辑回归的相比较,主要是它们的梯度计算器和更新器不一样,且神经网络的梯度计算和损失计算都是多层的。

代码分析
测试代码

以MultilayerPerceptronClassifierSuite类中的测试 test(“3 class classification with 2 hidden layers”)为例。
此单元测试用了多层神经网络和逻辑回归两种方式进行模型训练,并对训练结果进行了比较。

  test("3 class classification with 2 hidden layers") {
    val nPoints = 1000
 
    // The following coefficients are taken from OneVsRestSuite.scala
    // they represent 3-class iris dataset
    val coefficients = Array(
      -0.57997, 0.912083, -0.371077, -0.819866, 2.688191,
      -0.16624, -0.84355, -0.048509, -0.301789, 4.170682)
 
    val xMean = Array(5.843, 3.057, 3.758, 1.199)
    val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
    // the input seed is somewhat magic, to make this test pass
//用输入的系数,均值和方差生成多类逻辑回归输入数据
    val data = generateMultinomialLogisticInput(
      coefficients, xMean, xVariance, true, nPoints, 1).toDS()
    val dataFrame = data.toDF("label", "features")
    val numClasses = 3//3类
    val numIterations = 100//最大迭代次数100
//层神经元数量,输入为4个,输出为3个,隐藏层分别为5,4
    val layers = Array[Int](4, 5, 4, numClasses)
    val trainer = new MultilayerPerceptronClassifier()
      .setLayers(layers)
      .setBlockSize(1)
      .setSeed(11L) // currently this seed is ignored
      .setMaxIter(numIterations)
    val model = trainer.fit(dataFrame)//下一节继续
    val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
    assert(model.numFeatures === numFeatures)
    val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map {
      case Row(p: Double, l: Double) => (p, l)
    }
    // train multinomial logistic regression
    val lr = new LogisticRegressionWithLBFGS()
      .setIntercept(true)
      .setNumClasses(numClasses)
    lr.optimizer.setRegParam(0.0)
      .setNumIterations(numIterations)
    val lrModel = lr.run(data.rdd.map(OldLabeledPoint.fromML))
    val lrPredictionAndLabels =
      lrModel.predict(data.rdd.map(p => OldVectors.fromML(p.features))).zip(data.rdd.map(_.label))
    // MLP's predictions should not differ a lot from LR's.
    val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels)
    val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels)
    assert(mlpMetrics.confusionMatrix.asML ~== lrMetrics.confusionMatrix.asML absTol 100)
  }

继续阅读“Multiple Layer Perceptron Classifier(多层神经网络分类) in ML”本作品采用知识共享署名 4.0 国际许可协议进行许可。

Random Forest(随机森林) in ML

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: Random Forest(随机森林) in ML

引言

随机森林算法是机器学习等领域里的一个比较广泛的算法,它可以用来分类,回归等。随机森林由多个决策树组成,相对单个决策树算法,其效果更好,且不容易出现过度拟合。
其一般构建方法为:
1 构建几棵决策树树,使用装袋方法,用泊松分布或伯努力分布对样本进行分配到每棵树上。
2 从每棵树的根节点开始对属性(feature)进行分割
3 对每个分割点,找到最好的分割方式,把记录分成左右两个子节点,即哪个feature可以更好地使记录最纯,其中每个feature的所有分割点中总会有一个点是记录更纯。一般左子节点比右子节点纯。所谓最纯,就是说所有的数据按这个属性分割后无法用其它属性再分割。
4 子节点如果已最纯,就停止继续分割,否则继续以子节点为当前的根节点按照2,3步骤继续分割。
目前有3种纯度计算公式,Gini,Entropy熵和错误率:

    \[     Gini = 1 - \sum_{i=1}^n P(i)^2 \]

    \[     Entropy = - \sum_{i=1}^n P(i) * log_2^{P(i)} \]

    \[     Error = 1 - max P(i), i \in [1,n] \]

下面以Spark中的一个随机森林的单元测试代码为例分析随机森林模型的构建。

构建随机森林
  test("alternating categorical and continuous features with multiclass labels to test indexing") {
    val arr = Array(
      LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0, 3.0, 1.0)),  //A
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 1.0, 1.0, 2.0)),  //B
      LabeledPoint(0.0, Vectors.dense(2.0, 0.0, 0.0, 6.0, 3.0)),  //C
      LabeledPoint(2.0, Vectors.dense(0.0, 2.0, 1.0, 3.0, 2.0))   //D
    )
    val rdd = sc.parallelize(arr)
    val categoricalFeatures = Map(0 -> 3, 2 -> 2, 4 -> 4)
    val numClasses = 3
 
    val rf = new RandomForestClassifier()
      .setImpurity("Gini")
      .setMaxDepth(5)
      .setNumTrees(2)
      .setFeatureSubsetStrategy("sqrt")
      .setSeed(12345)
    compareAPIs(rdd, rf, categoricalFeatures, numClasses)
  }
上面的测试数据中,共有3个分类(class):0.0 1.0 2.0,共有5个属性(feature),
其中3个为非连续属性,feature 0可分割为3类,feature 2可分割为2类,feature 4可分割为4类。
后续的代码对连续属性分类后,可得到属性的分割点如下:
	  0: 0 1 2     
	  1: 0 1 2    //连续属性分割结果
	  2: 0        //只有2个属性,转换为2分类属性,即用boolean表示即可
	  3: 1 3 6    //连续属性分割结果
	  4: 0 1 2 3

按照属性的分割点进行索引,样本可表示为:

    A  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0, 3.0, 1.0)),    1 0 0 1 1      
    B  LabeledPoint(1.0, Vectors.dense(0.0, 1.0, 1.0, 1.0, 2.0)),    0 1 1 0 2
    C  LabeledPoint(0.0, Vectors.dense(2.0, 0.0, 0.0, 6.0, 3.0)),    2 0 0 2 3
    D  LabeledPoint(2.0, Vectors.dense(0.0, 2.0, 1.0, 3.0, 2.0))     0 2 1 1 2

下面代码抽样装袋后,本例可以构建为如下2棵树:(抽样的一种结果,其中tree1的3个feature和tree2的3个feature也是随机选择的)

	  tree 1(feature:0 1 3)                     tree 0(feature:0 1 4)
	   node 1(node index:0)                      node 1(node index:1)
样本        B                                        A 3次B样本
feature0  010 000 000                               030 100 000      
feature1  000 010 000                               100 030 000
feature2  	                                 
feature3  010 000 000
feature4                                            000 100 030 000 

上面的010 000 000等数据是DTStatsAggregator类中的allStats变量的值。
tree 1中allStats的数据可解读为feature 0, feature 1和feature 3顺序排列,各占9位。
每个feature含有3个bin(桶)(属性分割后),每个bin有3位,分别记录3个分类的数量。
如feature0 010 000 000理解为feature 0的第0个bin的分类1有1个样本。
如feature4 000 100 030 000理解为feature 4的第1个bin的分类0有1个样本,第2个bin的分类1有3个样本。
这些都是根据输入样本统计出的结果。
继续阅读“Random Forest(随机森林) in ML”本作品采用知识共享署名 4.0 国际许可协议进行许可。

NaiveBayes(朴素贝叶斯) in ML

原创文章,转载请注明: 转载自慢慢的回味

本文链接地址: NaiveBayes(朴素贝叶斯) in ML

贝叶斯定理例子

以《数据挖掘导论》里面的例子开始:
考虑两个主球队比赛:队0和队1,如果队0的胜率为65%,队1为35%。队0获胜时队1为主场为30%,而队1取胜的比赛中75%是在主场。问:下一场比赛在队1的主场进行,哪个队最有可能胜出呢?

这儿以贝叶斯定理来解答:
假设X,Y是一对随机变量,令X代表主场,Y代表胜利者。X、Y的取值范围为『0,1』。X=0表示队0,X=1表示队1,Y=0表示失败,Y=1表示胜利,则:
队0取胜的先验概率:P(Y=0)= 0.65,
队0取胜的先验概率:P(Y=1)= 0.35,
队1取胜的比赛中在主场的条件概率:P(X=1|Y=1)=0.75,
队0获胜时队1为主场的条件概率:P(X=1|Y=0)=0.3。

现在需要计算队1为主场的条件概率,即P(Y=1|X=1)和P(Y=0|X=1):
贝叶斯定理为:P(Y|X) = \frac{P(X|Y)P(Y)}{P(X)}
则:

    \begin{align*}     P(Y=1|X=1) &=  \frac{P(X=1|Y=1) P(Y=1)}{P(X=1)}    \\                &=  \frac{P(X=1|Y=1) P(Y=1)}{P(X=1,Y=1) + P(X=1,Y=0)}   \\                &=  \frac{P(X=1|Y=1) P(Y=1)}{P(X=1|Y=1)P(Y=1) + P(X=1|Y=0)P(Y=0)}   \\                &=  \frac{0.75*0.35}{0.75*0.35 + 0.3*0.65}   \\                &=  0.5738   \\     P(Y=0|X=1) &=  1 - P(Y=1|X=1) = 0.4262 \end{align}

所以队1更能赢得比赛。

朴素贝叶斯

朴素贝叶斯就是指类条件是独立的,即

    \[     P(X|Y=y) = \prod_{i=1}^d P(X_i|Y=y) \]

其中,X=\left\{ X_1, X_2, ...., X_d \right\}条件独立。
X = \left\{ f_1, f2, ..., f_m \right\}为一个待分类项,f就是X的一个特征(feature), 取值范围0,y就是Y的一个类别,取值范围0     \[     P(y_k|X) = max \left\{ P(y_1|X), P(y_2|X), ..., P(y_n|X) \right\}, that is X \in y_k  \]

现在就需要计算P(y_1|X), P(y_2|X), ..., P(y_n|X)就可以了,即样本X属于每个分类的条件概率。

由贝叶斯定理有:

    \begin{align*}     P(y_i|X) & = \frac{ P(X|y_i)P(y_i) }{ P(X) }   \\              & = \frac{ P(f_1|y_i)P(f_2|y_i)...P(f_m|y_i) P(y_i) }{ P(X) }   \\              & = \frac{ P(y_i) \prod_{j=1}^m P(f_j|y_i) }{ P(X) }  \end{align}

在下面的代码中,对应的变量为:

    \begin{align*}             pi & = piArray(0<j<numFeatures)   \\     piArray(i) & = math.log(n + lambda) - piLogDenom  \\                & = math.log(n + lambda) - math.log(numDocuments + numLabels * lambda) \\         P(y_i) & = \log \frac{ n + lambda }{ numDocuments + numLabels * lambda }  \\                \\          theta & = DenseMatrix(numLabels, numFeatures, thetaArray, true)  \\ thetaArray(i * numFeatures + j) & = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom  \\ thetaArray(i * numFeatures + j) & = math.log(sumTermFreqs(j) + lambda) \\                                 &   - math.log(sumTermFreqs.values.sum + numFeatures * lambda)  \\          P(f_j|y_i) & = \log \frac{ sumTermFreqs(j) + lambda }{ sumTermFreqs.values.sum + numFeatures * lambda } \end{align}

其中sumTermFreqs(j)表示分类i中特征j的词频,sumTermFreqs.values.sum表示分类i中词频之和,n为分类i的文档数,numDocuments为总的文档数,P(y_i)为分类i的先验概率,P(f_j|y_i)为特征j为分类i的条件概率,lambda为平滑参数,也可防止分类i或特征j缺少样本而分子为零。

代码分析
  test("Naive Bayes Multinomial") {
    val nPoints = 1000
    val piArray = Array(0.5, 0.1, 0.4).map(math.log)
    val thetaArray = Array(
      Array(0.70, 0.10, 0.10, 0.10), // label 0
      Array(0.10, 0.70, 0.10, 0.10), // label 1
      Array(0.10, 0.10, 0.70, 0.10)  // label 2
    ).map(_.map(math.log))
    val pi = Vectors.dense(piArray)
    val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
 
    val testDataset =
      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
    val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
    val model = nb.fit(testDataset)
 
    validateModelFit(pi, theta, model)
    assert(model.hasParent)
 
    val validationDataset =
      generateNaiveBayesInput(piArray, thetaArray, nPoints, 17, "multinomial").toDF()
 
    val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
    validatePrediction(predictionAndLabels)
 
    val featureAndProbabilities = model.transform(validationDataset)
      .select("features", "probability")
    validateProbabilities(featureAndProbabilities, model, "multinomial")
  }
  override def fit(dataset: Dataset[_]): M = {
    // This handles a few items such as schema validation.
    // Developers only need to implement train().
    transformSchema(dataset.schema, logging = true)
 
    // Cast LabelCol to DoubleType and keep the metadata.
    val labelMeta = dataset.schema($(labelCol)).metadata
    val casted = dataset.withColumn($(labelCol), col($(labelCol)).cast(DoubleType), labelMeta)
 
    copyValues(train(casted).setParent(this))
  }
  override protected def train(dataset: Dataset[_]): NaiveBayesModel = {
    trainWithLabelCheck(dataset, positiveLabel = true)
  }
 
  /**
   * ml assumes input labels in range [0, numClasses). But this implementation
   * is also called by mllib NaiveBayes which allows other kinds of input labels
   * such as {-1, +1}. `positiveLabel` is used to determine whether the label
   * should be checked and it should be removed when we remove mllib NaiveBayes.
   */
  private[spark] def trainWithLabelCheck(
      dataset: Dataset[_],
      positiveLabel: Boolean): NaiveBayesModel = {
    if (positiveLabel) {
      val numClasses = getNumClasses(dataset)
      if (isDefined(thresholds)) {
        require($(thresholds).length == numClasses, this.getClass.getSimpleName +
          ".train() called with non-matching numClasses and thresholds.length." +
          s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
      }
    }
 
    val modelTypeValue = $(modelType)
    val requireValues: Vector => Unit = {
      modelTypeValue match {
        case Multinomial =>
          requireNonnegativeValues
        case Bernoulli =>
          requireZeroOneBernoulliValues
        case _ =>
          // This should never happen.
          throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
      }
    }
 
    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
    val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
 
    // Aggregates term frequencies per label.
    // TODO: Calling aggregateByKey and collect creates two stages, we can implement something
    // TODO: similar to reduceByKeyLocally to save one stage.
    val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
      .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
      }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
      seqOp = {
         case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
           requireValues(features)
           BLAS.axpy(weight, features, featureSum)
           (weightSum + weight, featureSum)
      },
      combOp = {
         case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
           BLAS.axpy(1.0, featureSum2, featureSum1)
           (weightSum1 + weightSum2, featureSum1)
      }).collect().sortBy(_._1)
 
    val numLabels = aggregated.length
    val numDocuments = aggregated.map(_._2._1).sum
 
    val labelArray = new Array[Double](numLabels)
    val piArray = new Array[Double](numLabels)
    val thetaArray = new Array[Double](numLabels * numFeatures)
 
    val lambda = $(smoothing)
    val piLogDenom = math.log(numDocuments + numLabels * lambda)
    var i = 0
    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
      labelArray(i) = label
      piArray(i) = math.log(n + lambda) - piLogDenom
      val thetaLogDenom = $(modelType) match {
        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
        case Bernoulli => math.log(n + 2.0 * lambda)
        case _ =>
          // This should never happen.
          throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
      }
      var j = 0
      while (j < numFeatures) {
        thetaArray(i * numFeatures + j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
        j += 1
      }
      i += 1
    }
 
    val pi = Vectors.dense(piArray)
    val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
    new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
  }

本作品采用知识共享署名 4.0 国际许可协议进行许可。