Parallelism in Apache Spark

Parallelism in Apache Spark

To achieve top performance from Apache Spark cluster you need to plan resource requirements for the job. Processing data in parallel is one of the performance factors you need to consider during planning. To process data in parallel you need to partition data and assign sufficient number of executors / cores to the job. In Spark number of executors and cores are the main factors to consider when calculating the parallelism. Let's review this in details in this article

In Spark RDD (Resilient Distributed Dataset ) is the abstraction provided by Spark for distributed data. RDD divides a data file into chunks and distribute among nodes. So each nodes in the cluster has a chunk of data in its memory. This distributed management of data in cluster environment is one of the main abstractions that Spark automatically manage internally. This abstraction facilitates the developer to focus on data processing logic instead of worrying about how to manage data in distributed environment

Above diagram shows that data file that needs to be processed is on disk (or some other source), when a spark application is launched Spark driver takes this file and divide it into chunks and distribute amount all nodes. In above image 4 nodes are assigned to the cluster so data file partitions are divided into 4 nodes

Next considerations is partition size. Each node can have multiple partitions of the file and usually this is set by default. If data file is loaded from HDFS (which is Hadoop's distributed disk storage system) then default partition size is 64MB. Let's suppose data file size is 300MB, so this file will be divided into 5 partitions. That also mean one of the node has to process 2 partitions of data while other 3 nodes will process one partition

When Spark job is launched, Spark launches one or more executors on each node. This number is set at launch time by developer. These are the executors that launch tasks to process data in each partition and each partition is processed by one task. Now let's suppose the developer has written a Spark application that execute a single MAP transformation and then save the file on the disk. Here is the code

val textFile = sc.textFile("hdfs://...")
val words = textFile.flatMap(line => line.split(" "))
words.saveAsTextFile("hdfs://...")

As each partition is processed by a single tasks, that means we need total 5 MAP tasks and 5 Save tasks to save data on disk. Total 10 tasks will be executed on 4 node cluster to process 300MB file. Now when map transformation is executed, 4 tasks will execute in parallel and then one task will execute separately. Similarly save to disk action will also execute 4 tasks in parallel and then one task will be executed separately. Now let's suppose a single Map task is executed in 2 minutes, which means 4 tasks will execute in 2 minutes in parallel and then 2 more minutes are needed to execute the single 5th task. In total we need 4 minutes to execute the Map transformation. Now if 1 minutes is needed to save a single partition on disk that means we need 2 minutes in total to save 5 partitions on disk. This need job needs total 6 minutes for execution

What we can do to speed up the above scenario. We know that 5 tasks are needed to execute a map transformation and we have 4 nodes. If partition size is default 64MB that means we have 5 partitions of the data. 4 tasks will execute in parallel in 2 minutes and then we need 2 more minutes to execute the single task. While the 5th task is executing on a node, other 3 nodes will be sitting idol. This will underutilized the resources of the cluster. Now if we increase the partition size from default 64MB to 75MB, that means data is divided into 4 equal partitions. Let's suppose each partition is executed in 2.5 minutes now and as we need 4 tasks to execute 4 partitions, we speed up the data processing time by increasing the partition size. Same will happen with save to disk action in which case say 1.5 minute is needed to save all 4 partitions on the disk. By just increasing size of partition, the overall affect was that the job executed in 4 minutes instead of 6 minutes.

Another consideration to increase parallelism is number of cores assigned to each executor. In above example it was assumed that one core is assigned to each executor as shown in following diagram. Remember executors are JVM processes that are launched by driver program on each node to execute tasks (that's why the name is executor i guess)

Now let's suppose we have 600MB file. If we set the partition size to 75MB we will have 8 data partitions, which means each node has 2 partitions of the data. If we keep number of cores to one as in the previous section, we need 2 sets of 4 tasks for map transformation and 2 sets of 4 tasks for save to disk action. In this case the job will execute in 8 minutes because 2 sets of both transformation and action are executed in parallel. Now let's suppose we increase number of cores to 2 for each executor, that means each executor will run 2 tasks in parallel to process its 2 partitions. By increasing the number of cores for each executor 600MB file will be processed in 4 minutes

In Spark to increase the parallelism we need to consider following factors

  • Size of the data file
  • Number of executors to launch on each node
  • Number of cores assigned to each executor on each node
  • RAM size for each executor

Another factor to keep in mind is how much RAM is given to each executor. This RAM is divided among cores that execute tasks on partitions. So if 2MB is given to each executor with 2 cores that means each task running on each core can use 1MB RAM

Another point to note is parallelism is controlled by number of cores assigned to each executor. That means if a executor has to process 2 tasks and if 2 cores are assigned then both tasks will run in parallel within a executor. If one core is assigned that means tasks will run one after other. So number of cores and partitions are basis of parallelism in Apache Spark

Here are the parameters to set while launching the Spark application

spark-submit 
--class org.apache.spark.examples.SparkPi 
--master yarn-cluster 
--num-executors 10 
--driver-memory 4g 
--executor-memory 2g 
--executor-cores 4 
lib/spark-examples*.jar 
 
  
  • Spark-submit command launches the spark application
  • master defines that his application will run on the YARN cluster
  • num-executors defines how many executors are required to launched in cluster
  • driver-memory defines how much RAM to assign to the driver program
  • executor-memory defines how much RAM to assign to each executor
  • executor-cores defines how many cores should be assigned to each executor

To view or add a comment, sign in

More articles by Shahzad Aslam

Others also viewed

Explore content categories