KMeans (K-Means) Clustering
Introduction
KMeans (K-means) clustering is one of the simpler-to-implement yet more efficient clustering algorithms.
The algorithm takes a parameter K, the number of clusters, and partitions the input data into K clusters such that objects within the same cluster are highly similar to each other, while objects in different clusters are not.
Its general steps are as follows (a plain-Scala sketch of these steps appears right after the list):
(1) Choose K initial centers, e.g. C[0] = data[0], …, C[k-1] = data[k-1]. Random sampling is the usual approach.
(2) Compare each of data[0] … data[n] against C[0] … C[k-1]; if the difference to C[i] is smallest, label the point i. The difference can be computed in several ways, for example as a vector distance.
(3) For every label i, recompute C[i] = { sum of all data[j] labeled i } / number of points labeled i.
(4) Repeat (2) and (3) until every C[i] changes by less than a given threshold, or the maximum number of iterations is reached.
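Here is a minimal, self-contained sketch of these four steps in plain Scala. This is not the Spark implementation: the squared Euclidean distance, the array-based data layout and the empty-cluster handling are my own assumptions for illustration.

import scala.util.Random

object LloydSketch {
  type Point = Array[Double]

  def distSq(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def kmeans(data: Array[Point], k: Int, maxIter: Int, epsilon: Double): Array[Point] = {
    // (1) Choose k initial centers by random sampling
    var centers = Random.shuffle(data.toSeq).take(k).map(_.clone).toArray
    var converged = false
    var iter = 0
    while (iter < maxIter && !converged) {
      // (2) Label each point with the index of its closest center
      val labels = data.map(p => centers.indices.minBy(i => distSq(p, centers(i))))
      // (3) New center i = mean of all points labeled i (an empty cluster keeps its old center)
      val newCenters = centers.indices.map { i =>
        val members = data.indices.filter(labels(_) == i).map(j => data(j))
        if (members.isEmpty) centers(i)
        else members.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }).map(_ / members.size)
      }.toArray
      // (4) Stop once every center moves by less than the threshold
      converged = centers.indices.forall(i => distSq(centers(i), newCenters(i)) <= epsilon * epsilon)
      centers = newCenters
      iter += 1
    }
    centers
  }
}

A call such as LloydSketch.kmeans(points, k = 5, maxIter = 20, epsilon = 1e-4) returns the final centers; Spark's version below runs the same loop, only distributed.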
We have previously looked at the Mahout implementations, the kmeans clustering algorithm (cluster-reuters) and the Fuzzy kmeans clustering algorithm (cluster-reuters); this article walks through the implementation in Spark ML.
Code Analysis
Test Code
class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {

  final val k = 5
  @transient var dataset: Dataset[_] = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Generate 50 rows of data, each with 3 dimensions (features), in 5 clusters
    dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
  }

  ......

  test("fit, transform and summary") {
    val predictionColName = "kmeans_prediction"
    val kmeans = new KMeans().setK(k).setPredictionCol(predictionColName).setSeed(1)
    // Train a model on the data
    val model = kmeans.fit(dataset)
    assert(model.clusterCenters.length === k)

    val transformed = model.transform(dataset)
    val expectedColumns = Array("features", predictionColName)
    expectedColumns.foreach { column =>
      assert(transformed.columns.contains(column))
    }
    val clusters =
      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
    assert(clusters.size === k)
    assert(clusters === Set(0, 1, 2, 3, 4))
    assert(model.computeCost(dataset) < 0.1)
    assert(model.hasParent)

    // Check validity of model summary
    val numRows = dataset.count()
    assert(model.hasSummary)
    val summary: KMeansSummary = model.summary
    assert(summary.predictionCol === predictionColName)
    assert(summary.featuresCol === "features")
    assert(summary.predictions.count() === numRows)
    for (c <- Array(predictionColName, "features")) {
      assert(summary.predictions.columns.contains(c))
    }
    assert(summary.cluster.columns === Array(predictionColName))
    val clusterSizes = summary.clusterSizes
    assert(clusterSizes.length === k)
    assert(clusterSizes.sum === numRows)
    assert(clusterSizes.forall(_ >= 0))

    model.setSummary(None)
    assert(!model.hasSummary)
  }

  ......
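Outside the test harness, using the estimator looks much the same. Below is a minimal usage sketch; the toy vectors, the local master and the app name are made up for illustration, while KMeans itself, setK/setSeed/setMaxIter, fit and transform are the regular org.apache.spark.ml.clustering API.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object KMeansUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("kmeans-sketch").getOrCreate()
    import spark.implicits._

    // Toy data: two well-separated clusters in 3 dimensions
    val dataset = Seq(
      Vectors.dense(0.0, 0.0, 0.1),
      Vectors.dense(0.1, 0.0, 0.0),
      Vectors.dense(9.0, 9.1, 9.0),
      Vectors.dense(9.1, 9.0, 9.0)
    ).map(Tuple1.apply).toDF("features") // "features" is the default features column

    val model = new KMeans().setK(2).setSeed(1L).setMaxIter(20).fit(dataset)

    model.clusterCenters.foreach(println) // the learned centers
    model.transform(dataset).show()       // adds the default "prediction" column

    spark.stop()
  }
}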
KMeans Algorithm
The test code ultimately calls the runAlgorithm method in org/apache/spark/mllib/clustering/KMeans.scala:
/**
 * K-means clustering with a k-means++ like initialization mode
 * (the k-means|| algorithm by Bahmani et al).
 *
 * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
 * to it should be cached by the user.
 */
@Since("0.8.0")
class KMeans private (
    private var k: Int,
    private var maxIterations: Int,
    private var initializationMode: String,
    private var initializationSteps: Int,
    private var epsilon: Double,
    private var seed: Long) extends Serializable with Logging {

  ......

  /**
   * Implementation of K-Means algorithm.
   */
  private def runAlgorithm(
      data: RDD[VectorWithNorm],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {

    val sc = data.sparkContext

    val initStartTime = System.nanoTime()

    // Choose the initial centers
    val centers = initialModel match {
      case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
      case None =>
        if (initializationMode == KMeans.RANDOM) {
          // Randomly pick points from the data
          initRandom(data)
        } else {
          // Pick randomly but favor low-cost points (k-means||),
          // i.e. do a preliminary assignment of the data first
          initKMeansParallel(data)
        }
    }
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")

    var converged = false
    var cost = 0.0
    var iteration = 0

    val iterationStartTime = System.nanoTime()

    instr.foreach(_.logNumFeatures(centers.head.vector.size))

    // Execute iterations of Lloyd's algorithm until converged
    while (iteration < maxIterations && !converged) {
      val costAccum = sc.doubleAccumulator
      val bcCenters = sc.broadcast(centers)

      // Find the sum and count of points mapping to each center
      val totalContribs = data.mapPartitions { points =>
        val thisCenters = bcCenters.value
        val dims = thisCenters.head.vector.size

        // Accumulate, per cluster, the sum over every dimension and the sample count;
        // the new centers can then be computed from these
        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
        val counts = Array.fill(thisCenters.length)(0L)

        points.foreach { point =>
          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
          costAccum.add(cost)
          val sum = sums(bestCenter)
          axpy(1.0, point.vector, sum)
          counts(bestCenter) += 1
        }

        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
      }.collectAsMap()

      bcCenters.destroy(blocking = false)

      // Update the cluster centers and costs
      converged = true
      totalContribs.foreach { case (j, (sum, count)) =>
        scal(1.0 / count, sum) // Compute the new center
        val newCenter = new VectorWithNorm(sum)
        // If any center moved by more than the given threshold, keep iterating
        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
          converged = false
        }
        centers(j) = newCenter
      }

      cost = costAccum.value
      iteration += 1
    }

    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")

    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }

    logInfo(s"The cost is $cost.")

    new KMeansModel(centers.map(_.vector))
  }
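Inside the loop, each point passes through KMeans.findClosest, and the convergence test uses KMeans.fastSquaredDistance; both exploit the L2 norm cached in VectorWithNorm. The sketch below shows the idea only and is not the Spark code: VecWithNorm and the plain dot helper are local stand-ins for Spark's private VectorWithNorm and BLAS.dot, and the real fastSquaredDistance additionally guards against floating-point precision loss.

import org.apache.spark.mllib.linalg.Vector

object FindClosestSketch {
  // Local stand-in for Spark's private VectorWithNorm: a vector plus its cached L2 norm
  case class VecWithNorm(vector: Vector, norm: Double)

  // Plain dot product (Spark uses its internal BLAS.dot instead)
  private def dot(a: Vector, b: Vector): Double =
    a.toArray.zip(b.toArray).map { case (x, y) => x * y }.sum

  // ||a - b||^2 = ||a||^2 + ||b||^2 - 2 (a . b), so the cached norms make each distance cheap,
  // and (||a|| - ||b||)^2 is a lower bound that lets hopeless centers be skipped entirely
  def findClosest(centers: Seq[VecWithNorm], point: VecWithNorm): (Int, Double) = {
    var bestIndex = 0
    var bestDistance = Double.PositiveInfinity
    centers.zipWithIndex.foreach { case (center, i) =>
      var lowerBound = center.norm - point.norm
      lowerBound *= lowerBound
      if (lowerBound < bestDistance) { // only pay for the dot product when this center can win
        val distSq = center.norm * center.norm + point.norm * point.norm -
          2.0 * dot(center.vector, point.vector)
        if (distSq < bestDistance) {
          bestDistance = distSq
          bestIndex = i
        }
      }
    }
    (bestIndex, bestDistance)
  }
}

This is also why runAlgorithm compares fastSquaredDistance(newCenter, centers(j)) against epsilon * epsilon rather than taking a square root: staying in squared distances avoids a sqrt on every convergence check.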