What are broadcast variables and what problems do they solve?
A broadcast variable is any variable, other than the loop variable or a sliced variable, that does not change inside the loop. At the start of a parfor-loop, the values of any broadcast variables are sent to all workers. This type of variable can be useful or even essential for particular tasks.
Spark supports two types of shared variables: broadcast variables, which can be used to cache a value in memory on all nodes, and accumulators, which are variables that are only “added” to, such as counters and sums.
If you have a huge array that is accessed from Spark closures, for example some reference data, this array will be shipped to each Spark node with the closure. For example, if you have a 10-node cluster with 100 partitions (10 partitions per node), this array will be distributed at least 100 times (10 times to each node).
If you use broadcast, it will be distributed once per node using an efficient p2p protocol.
val array: Array[Int] = ??? // some huge array
val broadcasted = sc.broadcast(array)
And some RDD
val rdd: RDD[Int] = ???
In this case the array will be shipped with the closure each time:
rdd.map(i => array.contains(i))
and with broadcast you’ll get a huge performance benefit:
rdd.map(i => broadcasted.value.contains(i))
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
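Putting the pieces together, here is a minimal runnable sketch, assuming a spark-shell session where sc is available; the array contents and sizes are made up for illustration:
// Hypothetical reference data: a large lookup array.
val array: Array[Int] = (0 until 1000000).toArray

// 100 partitions means 100 tasks; without broadcast, every task's
// serialized closure carries its own copy of `array`.
val rdd = sc.parallelize(1 to 100, numSlices = 100)
rdd.map(i => array.contains(i)).count()

// With broadcast, the array is shipped once per executor and each
// task reads the shared copy via .value.
val broadcasted = sc.broadcast(array)
rdd.map(i => broadcasted.value.contains(i)).count()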
Considering the above, what are the use cases of broadcast variables? What problems do broadcast variables solve?
When we create a broadcast variable like the one below, is the variable reference (here, broadcastVar)
available on all the nodes in the cluster?
val broadcastVar = sc.broadcast(Array(1, 2, 3))
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost. For example:
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
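Collecting the result would confirm that every task saw the full six-element list:
observedSizes.collect() // Array(6, 6, 6, 6, 6, 6)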
Broadcast variables are also handy for map-side joins: collect small lookup tables to the driver, broadcast them, and join against them inside a map instead of shuffling:
val acMap = sc.broadcast(myRDD.map { case (a, b, c, d) => (a, c) }.collectAsMap)
val otherMap = sc.broadcast(myOtherRDD.collectAsMap)
myBigRDD.map { case (a, b, c, d) =>
  (acMap.value.get(a).get, otherMap.value.get(c).get)
}.collect
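For comparison, a self-contained sketch of the same pattern with made-up data (the table names and fields are hypothetical):
// Small lookup tables, collected to the driver and broadcast once.
val products = sc.broadcast(Map(1 -> "book", 2 -> "pen"))
val prices = sc.broadcast(Map(1 -> 10.0, 2 -> 2.5))

// The big dataset stays distributed; each task joins locally against
// the broadcast maps, so no shuffle is needed.
val orders = sc.parallelize(Seq((1, 3), (2, 4))) // (productId, quantity)
orders.map { case (id, qty) =>
  (products.value(id), prices.value(id) * qty)
}.collect() // Array((book,30.0), (pen,10.0))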
Once destroyed, a broadcast variable cannot be used again; even calling destroy a second time fails:
scala> b.destroy
org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27)
at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107)
at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98)
… 48 elided
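For context, a minimal sketch of the lifecycle that leads to this error, again assuming a live sc:
val b = sc.broadcast(Array(1, 2, 3))
b.value       // Array(1, 2, 3): usable while the broadcast is valid
b.unpersist() // drops cached copies on the executors; b can still be used and will be re-broadcast
b.destroy()   // removes all state for the broadcast, on executors and the driver
// Any further use of b, including a second destroy, now throws the SparkException shown above.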
Uses: Sometimes a variable needs to be shared across tasks, or between tasks and the driver program. As noted above, Spark supports two types of shared variables for this: broadcast variables, which cache a value in memory on all nodes, and accumulators, which are only “added” to, such as counters and sums.
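A small sketch showing both kinds of shared variable together, assuming Spark 2.x for sc.longAccumulator; the data is made up:
// Broadcast: read-only lookup shared by all tasks.
val valid = sc.broadcast(Set(1, 2, 3))

// Accumulator: write-only counter aggregated back on the driver.
val rejected = sc.longAccumulator("rejected")

val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
val kept = nums.filter { x =>
  val ok = valid.value.contains(x)
  // Note: accumulator updates inside transformations may be re-applied if tasks are retried.
  if (!ok) rejected.add(1)
  ok
}.collect() // Array(1, 2, 3)

println(rejected.value) // 2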