KMeans (K-Means) Clustering


Introduction

KMeans is one of the simpler clustering algorithms to implement, and also one of the more efficient.
It takes a parameter K, the number of clusters, and partitions the input data into K clusters such that objects in the same cluster are highly similar while objects in different clusters are not.
The general steps are as follows (a minimal code sketch follows the list):
(1) Choose K initial centers, e.g. C[0] = data[0], ..., C[k-1] = data[k-1]. Random sampling is the usual approach.
(2) Compare each of data[0] ... data[n] against C[0] ... C[k-1]; whichever center C[i] a point differs from least, label that point i. The difference can be measured in several ways, e.g. as a vector distance.
(3) For every point labeled i, recompute C[i] = (sum of all data[j] labeled i) / (number of points labeled i).
(4) Repeat (2) and (3) until every C[i] moves by less than a given threshold, or the maximum number of iterations is reached.
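To make these steps concrete, here is a minimal single-machine sketch of the loop in plain Scala. It is illustrative only: the squared Euclidean distance, the take-the-first-k initialization, and the exact stopping test are choices of this sketch, not the Spark code analyzed below.

object LloydSketch {
  type Point = Array[Double]

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def kmeans(data: Array[Point], k: Int, maxIter: Int, epsilon: Double): Array[Point] = {
    // Step (1): take the first k points as the initial centers
    // (random sampling in practice).
    var centers = data.take(k).map(_.clone)
    var converged = false
    var iter = 0
    while (iter < maxIter && !converged) {
      // Step (2): label each point with the index of its closest center.
      val labels = data.map(p => centers.indices.minBy(i => sqDist(p, centers(i))))
      // Step (3): recompute each center as the mean of the points labeled with it.
      val newCenters = centers.indices.toArray.map { i =>
        val members = data.zip(labels).collect { case (p, l) if l == i => p }
        if (members.isEmpty) centers(i)
        else members.transpose.map(_.sum / members.length)
      }
      // Step (4): stop once no center has moved farther than epsilon.
      converged = centers.zip(newCenters).forall { case (c, n) => sqDist(c, n) <= epsilon * epsilon }
      centers = newCenters
      iter += 1
    }
    centers
  }
}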
Earlier posts covered the Mahout implementations, kmeans集群算法(cluster-reuters) and Fuzzykmeans集群算法(cluster-reuters); this post walks through the Spark ML implementation.

Code Analysis

Test Code
class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
  final val k = 5
  @transient var dataset: Dataset[_] = _
 
  override def beforeAll(): Unit = {
    super.beforeAll()
//Generate 50 rows of data, each with 3 dimensions (features), falling into 5 classes
    dataset = KMeansSuite.generateKMeansData(spark, 50, 3, k)
  }
......
  test("fit, transform and summary") {
    val predictionColName = "kmeans_prediction"
    val kmeans = new KMeans().setK(k).setPredictionCol(predictionColName).setSeed(1)
//Fit the model to the training data
    val model = kmeans.fit(dataset)
    assert(model.clusterCenters.length === k)
 
    val transformed = model.transform(dataset)
    val expectedColumns = Array("features", predictionColName)
    expectedColumns.foreach { column =>
      assert(transformed.columns.contains(column))
    }
    val clusters =
      transformed.select(predictionColName).rdd.map(_.getInt(0)).distinct().collect().toSet
    assert(clusters.size === k)
    assert(clusters === Set(0, 1, 2, 3, 4))
    assert(model.computeCost(dataset) < 0.1)
    assert(model.hasParent)
 
    // Check validity of model summary
    val numRows = dataset.count()
    assert(model.hasSummary)
    val summary: KMeansSummary = model.summary
    assert(summary.predictionCol === predictionColName)
    assert(summary.featuresCol === "features")
    assert(summary.predictions.count() === numRows)
    for (c <- Array(predictionColName, "features")) {
      assert(summary.predictions.columns.contains(c))
    }
    assert(summary.cluster.columns === Array(predictionColName))
    val clusterSizes = summary.clusterSizes
    assert(clusterSizes.length === k)
    assert(clusterSizes.sum === numRows)
    assert(clusterSizes.forall(_ >= 0))
 
    model.setSummary(None)
    assert(!model.hasSummary)
  }
......
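For context, generateKMeansData builds rows whose feature values all equal i % k, so the 50 rows collapse onto exactly k = 5 distinct points; that is why the test can assert computeCost(dataset) < 0.1 and expect exactly the labels 0 through 4. A sketch of such a generator follows (FeatureData is a stand-in name here; the actual row case class lives in the Spark test sources):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{DataFrame, SparkSession}

case class FeatureData(features: Vector)

object KMeansDataSketch {
  // Row i gets a dim-dimensional vector whose entries are all (i % k), so the
  // rows fall onto exactly k distinct points and a perfect clustering has
  // near-zero cost.
  def generateKMeansData(spark: SparkSession, rows: Int, dim: Int, k: Int): DataFrame = {
    import spark.implicits._
    (1 to rows).map(i => FeatureData(Vectors.dense(Array.fill(dim)((i % k).toDouble)))).toDF()
  }
}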
The KMeans Algorithm

The test code ultimately calls the runAlgorithm method in org/apache/spark/mllib/clustering/KMeans.scala:

/**
 * K-means clustering with a k-means++ like initialization mode
 * (the k-means|| algorithm by Bahmani et al).
 *
 * This is an iterative algorithm that will make multiple passes over the data, so any RDDs given
 * to it should be cached by the user.
 */
@Since("0.8.0")
class KMeans private (
    private var k: Int,
    private var maxIterations: Int,
    private var initializationMode: String,
    private var initializationSteps: Int,
    private var epsilon: Double,
    private var seed: Long) extends Serializable with Logging {
......
  /**
   * Implementation of K-Means algorithm.
   */
  private def runAlgorithm(
      data: RDD[VectorWithNorm],
      instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
 
    val sc = data.sparkContext
 
    val initStartTime = System.nanoTime()
//Select the initial cluster centers
    val centers = initialModel match {
      case Some(kMeansCenters) =>
        kMeansCenters.clusterCenters.map(new VectorWithNorm(_))
      case None =>
        if (initializationMode == KMeans.RANDOM) {
//Randomly sample points from the data
          initRandom(data)
        } else {
//k-means||: sample points with probability proportional to their distance from the centers chosen so far, so the initial centers are spread out and yield a low overall cost
          initKMeansParallel(data)
        }
    }
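//For reference, initKMeansParallel implements k-means|| (Bahmani et al.): over initializationSteps rounds it samples, in expectation, about 2*k points per round, each with probability proportional to its squared distance from the candidates picked so far, then weights the candidates and runs a local k-means on them to obtain the k initial centers.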
    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    logInfo(f"Initialization with $initializationMode took $initTimeInSeconds%.3f seconds.")
 
    var converged = false
    var cost = 0.0
    var iteration = 0
 
    val iterationStartTime = System.nanoTime()
 
    instr.foreach(_.logNumFeatures(centers.head.vector.size))
 
    // Execute iterations of Lloyd's algorithm until converged
    while (iteration < maxIterations && !converged) {
      val costAccum = sc.doubleAccumulator
      val bcCenters = sc.broadcast(centers)
 
      // Find the sum and count of points mapping to each center
      val totalContribs = data.mapPartitions { points =>
        val thisCenters = bcCenters.value
        val dims = thisCenters.head.vector.size
//Per cluster, accumulate the element-wise sum of its points and the point count; the new centers are computed from these
        val sums = Array.fill(thisCenters.length)(Vectors.zeros(dims))
        val counts = Array.fill(thisCenters.length)(0L)
 
        points.foreach { point =>
          val (bestCenter, cost) = KMeans.findClosest(thisCenters, point)
          costAccum.add(cost)
          val sum = sums(bestCenter)
          axpy(1.0, point.vector, sum)
          counts(bestCenter) += 1
        }
 
        counts.indices.filter(counts(_) > 0).map(j => (j, (sums(j), counts(j)))).iterator
      }.reduceByKey { case ((sum1, count1), (sum2, count2)) =>
        axpy(1.0, sum2, sum1)
        (sum1, count1 + count2)
      }.collectAsMap()
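//totalContribs now maps each center index j to the element-wise sum of the points assigned to j together with their count, merged across all partitions; the new center is then simply sum / count (computed below)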
 
      bcCenters.destroy(blocking = false)
 
      // Update the cluster centers and costs
      converged = true
      totalContribs.foreach { case (j, (sum, count)) =>
        scal(1.0 / count, sum) //Compute the new center as sum / count
        val newCenter = new VectorWithNorm(sum)
//If any center moved farther than the given threshold, keep iterating
        if (converged && KMeans.fastSquaredDistance(newCenter, centers(j)) > epsilon * epsilon) {
          converged = false
        }
        centers(j) = newCenter
      }
 
      cost = costAccum.value
      iteration += 1
    }
 
    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) / 1e9
    logInfo(f"Iterations took $iterationTimeInSeconds%.3f seconds.")
 
    if (iteration == maxIterations) {
      logInfo(s"KMeans reached the max number of iterations: $maxIterations.")
    } else {
      logInfo(s"KMeans converged in $iteration iterations.")
    }
 
    logInfo(s"The cost is $cost.")
 
    new KMeansModel(centers.map(_.vector))
  }
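The loop above leans on KMeans.findClosest and KMeans.fastSquaredDistance. Setting aside the norm-based lower bound that the real implementation uses to skip full distance computations, what findClosest returns is simply the index of the nearest center under squared Euclidean distance, together with that distance. A simplified sketch:

// Simplified sketch of KMeans.findClosest. The real version first compares a
// lower bound derived from the precomputed vector norms against the best
// distance so far, and only computes the full distance when the center could
// actually be closer.
def findClosestSketch(centers: Seq[Array[Double]], point: Array[Double]): (Int, Double) = {
  var bestIndex = 0
  var bestDistance = Double.PositiveInfinity
  for ((center, i) <- centers.zipWithIndex) {
    val d = center.zip(point).map { case (c, p) => (c - p) * (c - p) }.sum
    if (d < bestDistance) {
      bestDistance = d
      bestIndex = i
    }
  }
  (bestIndex, bestDistance)
}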
