This article explains how an RDD is executed, with comments on the code.
The Spark git version used is SHA-1: 8b325b17ecdf013b7a6edcb7ee3773546bd914df
1 Using a unit test as an example
class BroadcastSuite extends SparkFunSuite with LocalSparkContext {
  test("Using TorrentBroadcast locally") {
    sc = new SparkContext("local", "test")
    val list = List[Int](1, 2, 3, 4)
    val broadcast = sc.broadcast(list)
    val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
    assert(results.collect().toSet === Set((1, 10), (2, 10)))
  }
}
The broadcast list sums to 10, so each of the two elements ends up paired with 10.
2 Broadcast the data
val broadcast = sc.broadcast(list)
SparkContext.scala
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
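Note the require guard: an RDD cannot be broadcast directly; it must be collected to the driver first. A minimal sketch of what that means for user code (the variable names here are illustrative):

// Sketch: broadcasting an RDD violates the require above.
val rdd = sc.parallelize(1 to 4)
// sc.broadcast(rdd)              // would throw IllegalArgumentException (require fails)
val data = rdd.collect()          // materialize the data on the driver first
val bc = sc.broadcast(data)       // broadcasting the collected Array is fine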
BroadcastManager.scala
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
nextBroadcastId is an AtomicLong, so every broadcast in the application gets a unique, increasing id.
TorrentBroadcastFactory.scala
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
  new TorrentBroadcast[T](value_, id)
}
Note that the torrent factory ignores isLocal; the data always goes through the block store.
TorrentBroadcast.scala
// lazy is important here: the value is read back from the block store
// only on first access, not when the broadcast object is deserialized on an executor.
@transient private lazy val _value: T = readBroadcastBlock()
// Eagerly split the data into blocks and write them to the block store
// when the broadcast is created on the driver.
private val numBlocks: Int = writeBlocks(obj)
override protected def getValue() = {
  _value
}
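The @transient lazy val combination is what lets the Broadcast object itself be shipped to executors without carrying the value: the field is skipped during serialization, and readBroadcastBlock() runs on first access in the receiving JVM. A minimal, Spark-independent sketch of the pattern (class and method names here are illustrative):

import java.io._

// Stand-in for a broadcast: the heavy value is @transient (not serialized)
// and lazy (materialized only on first access after deserialization).
class LazyHolder(seed: Int) extends Serializable {
  @transient private lazy val value: Seq[Int] = {
    println("materializing value")   // runs once per JVM, on first access
    (1 to seed).toSeq
  }
  def get: Seq[Int] = value
}

object LazyHolderDemo extends App {
  val out = new ByteArrayOutputStream()
  new ObjectOutputStream(out).writeObject(new LazyHolder(4))
  val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
  val copy = in.readObject().asInstanceOf[LazyHolder]
  println(copy.get.sum)              // prints "materializing value", then 10
}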
3 Create RDD
Two RDDs are created here: parallelize wraps the data in a ParallelCollectionRDD, and map wraps that in a MapPartitionsRDD that holds a reference to the function. Neither is executed immediately; transformations only build up the lineage.
val results = sc.parallelize(1 to 2).map(x => (x, broadcast.value.sum))
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
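A side effect inside map is a handy probe for this laziness; a minimal sketch (assuming a local SparkContext named sc, as in the test above):

// Nothing prints here: map only records the function in the lineage.
val lazyRdd = sc.parallelize(1 to 2).map { x =>
  println(s"mapping $x")             // probe for when the task actually runs
  x * 10
}
println("no task has run yet")
// collect() is an action: only now are tasks scheduled and "mapping ..." printed
// (in local mode the probe output appears in the driver's console).
println(lazyRdd.collect().toList)    // List(10, 20)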
4 Run the RDD to get the result
results.collect()
RDD.scala
// This is where the RDD is actually executed: collect() is an action,
// and sc.runJob submits the job to the scheduler.
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
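collect() is a thin wrapper: runJob returns one array per partition, which Array.concat then flattens. The same call can be issued directly; a sketch using the sc and results from the test above:

// One Array per partition, before concatenation.
val perPartition: Array[Array[(Int, Int)]] =
  sc.runJob(results, (iter: Iterator[(Int, Int)]) => iter.toArray)
perPartition.foreach(part => println(part.toList))
// results.collect() is equivalent to Array.concat(perPartition: _*)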
This work is licensed under a Creative Commons Attribution 4.0 International License.