Multiple Layer Perceptron Classifier (多层神经网络分类) in ML
Introduction
Building on Logistic Regression in ML, this article introduces the multilayer perceptron (neural network) classification method.
Neural network classification imitates the way the human brain recognizes objects in order to classify inputs. To improve accuracy and robustness, the network is usually organized into multiple layers: the first layer is the input layer, which typically performs preliminary work such as coarse classification and pruning of the data; the last layer is the output layer; and the layers in between are hidden layers.
Each layer consists of several neuron nodes, each with its own weight; the larger a weight, the more influence that neuron has on the output value. This article follows the implementation in Spark; for the underlying theory, see Chapter 12 of 《数字图像处理与机器视觉 Visual C++与Matlab实现》.
Compared with logistic regression, the main differences are the gradient computer and the updater; in a neural network, both the gradient computation and the loss computation span multiple layers. A minimal usage sketch of the estimator is shown below.
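The following is a minimal sketch (not taken from the original post) of how the layer sizes discussed above map onto Spark's MultilayerPerceptronClassifier API; the DataFrames train and test and their column names are assumptions for illustration.

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

// 4 input features, two hidden layers with 5 and 4 neurons, 3 output classes
val layers = Array[Int](4, 5, 4, 3)

val mlp = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)   // stacking size used when grouping input rows
  .setMaxIter(100)
  .setSeed(1234L)

// val model = mlp.fit(train)              // train: DataFrame with "label" and "features"
// val predictions = model.transform(test) // adds a "prediction" column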
Code Analysis
Test Code
Take the test test("3 class classification with 2 hidden layers") in MultilayerPerceptronClassifierSuite as an example.
This unit test trains models with both a multilayer perceptron and logistic regression, then compares the two training results.
test("3 class classification with 2 hidden layers") { val nPoints = 1000 // The following coefficients are taken from OneVsRestSuite.scala // they represent 3-class iris dataset val coefficients = Array( -0.57997, 0.912083, -0.371077, -0.819866, 2.688191, -0.16624, -0.84355, -0.048509, -0.301789, 4.170682) val xMean = Array(5.843, 3.057, 3.758, 1.199) val xVariance = Array(0.6856, 0.1899, 3.116, 0.581) // the input seed is somewhat magic, to make this test pass //用输入的系数,均值和方差生成多类逻辑回归输入数据 val data = generateMultinomialLogisticInput( coefficients, xMean, xVariance, true, nPoints, 1).toDS() val dataFrame = data.toDF("label", "features") val numClasses = 3//3类 val numIterations = 100//最大迭代次数100 //层神经元数量,输入为4个,输出为3个,隐藏层分别为5,4 val layers = Array[Int](4, 5, 4, numClasses) val trainer = new MultilayerPerceptronClassifier() .setLayers(layers) .setBlockSize(1) .setSeed(11L) // currently this seed is ignored .setMaxIter(numIterations) val model = trainer.fit(dataFrame)//下一节继续 val numFeatures = dataFrame.select("features").first().getAs[Vector](0).size assert(model.numFeatures === numFeatures) val mlpPredictionAndLabels = model.transform(dataFrame).select("prediction", "label").rdd.map { case Row(p: Double, l: Double) => (p, l) } // train multinomial logistic regression val lr = new LogisticRegressionWithLBFGS() .setIntercept(true) .setNumClasses(numClasses) lr.optimizer.setRegParam(0.0) .setNumIterations(numIterations) val lrModel = lr.run(data.rdd.map(OldLabeledPoint.fromML)) val lrPredictionAndLabels = lrModel.predict(data.rdd.map(p => OldVectors.fromML(p.features))).zip(data.rdd.map(_.label)) // MLP's predictions should not differ a lot from LR's. val lrMetrics = new MulticlassMetrics(lrPredictionAndLabels) val mlpMetrics = new MulticlassMetrics(mlpPredictionAndLabels) assert(mlpMetrics.confusionMatrix.asML ~== lrMetrics.confusionMatrix.asML absTol 100) } |
Multilayer Perceptron Classifier
The optimizer (BreezeLBFGS) uses the L-BFGS implementation from the open-source Breeze library.
For the derivation of the L-BFGS algorithm, see the post LBFGS方法推导. A minimal standalone example of Breeze's LBFGS is sketched below.
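The sketch below shows Breeze's LBFGS used directly on a toy objective, independent of Spark. It is an illustration under the assumption that a differentiable function and its gradient are supplied through DiffFunction; the objective here is made up.

import breeze.linalg.DenseVector
import breeze.optimize.{DiffFunction, LBFGS}

// Minimize f(x) = ||x - 3||^2; the gradient is 2 * (x - 3).
val f = new DiffFunction[DenseVector[Double]] {
  def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
    val diff = x - 3.0
    (diff.dot(diff), diff * 2.0)
  }
}

val lbfgs = new LBFGS[DenseVector[Double]](maxIter = 100, m = 10)
val xOpt = lbfgs.minimize(f, DenseVector.zeros[Double](4))  // converges to (3, 3, 3, 3)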
The code in MultilayerPerceptronClassifier.scala:
/**
 * Train a model using the given dataset and parameters.
 * Developers can implement this instead of [[fit()]] to avoid dealing with schema validation
 * and copying parameters into the model.
 *
 * @param dataset Training dataset
 * @return Fitted model
 */
override protected def train(dataset: Dataset[_]): MultilayerPerceptronClassificationModel = {
  val myLayers = $(layers)
  val labels = myLayers.last  // number of classes
  val lpData = extractLabeledPoints(dataset)
  val data = lpData.map(lp => LabelConverter.encodeLabeledPoint(lp, labels))  // input data
  // Create the feed-forward topology
  val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, softmaxOnTop = true)
  // Create the feed-forward trainer
  val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
  if (isDefined(initialWeights)) {
    trainer.setWeights($(initialWeights))
  } else {
    trainer.setSeed($(seed))  // randomly initialize the weights
  }
  if ($(solver) == MultilayerPerceptronClassifier.LBFGS) {
    // This test optimizes with LBFGS; LBFGSOptimizer wires in the gradient
    // computer and the weight updater (see below):
    //   private var _gradient: Gradient = new ANNGradient(topology, dataStacker)
    //   private var _updater: Updater = new ANNUpdater()
    //   def LBFGSOptimizer: LBFGS = {
    //     val lbfgs = new LBFGS(_gradient, _updater)
    //     optimizer = lbfgs
    //     lbfgs
    //   }
    trainer.LBFGSOptimizer
      .setConvergenceTol($(tol))
      .setNumIterations($(maxIter))
  } else if ($(solver) == MultilayerPerceptronClassifier.GD) {
    trainer.SGDOptimizer
      .setNumIterations($(maxIter))
      .setConvergenceTol($(tol))
      .setStepSize($(stepSize))
  } else {
    throw new IllegalArgumentException(
      s"The solver $solver is not supported by MultilayerPerceptronClassifier.")
  }
  trainer.setStackSize($(blockSize))
  val mlpModel = trainer.train(data)
  new MultilayerPerceptronClassificationModel(uid, myLayers, mlpModel.weights)
}
}

/**
 * MLlib-style trainer class that trains a network given the data and topology
 *
 * @param topology topology of ANN
 * @param inputSize input size
 * @param outputSize output size
 */
private[ml] class FeedForwardTrainer(
    topology: Topology,
    val inputSize: Int,
    val outputSize: Int) extends Serializable {

  ......

  /**
   * Trains the ANN
   *
   * @param data RDD of input and output vector pairs
   * @return model
   */
  def train(data: RDD[(Vector, Vector)]): TopologyModel = {
    val w = if (getWeights == null) {
      // TODO: will make a copy if vector is a subvector of BDV (see Vectors code)
      topology.model(_seed).weights
    } else {
      getWeights
    }
    // TODO: deprecate standard optimizer because it needs Vector
    val trainData = dataStacker.stack(data).map { v =>
      (v._1, OldVectors.fromML(v._2))
    }
    val handlePersistence = trainData.getStorageLevel == StorageLevel.NONE
    if (handlePersistence) trainData.persist(StorageLevel.MEMORY_AND_DISK)
    // Run the LBFGS optimizer
    val newWeights = optimizer.optimize(trainData, w)
    if (handlePersistence) trainData.unpersist()
    topology.model(newWeights)
  }
}
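The solver branch above is driven by the estimator's public parameters. As a small hedged sketch (assuming a Spark version where setSolver, setStepSize and setTol are available; the parameter values are illustrative), a caller could select plain gradient descent instead of the default L-BFGS like this:

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier

val gdTrainer = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 4, 3))
  .setSolver("gd")       // "l-bfgs" is the default
  .setStepSize(0.03)     // only used by the gradient-descent solver
  .setMaxIter(200)
  .setTol(1e-6)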
Creating the Feed-Forward Topology
/**
 * Factory for some of the frequently-used topologies
 */
private[ml] object FeedForwardTopology {

  ......

  /**
   * Creates a multi-layer perceptron
   *
   * @param layerSizes sizes of layers including input and output size
   * @param softmaxOnTop whether to use SoftMax or Sigmoid function for an output layer.
   *                     Softmax is default
   * @return multilayer perceptron topology
   */
  def multiLayerPerceptron(
      layerSizes: Array[Int],
      softmaxOnTop: Boolean = true): FeedForwardTopology = {
    val layers = new Array[Layer]((layerSizes.length - 1) * 2)
    for (i <- 0 until layerSizes.length - 1) {
      layers(i * 2) = new AffineLayer(layerSizes(i), layerSizes(i + 1))
      layers(i * 2 + 1) =
        if (i == layerSizes.length - 2) {
          if (softmaxOnTop) {
            new SoftmaxLayerWithCrossEntropyLoss()
          } else {
            // TODO: squared error is more natural but converges slower
            new SigmoidLayerWithSquaredError()
          }
        } else {
          new FunctionalLayer(new SigmoidFunction())
        }
    }
    FeedForwardTopology(layers)
    // For the test's layer sizes (4, 5, 4, 3), the layers created here are, in order:
    //   0 = {AffineLayer@7887}
    //   1 = {FunctionalLayer@7888}
    //   2 = {AffineLayer@7889}
    //   3 = {FunctionalLayer@7890}
    //   4 = {AffineLayer@7891}
    //   5 = {SoftmaxLayerWithCrossEntropyLoss@7892}
    // AffineLayer applies an affine transformation to its input, i.e. Y = W * X + b.
    // FunctionalLayer applies a function element-wise, here SigmoidFunction,
    //   i.e. x => 1.0 / (1 + math.exp(-x)).
    // SoftmaxLayerWithCrossEntropyLoss produces the output and computes the loss.
  }
}
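To make the AffineLayer / FunctionalLayer pair concrete, here is a minimal standalone sketch of what one hidden layer computes, written against Breeze directly rather than Spark's internal layer classes; all matrix and vector values are made up for illustration.

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// AffineLayer: y = W * x + b   (2 outputs from 3 inputs)
val W = DenseMatrix((0.1, -0.2, 0.3),
                    (0.4,  0.5, -0.6))
val b = DenseVector(0.05, -0.05)
val x = DenseVector(1.0, 2.0, 3.0)
val affine = W * x + b

// FunctionalLayer with SigmoidFunction: applied element-wise
val hidden = sigmoid(affine)   // each entry is 1.0 / (1 + exp(-z))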
ANNGradient: Computing the Gradient
/**
 * Neural network gradient. Does nothing but calling Model's gradient
 *
 * @param topology topology
 * @param dataStacker data stacker
 */
private[ann] class ANNGradient(topology: Topology, dataStacker: DataStacker) extends Gradient {
  override def compute(
      data: OldVector,
      label: Double,
      weights: OldVector,
      cumGradient: OldVector): Double = {
    val (input, target, realBatchSize) = dataStacker.unstack(data)
    val model = topology.model(weights)
    // Delegate to FeedForwardModel.computeGradient
    model.computeGradient(input, target, cumGradient, realBatchSize)
  }
}

/**
 * Model of Feed Forward Neural Network.
 * Implements forward, gradient computation and can return weights in vector format.
 *
 * @param weights network weights
 * @param topology network topology
 */
private[ml] class FeedForwardModel private(
    val weights: Vector,
    val topology: FeedForwardTopology) extends TopologyModel {

  val layers = topology.layers
  val layerModels = new Array[LayerModel](layers.length)
  private var offset = 0
  for (i <- 0 until layers.length) {
    layerModels(i) = layers(i).createModel(
      new BDV[Double](weights.toArray, offset, 1, layers(i).weightSize))
    offset += layers(i).weightSize
  }
  private var outputs: Array[BDM[Double]] = null
  private var deltas: Array[BDM[Double]] = null

  // Run the layers of the topology over the input in order;
  // each layer's output becomes the next layer's input.
  override def forward(data: BDM[Double]): Array[BDM[Double]] = {
    // Initialize output arrays for all layers. Special treatment for InPlace
    val currentBatchSize = data.cols
    // TODO: allocate outputs as one big array and then create BDMs from it
    if (outputs == null || outputs(0).cols != currentBatchSize) {
      outputs = new Array[BDM[Double]](layers.length)
      var inputSize = data.rows
      for (i <- 0 until layers.length) {
        if (layers(i).inPlace) {
          outputs(i) = outputs(i - 1)
        } else {
          val outputSize = layers(i).getOutputSize(inputSize)
          outputs(i) = new BDM[Double](outputSize, currentBatchSize)
          inputSize = outputSize
        }
      }
    }
    layerModels(0).eval(data, outputs(0))
    for (i <- 1 until layerModels.length) {
      layerModels(i).eval(outputs(i - 1), outputs(i))
    }
    outputs
  }

  override def computeGradient(
      data: BDM[Double],
      target: BDM[Double],
      cumGradient: Vector,
      realBatchSize: Int): Double = {
    val outputs = forward(data)
    val currentBatchSize = data.cols
    // TODO: allocate deltas as one big array and then create BDMs from it
    if (deltas == null || deltas(0).cols != currentBatchSize) {
      deltas = new Array[BDM[Double]](layerModels.length)
      var inputSize = data.rows
      for (i <- 0 until layerModels.length - 1) {
        val outputSize = layers(i).getOutputSize(inputSize)
        deltas(i) = new BDM[Double](outputSize, currentBatchSize)
        inputSize = outputSize
      }
    }
    // Compute the overall loss
    val L = layerModels.length - 1
    // TODO: explain why delta of top layer is null (because it might contain loss+layer)
    val loss = layerModels.last match {
      case levelWithError: LossFunction =>
        levelWithError.loss(outputs.last, target, deltas(L - 1))
      case _ =>
        throw new UnsupportedOperationException("Top layer is required to have objective.")
    }
    for (i <- (L - 2) to (0, -1)) {
      layerModels(i + 1).computePrevDelta(deltas(i + 1), outputs(i + 1), deltas(i))
    }
    // Compute the gradient that will be used to update the weights
    val cumGradientArray = cumGradient.toArray
    var offset = 0
    for (i <- 0 until layerModels.length) {
      val input = if (i == 0) data else outputs(i - 1)
      layerModels(i).grad(deltas(i), input,
        new BDV[Double](cumGradientArray, offset, 1, layers(i).weightSize))
      offset += layers(i).weightSize
    }
    loss
  }

  override def predict(data: Vector): Vector = {
    val size = data.size
    val result = forward(new BDM[Double](size, 1, data.toArray))
    Vectors.dense(result.last.toArray)
  }
}
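The forward / delta / grad sequence above is ordinary backpropagation. As a rough standalone illustration, here is a toy single-hidden-layer network with sigmoid activations and squared error, written against Breeze rather than Spark's layer classes; all names and values are illustrative assumptions, not Spark code.

import breeze.linalg.{DenseMatrix, DenseVector}
import breeze.numerics.sigmoid

// Toy network: 2 inputs -> 3 hidden (sigmoid) -> 1 output (sigmoid), squared error.
val w1 = DenseMatrix.rand[Double](3, 2); val b1 = DenseVector.zeros[Double](3)
val w2 = DenseMatrix.rand[Double](1, 3); val b2 = DenseVector.zeros[Double](1)
val x  = DenseVector(0.5, -1.2)
val y  = DenseVector(1.0)

// Forward pass: each layer's output feeds the next, as in FeedForwardModel.forward.
val h   = sigmoid(w1 * x + b1)
val out = sigmoid(w2 * h + b2)

// Backward pass: deltas flow from the loss back through the layers,
// as in loss(...) followed by computePrevDelta(...).
val deltaOut = (out - y) *:* out *:* out.map(o => 1.0 - o)   // output-layer delta
val deltaHid = (w2.t * deltaOut) *:* h *:* h.map(v => 1.0 - v)

// Gradients, analogous to LayerModel.grad: outer product of delta and layer input.
val gradW2 = deltaOut * h.t
val gradW1 = deltaHid * x.t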
ANNUpdater: The Weight Updater
/**
 * Simple updater
 */
private[ann] class ANNUpdater extends Updater {
  override def compute(
      weightsOld: OldVector,
      gradient: OldVector,
      stepSize: Double,
      iter: Int,
      regParam: Double): (OldVector, Double) = {
    val thisIterStepSize = stepSize
    val brzWeights: BV[Double] = weightsOld.asBreeze.toDenseVector
    Baxpy(-thisIterStepSize, gradient.asBreeze, brzWeights)
    (OldVectors.fromBreeze(brzWeights), 0)
  }
}
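The Baxpy call above is Breeze's axpy, i.e. an in-place update weights := weights - stepSize * gradient, with no regularization term. A minimal sketch of the same step outside Spark (values are illustrative):

import breeze.linalg.{axpy, DenseVector}

val weights  = DenseVector(0.5, -0.3, 0.8)
val gradient = DenseVector(0.1,  0.2, -0.1)
val stepSize = 0.05

// In-place: weights += (-stepSize) * gradient, i.e. a plain gradient-descent step.
axpy(-stepSize, gradient, weights)
// weights is now DenseVector(0.495, -0.31, 0.805)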
This work is licensed under a Creative Commons Attribution 4.0 International License.