How to keep your Spark jobs from dying.

Apache Spark is a powerful tool for processing data at large scale on a computing cluster. The API is simple and well documented, and compared to MapReduce or Pig it is a breeze to write. The choice of languages is fantastic, especially if, like me, you love Python. If you’re new to Spark, the following will almost certainly happen to you:

  1. You’ll write a proof of concept in Spark on a smaller data set. It will work just fine.
  2. You’ll apply your application to a larger data set and immediately get non-descriptive out-of-memory errors.
  3. You’ll increase your executor memory, only to find that the error doesn’t go away. You’ll down-sample, still to no avail.

Why is this happening, and what can be done to cure your Spark ailments? In this post, I’ll go through a few tricks I’ve learned to avoid out-of-memory errors that have solved 95% of my execution problems. These things are not well documented, and it is crucial for the Spark community to get this information out.

Disclaimer: The following is only assured to apply to YARN clusters. While I believe it is also relevant to Mesos or Standalone, your mileage may vary.

1. Joins – Just Say No

Joins are evil in distributed computing, and you should avoid them at all costs (or at least try some advanced techniques to optimize them). It helps to think through this by reviewing the canonical MapReduce diagram:

[Figure: the canonical MapReduce diagram – map output key-value pairs (colored by key) shuffled across the network so that each key lands on a single reducer]

In the MapReduce framework, a key-value pair is emitted from the map function and shuffled across the cluster network. Crucially, unique keys are shuffled to one and only one server. The amount of shuffle is one of the main factors that increases processing time in distributed computing.

A join of any size nearly always initiates a shuffle. To understand why, suppose the data looks as follows:

Table 1                       Table 2
Key | Value                   Key | Value
123 | Smith, Joe              123 | Mazda
456 | Doe, Jane               789 | BMW
789 | Paulson, Robert         012 | Ford

To join these tables together on Key, we have to shuffle all unique keys to one and only one server. The amount of shuffle increases dramatically with the number of keys we’re trying to join.

Now think about what would happen in the above diagram if you have 95% red keys (this is called data skew). All these red keys are shuffled to the same server, and the reduce function is applied sequentially to each value. In this case, the join can actually take much longer in a distributed context than it would if you were to just do it on a single machine.

In short, if you can avoid joining, do so. If you have to join, there are a few things you can do:

  • If possible, broadcast. Broadcasting is similar to a map-side join in MapReduce: it puts the entire smaller data set in the memory of each executor, so the larger table does not need to be shuffled (for more on the Spark execution model, see here). The key requirement is that the broadcasted data set must fit in the driver’s memory. Also, if you are using Pyspark, you may have trouble broadcasting a data set more than 8GB in size because of Pickle limitations. A minimal sketch follows this list.
  • Try to “salt” your join keys to create evenly sized buckets of keys. This is a fairly advanced topic that deserves a blog post of its own. Stay tuned.
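
Here is a minimal sketch of the broadcast approach in PySpark, using toy tables along the lines of the ones above (the app name, column names, and values are illustrative, not from any real pipeline):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast_join_sketch").getOrCreate()

# The "big" table: names keyed by ID
people = spark.createDataFrame(
    [(123, "Smith, Joe"), (456, "Doe, Jane"), (789, "Paulson, Robert")],
    ["key", "name"])

# The small table: cars keyed by ID -- small enough to ship to every executor
cars = spark.createDataFrame(
    [(123, "Mazda"), (789, "BMW"), (12, "Ford")],
    ["key", "car"])

# broadcast() hints Spark to copy the small table to each executor,
# so the big table is joined in place instead of being shuffled.
joined = people.join(broadcast(cars), on="key", how="left")
joined.show()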

2. Increase spark.sql.shuffle.partitions.

This is one of the most confusing items for most people when they get started with Spark at scale. RDDs and data frames are split into blocks (on Hadoop YARN, based on the number of file blocks read into memory for each data set), and these blocks are the units of parallelism your job processes. If you are looking at the Spark Application Master UI, you’ll see this as the number of tasks in the early stages of your job.

However, when using SparkSQL (which most people are if they are using data frames), any time a shuffle is initiated, data frames are split into shuffle blocks, and the number of those blocks is controlled by this parameter, spark.sql.shuffle.partitions. The default value for this parameter is hardcoded at 200.

Say that with me again, the default value for spark.sql.shuffle.partitions is 200. This means that when a shuffle is initiated, your data frame will be split into 200 blocks, and these 200 blocks are shuffled across the network.

Another confusing fact is that these shuffle blocks must each be less than 2GB in size. Going over 2GB on a single shuffle block will throw an out-of-memory error, even if you increase the amount of memory per executor.
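
To see why the default bites at scale, here is a back-of-the-envelope calculation (the numbers are purely illustrative):

# Suppose a stage shuffles roughly 1 TB of data with the default setting.
shuffle_bytes = 1 * 1024**4                          # ~1 TB shuffled in the stage
default_partitions = 200
print(shuffle_bytes / default_partitions / 1024**3)  # ~5.1 GB per shuffle block -- over the 2 GB limit

# Targeting ~200 MB per block instead would suggest on the order of
# 1 TB / 200 MB, i.e. roughly 5,000 shuffle partitions for this stage.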

A few tricks to avoid these problems:

  • Simply increase spark.sql.shuffle.partitions. If you’re using a HiveContext (which you can still do in Spark2 using the SparkSession API), issue:

hc.sql('set spark.sql.shuffle.partitions=10000')

  • Repartition your data liberally. Feel free to litter your code with commands like df.repartition(x) (where x is an integer). This redistributes the data across executors, leveling it out to help avoid skew; successive joins tend to compound skew, which leads to more unbalanced shuffle blocks. A sketch of both adjustments follows this list.
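
Here is a minimal sketch of both adjustments using the Spark 2 SparkSession API, assuming a session named spark and a data frame df with a join key column called key (all illustrative names):

# Raise the number of shuffle partitions for subsequent shuffles
# (the value is illustrative; tune it to your data volume).
spark.conf.set("spark.sql.shuffle.partitions", 10000)

# Repartition by the join key so rows are spread more evenly
# across executors before the next join or aggregation.
df = df.repartition(10000, "key")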

3. Increase spark.yarn.executor.memoryOverhead

This parameter is configured when the job is submitted in the spark-submit command and is defined as follows:

The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

Visually, this corresponds to the block on the bottom left of the following diagram (see more here):

[Figure: spark_memory.png – Spark executor memory layout on YARN, with the off-heap memoryOverhead block at the bottom left]

The spark.yarn.executor.memoryOverhead parameter defaults to 10% of executor memory or a minimum of 384 MB.

One particular off-heap executor process that can require substantial memory is garbage collection. In a particularly memory-intensive Spark job (or on a cluster running multiple heavy-duty ETL jobs), more garbage collection will be initiated. Therefore, it makes sense to increase this parameter liberally, as long as enough memory is left for the data in your RDD blocks.

This can be configured at job submission via a --conf parameter, for example with PySpark:

/usr/bin/spark2-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 30g \
--executor-cores 5 \
--driver-memory 20g \
--conf spark.yarn.executor.memoryOverhead=4g \
my_spark_script.py

Again, this should ideally be configured at the time of job submission. Additionally, you can try different garbage collectors, for example G1GC (submitted via --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC").
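
If you want to confirm from inside the job that the submitted overrides actually took effect, a quick check like the following works (a sketch, assuming a SparkSession named spark):

# Print the effective settings, using the same keys as above.
conf = spark.sparkContext.getConf()
print(conf.get("spark.yarn.executor.memoryOverhead", "not set"))
print(conf.get("spark.executor.extraJavaOptions", "not set"))
print(spark.conf.get("spark.sql.shuffle.partitions"))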

Conclusion

These tips and tricks are well known by seasoned Spark developers, but they are not well documented or easily understood. Implementing these fixes can go a long way to helping save your Spark jobs from out-of-memory errors and hopefully get your jobs to the finish line. Spark development can be a black box, but a little tinkering can go a long way.
