Multilayer Perceptron Classifier in ML


Introduction

Building on the earlier post Logistic Regression in ML, this article introduces multilayer perceptron (neural network) classification.
A neural network classifier mimics the way the human brain recognizes objects in order to classify its input. To improve accuracy and robustness, several layers are normally stacked: the first is the input layer, which typically does preliminary grouping and pruning of the data; the last is the output layer; the layers in between are hidden layers.
Each layer consists of several neuron nodes, each with its own weight; the larger a weight, the more it influences the output value. This article follows the code in Spark; for the underlying theory, see Chapter 12 of 《数字图像处理与机器视觉 Visual C++与Matlab实现》.
Compared with logistic regression, the main differences are the gradient computer and the updater, and in a neural network both the gradient and the loss are computed layer by layer.
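
As a rough, Spark-independent illustration of what a single layer computes (the object and method names below are made up for this sketch, they are not Spark APIs), each neuron applies its weights to the input, adds a bias, and passes the result through an activation function such as the sigmoid:

// Minimal sketch of one dense layer: y = sigmoid(W * x + b).
// Plain Scala collections are used instead of Spark/Breeze types.
object LayerSketch {
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // weights: one row per output neuron, one column per input feature
  def dense(weights: Array[Array[Double]], bias: Array[Double], x: Array[Double]): Array[Double] =
    weights.zip(bias).map { case (row, b) =>
      sigmoid(row.zip(x).map { case (w, xi) => w * xi }.sum + b)
    }

  def main(args: Array[String]): Unit = {
    val w = Array(Array(0.1, -0.2, 0.3, 0.0), Array(0.5, 0.4, -0.1, 0.2)) // 2 neurons, 4 inputs
    val b = Array(0.0, -0.5)
    println(dense(w, b, Array(1.0, 2.0, 3.0, 4.0)).mkString(", "))
  }
}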

Code Analysis
Test Code

Take the test test("3 class classification with 2 hidden layers") in MultilayerPerceptronClassifierSuite as an example.
This unit test trains models with both the multilayer perceptron and multinomial logistic regression, then compares their results.

  test("3 class classification with 2 hidden layers") {
    val nPoints = 1000
 
    // The following coefficients are taken from OneVsRestSuite.scala
    // they represent 3-class iris dataset
    val coefficients = Array(
      -0.57997, 0.912083, -0.371077, -0.819866, 2.688191,
      -0.16624, -0.84355, -0.048509, -0.301789, 4.170682)
 
    val xMean = Array(5.843, 3.057, 3.758, 1.199)
    val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
    // the input seed is somewhat magic, to make this test pass
//Generate multinomial logistic regression input data from the given coefficients, means and variances
    val data = generateMultinomialLogisticInput(
      coefficients, xMean, xVariance, true, nPoints, 1).toDS()
    val dataFrame = data.toDF("label", "features")
    val numClasses = 3//3 classes
    val numIterations = 100//at most 100 iterations
//Number of neurons per layer: 4 inputs, 3 outputs, and hidden layers of 5 and 4
    val layers = Array[Int](4, 5, 4, numClasses)
    val trainer = new MultilayerPerceptronClassifier()
      .setLayers(layers)
      .setBlockSize(1)
      .setSeed(11L) // currently this seed is ignored
      .setMaxIter(numIterations)
    val model = trainer.fit(dataFrame)//continued in the next section
    val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size
    assert(model.numFeatures === numFeatures)
    val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map {
      case Row(p: Double, l: Double) => (p, l)
    }
    // train multinomial logistic regression
    val lr = new LogisticRegressionWithLBFGS()
      .setIntercept(true)
      .setNumClasses(numClasses)
    lr.optimizer.setRegParam(0.0)
      .setNumIterations(numIterations)
    val lrModel = lr.run(data.rdd.map(OldLabeledPoint.fromML))
    val lrPredictionAndLabels =
      lrModel.predict(data.rdd.map(p => OldVectors.fromML(p.features))).zip(data.rdd.map(_.label))
    // MLP's predictions should not differ a lot from LR's.
    val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels)
    val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels)
    assert(mlpMetrics.confusionMatrix.asML ~== lrMetrics.confusionMatrix.asML absTol 100)
  }
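
Beyond comparing against logistic regression, the fitted model can also be scored with the standard ML evaluator. A minimal sketch, assuming the model and dataFrame values from the test above:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Score the MLP predictions on the training DataFrame (sketch, not part of the test).
val predictions = model.transform(dataFrame)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(s"MLP accuracy: ${evaluator.evaluate(predictions)}")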

The Multilayer Perceptron Classifier

The optimizer (BreezeLBFGS) uses the LBFGS implementation from the open-source Breeze library.
For a derivation of the LBFGS method, see the post LBFGS方法推导.
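
For intuition, Breeze's LBFGS minimizes any differentiable function expressed as a DiffFunction that returns both the value and the gradient at a point. A minimal sketch, independent of Spark, that minimizes a simple quadratic (variable names are illustrative only):

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}

// Minimize f(x) = ||x - 3||^2, whose gradient is 2 * (x - 3).
val f = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val d = x - 3.0
    (d.dot(d), d * 2.0)
  }
}
val lbfgs = new LBFGS[DenseVector[Double]](maxIter = 100, m = 10, tolerance = 1e-9)
val xOpt = lbfgs.minimize(f, DenseVector.zeros[Double](5)) // each component converges to 3.0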

The code in MultilayerPerceptronClassifier.scala

  /**
   * Train a model using the given dataset and parameters.
   * Developers can implement this instead of [[fit()]] to avoid dealing with schema validation
   * and copying parameters into the model.
   *
   * @param dataset Training dataset
   * @return Fitted model
   */
  override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
    val myLayers = $(layers)
    val labels = myLayers.last//number of classes
    val lpData = extractLabeledPoints(dataset)
    val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))//input data
    val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)//create the feed-forward topology
    val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)//create the feed-forward trainer
    if (isDefined(initialWeights)) {
      trainer.setWeights($(initialWeights))
    } else {
      trainer.setSeed($(seed))//create the initial weights randomly from this seed
    }
    if ($(solver) == MultilayerPerceptronClassifier.LBFGS) {
//This test uses LBFGS for the optimization; the optimizer is constructed with the gradient computer and the weight updater (shown below):
//  private var _gradient: Gradient = new ANNGradient(topology, dataStacker)
//  private var _updater: Updater = new ANNUpdater()
//  def LBFGSOptimizer: LBFGS = {
//    val lbfgs = new LBFGS(_gradient, _updater)
//    optimizer = lbfgs
//    lbfgs
//  }
      trainer.LBFGSOptimizer
        .setConvergenceTol($(tol))
        .setNumIterations($(maxIter))
    } else if ($(solver) == MultilayerPerceptronClassifier.GD) {
      trainer.SGDOptimizer
        .setNumIterations($(maxIter))
        .setConvergenceTol($(tol))
        .setStepSize($(stepSize))
    } else {
      throw new IllegalArgumentException(
        s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
    }
    trainer.setStackSize($(blockSize))
    val mlpModel = trainer.train(data)
    new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
  }
}
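
The else-if branch above is the classifier's (mini-batch) gradient-descent solver. A hedged sketch of how it would be selected through the public API, with illustrative parameter values:

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// Sketch: use gradient descent instead of the default L-BFGS solver.
val gdTrainer = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 4, 3))
  .setSolver("gd")      // default is "l-bfgs"
  .setStepSize(0.03)
  .setMaxIter(100)
  .setBlockSize(128)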
 
 
/**
 * MLlib-style trainer class that trains a network given the data and topology
 *
 * @param topology topology of ANN
 * @param inputSize input size
 * @param outputSize output size
 */
private[ml] class FeedForwardTrainer(
    topology: Topology,
    val inputSize: Int,
    val outputSize: Int) extends Serializable {
......
  /**
   * Trains the ANN
   *
   * @param data RDD of input and output vector pairs
   * @return model
   */
  def train(data: RDD[(Vector, Vector)]): TopologyModel = {
    val w = if (getWeights == null) {
      // TODO: will make a copy if vector is a subvector of BDV (see Vectors code)
      topology.model(_seed).weights
    } else {
      getWeights
    }
    // TODO: deprecate standard optimizer because it needs Vector
    val trainData = dataStacker.stack(data).map { v =>
      (v._1, OldVectors.fromML(v._2))
    }
    val handlePersistence = trainData.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK)
//invoke the LBFGS optimizer to start the optimization
    val newWeights = optimizer.optimize(trainData, w)
    if (handlePersistence) trainData.unpersist()
    topology.model(newWeights)
  }
 
}
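
The dataStacker.stack call packs stackSize (blockSize) examples into one flat vector so that each gradient evaluation can operate on a whole batch as a matrix rather than on one example at a time. A rough, Spark-independent sketch of the idea (stackBatch is a made-up helper, not the Spark implementation):

import breeze.linalg.DenseMatrix

// Pack a batch of feature vectors (all of length numFeatures) into one
// column-per-example matrix, similar to what the ANN code does internally.
def stackBatch(batch: Seq[Array[Double]], numFeatures: Int): DenseMatrix[Double] = {
  val flat = batch.flatten.toArray
  // Breeze matrices are column-major, so each example becomes one column.
  new DenseMatrix[Double](numFeatures, batch.size, flat)
}
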
Creating the feed-forward topology
/**
 * Factory for some of the frequently-used topologies
 */
private[ml] object FeedForwardTopology {
......
  /**
   * Creates a multi-layer perceptron
   *
   * @param layerSizes sizes of layers including input and output size
   * @param softmaxOnTop whether to use SoftMax or Sigmoid function for an output layer.
   *                Softmax is default
   * @return multilayer perceptron topology
   */
  def multiLayerPerceptron(
    layerSizes: Array[Int],
    softmaxOnTop: Boolean = true): FeedForwardTopology = {
    val layers = new Array[Layer]((layerSizes.length - 1) * 2)
    for (i <- 0 until layerSizes.length - 1) {
      layers(i * 2) = new AffineLayer(layerSizes(i), layerSizes(i + 1))
      layers(i * 2 + 1) =
        if (i == layerSizes.length - 2) {
          if (softmaxOnTop) {
            new SoftmaxLayerWithCrossEntropyLoss()
          } else {
            // TODO: squared error is more natural but converges slower
            new SigmoidLayerWithSquaredError()
          }
        } else {
          new FunctionalLayer(new SigmoidFunction())
        }
    }
    FeedForwardTopology(layers)
//The layers created here are, in order:
//0 = {AffineLayer@7887} 
//1 = {FunctionalLayer@7888} 
//2 = {AffineLayer@7889} 
//3 = {FunctionalLayer@7890} 
//4 = {AffineLayer@7891} 
//5 = {SoftmaxLayerWithCrossEntropyLoss@7892} 
//AffineLayer applies an affine transform to its input, i.e. y = W x + b.
//FunctionalLayer applies a function element-wise, here SigmoidFunction, i.e. x => 1.0 / (1 + math.exp(-x)).
//SoftmaxLayerWithCrossEntropyLoss applies softmax to the output and computes the cross-entropy loss.
  }
}
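
To make the three layer types concrete, here is a small Spark-independent sketch of what each one computes for a single example (these helper functions are illustrative, not the Spark internals):

import breeze.linalg.{DenseMatrix, DenseVector, max, sum}
import breeze.numerics.{exp, sigmoid}

// AffineLayer: y = W * x + b
def affine(w: DenseMatrix[Double], b: DenseVector[Double], x: DenseVector[Double]): DenseVector[Double] =
  w * x + b

// FunctionalLayer(SigmoidFunction): element-wise 1 / (1 + exp(-x))
def sigmoidLayer(x: DenseVector[Double]): DenseVector[Double] = sigmoid(x)

// SoftmaxLayerWithCrossEntropyLoss: softmax output plus cross-entropy against a one-hot target
def softmax(x: DenseVector[Double]): DenseVector[Double] = {
  val e = exp(x - max(x)) // subtract the max for numerical stability
  e / sum(e)
}
def crossEntropy(p: DenseVector[Double], target: DenseVector[Double]): Double =
  -(0 until p.length).map(i => target(i) * math.log(p(i))).sum
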
ANNGradient: computing the gradient
/**
 * Neural network gradient. Does nothing but calling Model's gradient
 *
 * @param topology topology
 * @param dataStacker data stacker
 */
private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) extends Gradient {
  override def compute(
    data: OldVector,
    label: Double,
    weights: OldVector,
    cumGradient: OldVector): Double = {
    val (input, target, realBatchSize) = dataStacker.unstack(data)
    val model = topology.model(weights)
//call FeedForwardModel.computeGradient to compute the gradient
    model.computeGradient(input, target, cumGradient, realBatchSize)
  }
}
/**
 * Model of Feed Forward Neural Network.
 * Implements forward, gradient computation and can return weights in vector format.
 *
 * @param weights network weights
 * @param topology network topology
 */
private[ml] class FeedForwardModel private(
    val weights: Vector,
    val topology: FeedForwardTopology) extends TopologyModel {
 
  val layers = topology.layers
  val layerModels = new Array[LayerModel](layers.length)
  private var offset = 0
  for (i <- 0 until layers.length) {
    layerModels(i) = layers(i).createModel(
      new BDV[Double](weights.toArray, offset, 1, layers(i).weightSize))
    offset += layers(i).weightSize
  }
  private var outputs: Array[BDM[Double]] = null
  private var deltas: Array[BDM[Double]] = null
//Evaluate the layers of the topology on the input data in order; each layer's output is the next layer's input
  override def forward(data: BDM[Double]): Array[BDM[Double]] = {
    // Initialize output arrays for all layers. Special treatment for InPlace
    val currentBatchSize = data.cols
    // TODO: allocate outputs as one big array and then create BDMs from it
    if (outputs == null || outputs(0).cols != currentBatchSize) {
      outputs = new Array[BDM[Double]](layers.length)
      var inputSize = data.rows
      for (i <- 0 until layers.length) {
        if (layers(i).inPlace) {
          outputs(i) = outputs(i - 1)
        } else {
          val outputSize = layers(i).getOutputSize(inputSize)
          outputs(i) = new BDM[Double](outputSize, currentBatchSize)
          inputSize = outputSize
        }
      }
    }
    layerModels(0).eval(data, outputs(0))
    for (i <- 1 until layerModels.length) {
      layerModels(i).eval(outputs(i - 1), outputs(i))
    }
    outputs
  }
 
  override def computeGradient(
    data: BDM[Double],
    target: BDM[Double],
    cumGradient: Vector,
    realBatchSize: Int): Double = {
    val outputs = forward(data)
    val currentBatchSize = data.cols
    // TODO: allocate deltas as one big array and then create BDMs from it
    if (deltas == null || deltas(0).cols != currentBatchSize) {
      deltas = new Array[BDM[Double]](layerModels.length)
      var inputSize = data.rows
      for (i <- 0 until layerModels.length - 1) {
        val outputSize = layers(i).getOutputSize(inputSize)
        deltas(i) = new BDM[Double](outputSize, currentBatchSize)
        inputSize = outputSize
      }
    }
//compute the overall loss
    val L = layerModels.length - 1
    // TODO: explain why delta of top layer is null (because it might contain loss+layer)
    val loss = layerModels.last match {
      case levelWithError: LossFunction => levelWithError.loss(outputs.last, target, deltas(L - 1))
      case _ =>
        throw new UnsupportedOperationException("Top layer is required to have objective.")
    }
    for (i <- (L - 2) to (0, -1)) {
      layerModels(i + 1).computePrevDelta(deltas(i + 1), outputs(i + 1), deltas(i))
    }
//compute the gradients used to update the weights
    val cumGradientArray = cumGradient.toArray
    var offset = 0
    for (i <- 0 until layerModels.length) {
      val input = if (i == 0) data else outputs(i - 1)
      layerModels(i).grad(deltas(i), input,
        new BDV[Double](cumGradientArray, offset, 1, layers(i).weightSize))
      offset += layers(i).weightSize
    }
    loss
  }
 
  override def predict(data: Vector): Vector = {
    val size = data.size
    val result = forward(new BDM[Double](size, 1, data.toArray))
    Vectors.dense(result.last.toArray)
  }
}
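
In standard back-propagation notation (a summary of what computeGradient does, not text from the Spark source), with activations $a^{(l)}$, pre-activations $z^{(l)}$, one-hot target $t$, sigmoid $\sigma$ and batch size $m$:

$$\delta^{(L)} = a^{(L)} - t$$
$$\delta^{(l)} = \big(W^{(l+1)}\big)^{\top} \delta^{(l+1)} \odot \sigma'\big(z^{(l)}\big)$$
$$\frac{\partial J}{\partial W^{(l)}} \approx \frac{1}{m}\,\delta^{(l)} \big(a^{(l-1)}\big)^{\top}$$

The first equation is roughly the delta written by the top layer's loss(...) call, the second corresponds to computePrevDelta, and the third to the per-layer grad call that accumulates into cumGradient (up to the exact batch-size scaling used in the source).
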
ANNUpdater: the weight updater
/**
 * Simple updater
 */
private[ann] class ANNUpdater extends Updater {
 
  override def compute(
    weightsOld: OldVector,
    gradient: OldVector,
    stepSize: Double,
    iter: Int,
    regParam: Double): (OldVector, Double) = {
    val thisIterStepSize = stepSize
    val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
    Baxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
    (OldVectors.fromBreeze(brzWeights), 0)
  }
}
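
The Baxpy call is simply a gradient step: the new weights are the old weights minus stepSize times the gradient. A minimal Breeze sketch of the same update (the function name step is illustrative):

import breeze.linalg.{DenseVector, axpy}

// Sketch: w := w - stepSize * gradient, which is what Baxpy(-stepSize, gradient, weights) computes.
def step(weights: DenseVector[Double], gradient: DenseVector[Double], stepSize: Double): DenseVector[Double] = {
  val w = weights.copy
  axpy(-stepSize, gradient, w) // in-place: w += (-stepSize) * gradient
  w
}

This per-iteration step is what the gradient-descent (SGDOptimizer) path applies; with the LBFGS solver, Breeze produces the new weights directly, so this simple updater mainly serves the SGD path.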

