RDD execute flow

Original article. When reposting, please credit the source: 慢慢的回味


This article explains how an RDD is executed, with comments on the relevant code.
The Spark source used here is at git commit (SHA-1) 8b325b17ecdf013b7a6edcb7ee3773546bd914df.

1 Using a unit test as an example
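  class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
 
    test("Using TorrentBroadcast locally") {
      sc = new SparkContext("local", "test")
      val list = List[Int](1, 2, 3, 4)
      // ship the list to the executors once as a broadcast variable
      val broadcast = sc.broadcast(list)
      // each element is paired with the sum of the broadcast list (1 + 2 + 3 + 4 = 10)
      val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
      assert(results.collect().toSet === Set((1, 10), (2, 10)))
    }
    // ... (other tests in the suite omitted)
  }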
2 Broadcast the data
  val broadcast = sc.broadcast(list)
 
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
      "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
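The require(...) guard above relies on the ClassTag of T to reject RDD types. The following is a minimal sketch of that check in plain Scala, without Spark. FakeRDD, FakeMapRDD, and isBroadcastable are hypothetical names used only for illustration; they are not part of Spark.

```scala
import scala.reflect.{ClassTag, classTag}

object ClassTagGuardSketch {
  // Hypothetical stand-ins for RDD and one of its subclasses; not Spark classes.
  class FakeRDD
  class FakeMapRDD extends FakeRDD

  // Same shape as the guard in broadcast(): look at the runtime class carried
  // by the ClassTag of T and reject it if it is FakeRDD or a subclass of it.
  def isBroadcastable[T: ClassTag](value: T): Boolean =
    !classOf[FakeRDD].isAssignableFrom(classTag[T].runtimeClass)

  def main(args: Array[String]): Unit = {
    println(isBroadcastable(List(1, 2, 3, 4)))     // true: List is not a FakeRDD
    println(isBroadcastable(new FakeMapRDD))       // false: subclass of FakeRDD
    // The check sees the static type T, not the dynamic class of the value.
    println(isBroadcastable[Any](new FakeMapRDD))  // true: T is Any here
  }
}
```

The last call suggests that a guard of this shape checks the static type T, not the dynamic class of the value, so it catches the common mistake rather than every possible one.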
BroadcastManager.scala
  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }
TorrentBroadcastFactory.scala
  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }
TorrentBroadcast.scala
  // lazy fits well here: the value is read back from the block store only on first access
  @transient private lazy val _value: T = readBroadcastBlock()
  // runs eagerly in the constructor: writes the data to the block store as blocks
  private val numBlocks: Int = writeBlocks(obj)
 
  override protected def getValue() = {
    _value
  }
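The write-eagerly, read-lazily pattern in TorrentBroadcast can be illustrated with a small sketch in plain Scala. It assumes an in-memory map as a stand-in for the block store. ToyBroadcast, blockStore, and reads are hypothetical names, and the real readBroadcastBlock and writeBlocks do considerably more (splitting the data into blocks and fetching them from other nodes).

```scala
import scala.collection.mutable

object LazyBroadcastSketch {
  // Hypothetical in-memory stand-in for the block store.
  val blockStore = mutable.Map.empty[Long, Any]
  // Counts how many times the value was read back from the store.
  var reads = 0

  class ToyBroadcast[T](obj: T, id: Long) {
    // Eager: evaluated in the constructor, where the broadcast is created.
    val numBlocks: Int = { blockStore(id) = obj; 1 }

    // Lazy: evaluated on first access only; the result is then cached.
    private lazy val _value: T = {
      reads += 1
      blockStore(id).asInstanceOf[T]
    }

    def value: T = _value
  }

  def main(args: Array[String]): Unit = {
    val bc = new ToyBroadcast(List(1, 2, 3, 4), 0L)
    println(reads)         // 0: written to the store, nothing read yet
    println(bc.value.sum)  // 10: first access triggers the read
    println(bc.value.sum)  // 10: served from the cached lazy val
    println(reads)         // 1: the store was read only once
  }
}
```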
3 Create RDD

This line creates two RDDs: parallelize wraps the data in a ParallelCollectionRDD, and map wraps that in a MapPartitionsRDD,
which holds a reference to the function. Neither RDD is executed immediately.

  val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
 
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }
 
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
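To make the deferred execution concrete, here is a toy model of the two RDDs in plain Scala. It is only a sketch: ToyRDD, ToyParallelCollection, and ToyMapRDD are hypothetical classes that model "partitions plus a compute function" and leave out scheduling, closures cleaning, and everything else Spark does.

```scala
object LazyLineageSketch {
  // Hypothetical toy RDD: a number of partitions and a way to compute each one.
  abstract class ToyRDD[T] {
    def numPartitions: Int
    def compute(pid: Int): Iterator[T]

    // A transformation only builds a new node that points at its parent.
    def map[U](f: T => U): ToyRDD[U] = new ToyMapRDD[U, T](this, f)

    // An action walks the partitions and finally pulls data through the chain.
    def collect(): List[T] =
      (0 until numPartitions).flatMap(pid => compute(pid)).toList
  }

  class ToyParallelCollection[T](data: Seq[T], slices: Int) extends ToyRDD[T] {
    def numPartitions: Int = slices
    def compute(pid: Int): Iterator[T] = {
      // Slice the local collection evenly across the partitions.
      val start = pid * data.length / slices
      val end = (pid + 1) * data.length / slices
      data.slice(start, end).iterator
    }
  }

  class ToyMapRDD[U, T](parent: ToyRDD[T], f: T => U) extends ToyRDD[U] {
    def numPartitions: Int = parent.numPartitions
    def compute(pid: Int): Iterator[U] = parent.compute(pid).map(f)
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    val results = new ToyParallelCollection(1 to 2, 2).map { x => calls += 1; (x, 10) }
    println(calls)              // 0: the function has not run yet
    println(results.collect())  // List((1,10), (2,10))
    println(calls)              // 2: it ran once per element during collect()
  }
}
```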
4 Run RDD to get the result
  results.collect()
 
  // This is where the RDD is actually executed
  def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
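The shape of collect() is "one array per partition, then concatenate". The sketch below reproduces that shape in plain Scala under the assumption that partitions are just local sequences. runJobLocally is a hypothetical stand-in for sc.runJob and runs everything in the calling thread, whereas the real runJob submits the work to the scheduler.

```scala
import scala.reflect.ClassTag

object CollectSketch {
  // Hypothetical stand-in for sc.runJob: apply func to each partition's
  // iterator and return one result per partition, in partition order.
  def runJobLocally[T, U: ClassTag](
      partitions: Seq[Seq[T]],
      func: Iterator[T] => U): Array[U] =
    partitions.map(p => func(p.iterator)).toArray

  // Same two steps as collect(): per-partition arrays, then one flat array.
  def collect[T: ClassTag](partitions: Seq[Seq[T]]): Array[T] = {
    val results = runJobLocally(partitions, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  def main(args: Array[String]): Unit = {
    // Two partitions, matching the shape of the test's expected result.
    val parts = Seq(Seq((1, 10)), Seq((2, 10)))
    println(collect(parts).toList)  // List((1,10), (2,10))
  }
}
```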

This work is licensed under a Creative Commons Attribution 4.0 International License.