Three tips for making Spark and Hive work for you.

A common pattern that many people getting started with Hadoop seek to adopt is to put data in Hive tables, often either using the Hive computational engine directly or through Spark. The reasons for this are often:

  • Hive looks like SQL. In fact, this is why it was created – instead of asking developers to write MapReduce, Facebook created an API that lets developers write something that looks like SQL but actually generates MapReduce behind the scenes.
  • Hive “tables” have structure. I put “tables” in quotation marks because there are no actual tables in Hive – it's an illusion. Behind the scenes, Hive uses a “schema-on-read” approach to impose structure on files in HDFS.

I have seen and heard on a number of occasions that users of Hive start off happy, only to develop an intense dissatisfaction with it. In fact, I have experienced it myself and do so on a daily basis. That being said, there are a few tricks to know that can make your life easier with Hive.

Additionally, when users write Spark code to interact with Hive tables – via drop, create, insert overwrite, and partition commands embedded in query strings and passed to a HiveContext object – they often find themselves facing oddities and computational issues. Getting Spark jobs to run was the topic of my last blog post, but when working with a HiveContext there are additional gotchas to be concerned with. I'll address a few of them here.

So, without further ado, let's get to it – here are three tips to make your life easier working with Hive and HiveContext in Spark.

1. Use external tables in Hive.

In Hive, there are two types of tables – internal and external tables. The difference is as follows:

  • Internal tables are owned and managed by the Hive metastore. Dropping an internal table actually deletes the underlying files, and loading data into an internal table moves the files from their original location on HDFS into the /user/hive/warehouse directory.
  • External tables are still referenced in the metastore but are not owned by it. The metastore simply points to a file that can live anywhere on HDFS. Dropping, recreating, or overwriting an external table often just means changing the location the table points to.

Based on my experience, I would advocate the use of external tables in almost every case when choosing Hive to work with structured data on HDFS. This is because:

  1. No downtime for end users when overwriting tables – when “updating” data in Hive (i.e. dropping and replacing it, since there is no real update in Hive), users can end up staring at blank tables while the swap happens. External tables avoid this because you simply point the table at the new file in HDFS. You can also INSERT OVERWRITE your tables, but for large data sets that requires a lot of additional storage you may not have – and you have to be intentional about doing it.
  2. More control in making changes to tables – when you change an internal table (drop, create, delete), the procedure for creating temp files, scrapping the old files, and pointing Hive to the new files is hidden from the developer. With external tables, you can more easily manage the files on HDFS and where they reside. This gives the developer greater control over when a change to point at new files takes place, and how quickly.

In short, using external tables allows the developer much more control over how to create new files, point “tables” (remember it’s all an abstraction) to the new files, and what to do with the old files. This will make your life much easier in the long run.
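
Here's a minimal sketch of what that swap can look like through a HiveContext – the database, table, columns, and paths are all made up for illustration:

# hc is an existing HiveContext (or a SparkSession in Spark 2)
# Create the external table pointing at the current snapshot of the data.
hc.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS mydb.events (
        id BIGINT,
        name STRING
    )
    STORED AS PARQUET
    LOCATION '/data/events/v1'
""")

# Later, after a new snapshot has been written to /data/events/v2,
# repoint the table at the new files in a single metadata-only step.
hc.sql("ALTER TABLE mydb.events SET LOCATION '/data/events/v2'")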

2. Do not blindly use the SparkSQL API to create Hive tables.

Another common pattern I’ve seen is as follows:

  • Developer knows the way they would create the table and underlying workflow in a traditional environment like SQL Server.
  • Developer copies this query (joins and all) into a string in a Python script.
  • Developer passes this string to a HiveContext like so:

df = hc.sql('select * from table1 t1 join table2 t2 on t1.x = t2.y')

  • The job fails, running out of memory, even though the executors have more than enough memory.

This failure almost always comes down to a skewed join (which is very common in left joins) and requires careful handling with repartitioning commands and management of the data frame. In this case, I implore you to use the standard Spark data frame API instead of SparkSQL. You will have much more control over how your data frame is partitioned, and you can avoid out-of-memory errors the same way you always would, following my previous post.
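
For example, here is a rough sketch of the same join done through the data frame API, where you decide how the data is partitioned before the join runs – the table names, column names, and partition count are placeholders to tune for your own data:

# Read the Hive tables as data frames and repartition them on the join keys
# ourselves, rather than letting SparkSQL decide.
t1 = hc.table('table1').repartition(2000, 'x')
t2 = hc.table('table2').repartition(2000, 'y')

joined = t1.join(t2, t1['x'] == t2['y'])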

3. “Fuzz” or “salt” your partitions when writing files in a HiveContext.

The API for connecting to Hive changed in Spark 2, but you can still use the regular HiveContext if you're so inclined – and any code that's been running in production for a while likely does exactly that.

When you go to write a table in Hive, you often want to partition the files for query performance down the road. This literally means writing the files out to different directories on disk. For example, if you have a table with data aggregated by business unit, instead of storing all the data like this:

  • /user/hive/warehouse/dbname.db/tablename/[sequence of avro / parquet files]

Store the data like this:

  • /user/hive/warehouse/dbname.db/tablename/businessUnit0
  • /user/hive/warehouse/dbname.db/tablename/businessUnit1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit2

and so forth. This is accomplished through the PARTITIONED BY clause in the CREATE TABLE statement.
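
On the Spark side, a hedged sketch of the equivalent partitioned write might look like this – the data frame, file format, and path are placeholders:

# Write each business unit's rows into its own subdirectory under the table path.
(df.write
   .partitionBy('businessUnit')
   .format('parquet')
   .mode('overwrite')
   .save('/user/hive/warehouse/dbname.db/tablename'))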

Now what happens in the above example when businessUnit0 has way more data than businessUnit1? When writing the files out, there can be a major skew that causes a delay in job performance.

One way to get around this is to tag records in businessUnit0 with a random integer, say between 0 and 4, so that your directory structure looks like this:

  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition0
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition2
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition3
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition4
  • /user/hive/warehouse/dbname.db/tablename/businessUnit1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit2

This comes at the cost of an additional shuffle when writing to disk, but it allows more parallelism in the write. You can do this simply by adding a random-integer field to your table / data frame and then including that field in the PARTITIONED BY clause (or the data frame writer's partitionBy call).
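
A minimal sketch of that salting trick in Pyspark – the column names and the 0-4 salt range are made up to match the example above:

from pyspark.sql import functions as F

# Build a salted partition key: businessUnit plus a random integer from 0 to 4.
# For simplicity this salts every unit; F.when could restrict it to the skewed ones.
salted = df.withColumn(
    'partitionKey',
    F.concat(F.col('businessUnit'),
             F.lit('-partition'),
             (F.rand() * 5).cast('int').cast('string'))
)

# Partition the write on the salted key so the biggest business units are
# split across several directories and written out in parallel.
(salted.write
       .partitionBy('partitionKey')
       .format('parquet')
       .mode('overwrite')
       .save('/user/hive/warehouse/dbname.db/tablename'))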

In conclusion…

Following these three tips can speed up your jobs and generally make Hive easier to live with. I would not recommend blindly using Hive like a database, but using external tables, preferring the base Spark API over SparkSQL, and properly partitioning tables on write can dull the pain.

How to keep your Spark jobs from dying.

Apache Spark is a powerful tool for processing data at large scale on a computing cluster. The API is simple and well-documented, and compared to MapReduce or Pig it is a breeze to write. The choice of languages is fantastic, especially if, like me, you love Python. If you're new to Spark, the following will almost certainly happen to you:

  1. You’ll write a proof of concept in Spark on a smaller data set. It will work just fine.
  2. You’ll apply your application to a larger data set and immediately get non-descriptive out-of-memory errors.
  3. You’ll increase your executor memory, only to find that the error doesn’t go away. You’ll down-sample, still to no avail.

Why is this happening, and what can be done to cure your Spark ailments? In this post, I’ll go through a few tricks I’ve learned to avoid out-of-memory errors that have solved 95% of my execution problems. These things are not well documented, and it is crucial for the Spark community to get this information out.

Disclaimer: The following is only assured to apply to YARN clusters. While I believe it is also relevant to Mesos or Standalone, your mileage may vary.

1. Joins – Just Say No

Joins are evil in distributed computing, and you should avoid them at all costs (or at least try some advanced techniques to optimize them). It helps to think through this by reviewing the canonical MapReduce diagram:

[Diagram: the canonical MapReduce map-shuffle-reduce flow]

In the MapReduce framework, a key-value pair is emitted from the map function and shuffled across the cluster network. Crucially, unique keys are shuffled to one and only one server. The amount of shuffle is one of the main factors that increases processing time in distributed computing.

In most cases, a join of any size nearly always initiates a shuffle. To understand why, suppose the data looks as follows:

Table 1                    Table 2
Key | Value                Key | Value
123 | Smith, Joe           123 | Mazda
456 | Doe, Jane            789 | BMW
789 | Paulson, Robert      012 | Ford

To join these tables together on Key, all records sharing a key have to be shuffled to one and only one server. The amount of shuffle increases dramatically with the number of keys we're trying to join.

Now think about what would happen in the above diagram if you have 95% red keys (this is called data skew). All these red keys are shuffled to the same server, and the reduce function is applied sequentially to each value. In this case, the join can actually take much longer in a distributed context than it would if you were to just do it on a single machine.

In short, if you can avoid joining, do so. If you have to join, there are a few things you can do:

  • If possible, broadcast. Broadcasting is similar to a map-side join in MapReduce: it puts the entire smaller data set in the memory of each executor (for more on the Spark execution model, see here). The key requirement here is that the broadcasted data set must fit in the driver's memory. Also, if you are using Pyspark, you may have trouble broadcasting a data set more than 8GB in size because of Pickle limitations. See the sketch after this list.
  • Try to “salt your join keys” to create even buckets of join keys. This is a fairly advanced topic that requires a blog post all on its own. Stay tuned.
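
To make the broadcast option concrete, here is a minimal sketch using two toy tables like the ones above – it assumes the smaller table genuinely fits in memory, and the names are placeholders:

from pyspark.sql.functions import broadcast

people = hc.table('people')   # the large table
cars = hc.table('cars')       # small enough to broadcast

# The broadcast hint ships the small table whole to every executor,
# so the large table never has to be shuffled for the join.
joined = people.join(broadcast(cars), on='key')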

2. Increase spark.sql.shuffle.partitions.

This is one of the most confusing items for people when they get started with Spark at scale. RDDs and data frames are split into blocks (on Hadoop YARN, based on the number of file blocks read into memory for each data set), and these blocks are the units of parallelism your job processes. If you are looking at the Spark Application Master UI, you'll see this as the number of tasks in the early stages of your job.

However, when using SparkSQL (which most people are if they are using data frames), any time a shuffle is initiated, data frames are split into shuffle blocks, the amount of which corresponds to this parameter, spark.sql.shuffle.partitions. The default value for this parameter is hardcoded at 200.

Say that with me again, the default value for spark.sql.shuffle.partitions is 200. This means that when a shuffle is initiated, your data frame will be split into 200 blocks, and these 200 blocks are shuffled across the network.

Another confusing fact is that these shuffle blocks must be less than 2GB in size. Going over 2GB on a shuffle block will throw an out-of-memory error, even if you increase the amount of memory per executor. To see why the default matters: shuffling a 1TB data frame into 200 blocks means roughly 5GB per block, well over the limit.

A few tricks to avoid these problems:

  • Simply increase spark.sql.shuffle.partitions. If you’re using a HiveContext (which you can still do in Spark2 using the SparkSession API), issue:

hc.sql('set spark.sql.shuffle.partitions=10000')

  • Repartition your data liberally. Feel free to litter your code with commands like df.repartition(x) (where x is an integer). This changes how the data is distributed across executors, leveling things out to help avoid skew – successive joins lead to more skew, which leads to more unbalanced shuffle blocks. A short sketch of both knobs follows this list.
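
As a small sketch of both knobs together – spark is assumed to be a SparkSession (Spark 2), and the partition counts are illustrative numbers to tune for your cluster:

# The same setting as above, through the SparkSession conf instead of a SQL statement.
spark.conf.set('spark.sql.shuffle.partitions', 10000)

# Repartition liberally between heavy stages so no single shuffle block
# creeps toward the 2GB limit.
df = df.repartition(10000)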

3. Increase spark.yarn.executor.memoryOverhead

This parameter is configured when the job is submitted in the spark-submit command and is defined as follows:

The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

Visually, this corresponds to the block on the bottom left of the following diagram (see more here):

[Diagram: Spark executor memory layout]

The spark.yarn.executor.memoryOverhead parameter defaults to 10% of executor memory, with a minimum of 384 MB.

One particular off-heap executor process that can require substantial memory is garbage collection. In a particularly memory-intensive Spark job (or on a cluster with multiple heavy-duty ELT jobs), more garbage collection will be initiated. Therefore, it makes sense to increase this parameter liberally, as long as there is enough memory for data across the RDD blocks.

This can be configured in a spark-submit command via a conf parameter, like so using Pyspark:

/usr/bin/spark2-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 30g \
--executor-cores 5 \
--driver-memory 20g \
--conf spark.yarn.executor.memoryOverhead=4g my_spark_script.py

Again, this should ideally be configured at the time of job submission. Additionally, you can try different garbage collectors, for example G1GC (submitted via --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC").

Conclusion

These tips and tricks are well known by seasoned Spark developers, but they are not well documented or easily understood. Implementing these fixes can go a long way to helping save your Spark jobs from out-of-memory errors and hopefully get your jobs to the finish line. Spark development can be a black box, but a little tinkering can go a long way.