PySpark Broadcast Variables and Accumulators

🢂 Broadcast variables are read-only shared variables that are cached on and made available to all nodes in the cluster so that tasks can access and use them.

🢂 Let me explain with an example when to use broadcast variables. Assume you receive a file containing two-letter state codes and you want to transform each code into the full state name (for example, CA to California, NY to New York, etc.) by looking it up in a reference mapping. In some instances, this mapping could be large, and you may have many such lookups (zip codes, for example).

🢂 Instead of shipping this information along with each task over the network (which adds overhead and is time-consuming), we can use a broadcast variable to cache the lookup data on each machine; tasks then use this cached data while executing their transformations.

Under the hood, PySpark follows this process:

  • PySpark breaks the job into stages, separated by distributed shuffle boundaries, and actions are executed within each stage.
  • Later, stages are broken down into tasks.
  • Spark broadcasts the common (reusable) data needed by tasks within each stage.
  • The broadcast data is cached in serialized form and deserialized before executing each task.

Let’s see how to create a broadcast variable:

The PySpark Broadcast is created using the broadcast(v) method of the SparkContext class. This method takes the argument v that you want to broadcast.

broadcastVar = sc.broadcast([0, 1, 2, 3])

Below is a very simple example of how to use broadcast variables on an RDD. This example defines commonly used data (state names) in a dictionary, distributes it using SparkContext.broadcast(), and then uses it in the RDD map() transformation.



import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('').getOrCreate()

states = {"NY": "New York", "CA": "California", "FL": "Florida"}
broadcastStates = spark.sparkContext.broadcast(states)

# sample rows: (first name, last name, country, state code)
data = [("James", "Smith", "USA", "CA"),
        ("Michael", "Rose", "USA", "NY"),
        ("Maria", "Jones", "USA", "FL")]

rdd = spark.sparkContext.parallelize(data)

def state_convert(code):
    return broadcastStates.value[code]

result = rdd.map(lambda x: (x[0], x[1], x[2], state_convert(x[3]))).collect()
print(result)


🢂 We can also use broadcast variables with filter() and join() transformations.

🢂 Accumulators are shared variables that can only be “added” to through an associative and commutative operation; they are used to implement counters (similar to MapReduce counters) or sums.

🢂 Accumulators are write-only, initialize-once variables: only tasks running on workers are allowed to update them, and the workers’ updates are propagated automatically to the driver program.

🢂 But only the driver program is allowed to read an accumulator, using its value property.

How to create the Accumulator variable in PySpark?

Using the accumulator() method of the SparkContext class, we can create an accumulator variable in PySpark. Users can also create accumulators for custom types using the AccumulatorParam class of PySpark.

  • sparkContext.accumulator() is used to define accumulator variables.
  • The add() function is used to add/update a value in the accumulator.
  • The value property on the accumulator variable is used to retrieve the value from the accumulator (driver only).

Below is a complete RDD example using different accumulators.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('').getOrCreate()

# accumulator updated from a lambda
accum = spark.sparkContext.accumulator(0)
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: accum.add(x))
print(accum.value)   # 15, read on the driver

# accumulator updated from a named function
accuSum = spark.sparkContext.accumulator(0)
def countFun(x):
    global accuSum
    accuSum.add(x)
rdd.foreach(countFun)
print(accuSum.value)   # 15

# accumulator used as a counter
accumCount = spark.sparkContext.accumulator(0)
rdd2 = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
rdd2.foreach(lambda x: accumCount.add(1))
print(accumCount.value)   # 5
