Three tips for making Spark and Hive work for you.

A common pattern among people getting started with Hadoop is to put data in Hive tables and then query it either with Hive’s own execution engine or through Spark. The usual reasons are:

  • Hive looks like SQL. In fact, this is why it was created: instead of asking developers to write MapReduce, Facebook built a SQL-like query language (HiveQL) that is compiled into MapReduce jobs behind the scenes.
  • Hive “tables” have structure. I put “tables” in quotation marks because there are no actual tables in Hive; the table is an illusion. Behind the scenes, Hive uses a “schema-on-read” approach to impose structure on files in HDFS.

I have seen and heard on a number of occasions that users of Hive start off happy, only to develop an intense dissatisfaction with it. In fact, I have experienced it myself and do so on a daily basis. That being said, there are a few tricks to know that can make your life easier with Hive.

Additionally, when users write Spark code that interacts with Hive tables by passing drop, create, insert overwrite, and partition commands as query strings to a HiveContext object, they often run into oddities and performance problems. Getting Spark jobs to run is the topic of my last blog post, but working with HiveContext brings additional gotchas. I’ll address a few of them here.

So, without further ado, let’s get to it. Here are three tips to make your life easier working with Hive and HiveContext in Spark.

1. Use external tables in Hive.

In Hive, there are two types of tables: internal (also called managed) and external. The difference is as follows:

  • Internal tables are owned and managed by Hive. Dropping an internal table actually deletes the files, and creating an internal table from existing files requires moving them from their location on HDFS into the /user/hive/warehouse directory.
  • External tables are still referenced in the metastore but are not owned by it. The metastore simply points to a location that can be anywhere on HDFS. Replacing or overwriting an external table often amounts to changing the location the table points to.
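
As a rough sketch of what the external variant looks like, the helper below builds the DDL string for an external table. The table name, columns, and HDFS path are placeholders I made up for illustration, and the resulting string would be handed to Hive through something like hc.sql(...).

```python
def external_table_ddl(table, columns, location, file_format="PARQUET"):
    """Build a CREATE EXTERNAL TABLE statement for files that already live at `location`.

    `columns` is a list of (name, hive_type) pairs. All names used below are
    hypothetical; nothing is executed against Hive here.
    """
    cols = ", ".join("{} {}".format(name, hive_type) for name, hive_type in columns)
    return (
        "CREATE EXTERNAL TABLE IF NOT EXISTS {table} ({cols}) "
        "STORED AS {fmt} LOCATION '{loc}'"
    ).format(table=table, cols=cols, fmt=file_format, loc=location)


ddl = external_table_ddl(
    "dbname.sales",
    [("businessUnit", "STRING"), ("revenue", "DOUBLE")],
    "/data/sales/v1",
)
print(ddl)
# A later `DROP TABLE dbname.sales` removes only the metastore entry;
# the files under /data/sales/v1 stay on HDFS because the table is EXTERNAL.
```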

Based on my experience, I would advocate the use of external tables in almost every case when choosing Hive to work with structured data on HDFS. This is because:

  1. No downtime for end users when overwriting tables: often, what can happen when “updating” data in Hive (that is, dropping and replacing, since ordinary Hive tables have no real in-place update) is that users will see blank tables. External tables avoid this because you simply point to the new files in HDFS. You can also insert overwrite your tables, but this requires a lot of additional storage that you may not have for large data sets, and you have to be intentional about doing it.
  2. More control in making changes to tables – when making a change to an internal table (drop, create, delete), the procedure for creating temp files, scrapping the old files, and pointing Hive to the new files is hidden from the developer. But when you use external tables, you can more easily manage the files on HDFS and where they reside. This allows the developer greater control for when a change is made to point to new tables, and how quickly it takes place.

In short, using external tables allows the developer much more control over how to create new files, point “tables” (remember it’s all an abstraction) to the new files, and what to do with the old files. This will make your life much easier in the long run.
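
To make the “point the table at new files” step concrete, here is a minimal sketch that builds the statement for switching an external table to a new directory. The versioned directory layout (v1, v2, ...) and all names are my own assumptions, and the exact URI form your cluster expects in SET LOCATION may differ.

```python
def versioned_location(base_dir, version):
    """Directory that holds one complete copy of the data, e.g. <base_dir>/v2."""
    return "{}/v{}".format(base_dir.rstrip("/"), version)


def repoint_ddl(table, base_dir, version):
    """Hive statement that switches an external table to the given version's directory."""
    return "ALTER TABLE {} SET LOCATION '{}'".format(
        table, versioned_location(base_dir, version)
    )


# Refresh flow (hypothetical names throughout):
#   1. write the new files to a fresh directory such as .../v2
#   2. run the statement below, e.g. hc.sql(stmt), so readers switch over at once
#   3. delete or archive the old directory (.../v1) whenever it suits you
stmt = repoint_ddl("dbname.sales", "hdfs:///data/sales", 2)
print(stmt)
```

For a partitioned table, existing partitions keep their own locations, so they would likely need to be repointed one by one.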

2. Do not blindly use the SparkSQL API to create Hive tables.

Another common pattern I’ve seen is as follows:

  • Developer knows the way they would create the table and underlying workflow in a traditional environment like SQL Server.
  • Developer copies this query (joins and all) into a string in a Python script.
  • Developer passes this string to a HiveContext like this:

df = hc.sql('select * from table1 t1 join table2 t2 on t1.x = t2.y')

  • The job fails, running out of memory, even though the executors have more than enough memory.

This failure is almost always caused by a skewed join, in which a few key values account for most of the rows, so a handful of tasks end up holding far more data than the rest. Skew is very common in left joins, and it requires careful handling through repartitioning and deliberate management of the data frame. In this case, I implore you to use the standard Spark DataFrame API instead of SparkSQL. You will have much more control over how your data frame is partitioned, and you can avoid out-of-memory errors in the same way you always would, following my previous post.
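
One common way to handle a skewed left join with the DataFrame API is a salted join. The sketch below is only one option and not necessarily the right fix for every job; the function name, the helper column names, and the default of 8 buckets are assumptions of mine.

```python
def salted_left_join(left, right, left_key, right_key, num_salts=8):
    """Left-join two DataFrames while spreading hot keys over `num_salts` buckets.

    `left` and `right` are PySpark DataFrames. The helper columns
    `_salt_l` and `_salt_r` are hypothetical names and are dropped at the end.
    """
    # Imported here so this module can be loaded on a machine without Spark.
    from pyspark.sql import functions as F

    # Left side: tag every row with a random bucket in [0, num_salts).
    left_salted = left.withColumn(
        "_salt_l", F.floor(F.rand() * num_salts).cast("int")
    )

    # Right side: replicate every row once per bucket, so that each salted
    # left row still finds exactly the same matches as before.
    all_salts = F.array(*[F.lit(i) for i in range(num_salts)])
    right_salted = right.withColumn("_salt_r", F.explode(all_salts))

    # Join on the original key plus the salt; rows for a hot key are now
    # split across up to `num_salts` tasks instead of landing in one.
    condition = (left_salted[left_key] == right_salted[right_key]) & (
        left_salted["_salt_l"] == right_salted["_salt_r"]
    )
    return left_salted.join(right_salted, condition, "left").drop(
        "_salt_l", "_salt_r"
    )


# Usage, mirroring the query above (table names as in the example):
#   t1 = hc.table("table1")
#   t2 = hc.table("table2")
#   df = salted_left_join(t1, t2, "x", "y")
```

The trade-off is that the right-hand table grows by a factor of num_salts, so this works best when the right side is the smaller of the two.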

3. “Fuzz” or “salt” your partitions when writing files in a HiveContext.

The API for connecting to Hive changed in Spark 2, where a SparkSession created with enableHiveSupport() takes over the role of HiveContext, but you can still use the regular HiveContext if you’re so inclined. Any code that has been running in production for a while will likely use the HiveContext API for connecting to Hive.
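
For reference, a minimal sketch of the Spark 2 style of connecting, assuming Spark 2.x or later with Hive support available on the cluster. The function name and application name are placeholders.

```python
def hive_session(app_name="hive-example"):
    """Return a SparkSession that can read and write Hive tables (Spark 2+)."""
    # Imported here so this module can be loaded on a machine without Spark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .enableHiveSupport()  # plays the role HiveContext had in Spark 1.x
        .getOrCreate()
    )


# Usage: the session is then used much like a HiveContext.
#   spark = hive_session()
#   df = spark.sql("select * from table1")
```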

When you write a table in Hive, you often want to partition the files for query performance down the road. This literally means writing the files out to different directories on disk. For example, if you have a table of data aggregated by business unit, instead of storing all the data like this:

  • /user/hive/warehouse/dbname.db/tablename/[sequence of avro / parquet files]

Store the data like this:

  • /user/hive/warehouse/dbname.db/tablename/businessUnit0
  • /user/hive/warehouse/dbname.db/tablename/businessUnit1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit2

and so forth. This is accomplished through the PARTITIONED BY clause in the create table statement.
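
A small sketch of such a create table statement, built as a string that could be passed to hc.sql(...). The revenue column and the column types are made up for illustration.

```python
def partitioned_table_ddl(table, columns, partition_columns, file_format="PARQUET"):
    """Build a CREATE TABLE statement with a PARTITIONED BY clause.

    Partition columns are declared only in PARTITIONED BY, not in the
    regular column list. Both arguments are lists of (name, hive_type) pairs.
    """
    cols = ", ".join("{} {}".format(n, t) for n, t in columns)
    parts = ", ".join("{} {}".format(n, t) for n, t in partition_columns)
    return "CREATE TABLE IF NOT EXISTS {} ({}) PARTITIONED BY ({}) STORED AS {}".format(
        table, cols, parts, file_format
    )


ddl = partitioned_table_ddl(
    "dbname.tablename",
    [("revenue", "DOUBLE")],          # hypothetical data column
    [("businessUnit", "STRING")],     # one directory per distinct value
)
print(ddl)
```

On disk, Hive names each partition directory column=value, for example .../tablename/businessUnit=<value>, so the real directory names are slightly longer than the simplified ones listed above.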

Now what happens in the above example when businessUnit0 has far more data than businessUnit1? When the files are written out, the skew can be severe: the task writing businessUnit0 keeps running long after the others have finished, which delays the whole job.

One way to get around this is to assign each record in businessUnit0 a random integer, say from 0 to 4, so that your directory structure looks like this:

  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition0
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition2
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition3
  • /user/hive/warehouse/dbname.db/tablename/businessUnit0-partition4
  • /user/hive/warehouse/dbname.db/tablename/businessUnit1
  • /user/hive/warehouse/dbname.db/tablename/businessUnit2

This comes at the cost of an additional shuffle when writing to disk, but it allows further parallelization of the write. You can do this simply by adding a field containing a random integer to your table / data frame, and then listing that field as a partition column in the PARTITIONED BY clause.
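
Here is a minimal PySpark sketch of that idea, assuming a data frame with a businessUnit column. The salt column name, the function names, and writing plain Parquet files to a path are my own choices for illustration, not the only way to do it.

```python
def partition_dir(base, business_unit, salt):
    """Hive-style directory (column=value) for one (businessUnit, salt) pair."""
    return "{}/businessUnit={}/salt={}".format(base.rstrip("/"), business_unit, salt)


def write_salted(df, path, hot_unit, num_salts=5):
    """Write `df` partitioned by businessUnit and a salt column.

    Only rows of `hot_unit` get a random salt in [0, num_salts); every other
    business unit keeps salt 0 and therefore a single directory.
    """
    # Imported here so this module can be loaded on a machine without Spark.
    from pyspark.sql import functions as F

    random_bucket = F.floor(F.rand() * num_salts).cast("int")
    salted = df.withColumn(
        "salt",
        F.when(F.col("businessUnit") == hot_unit, random_bucket).otherwise(F.lit(0)),
    )
    (
        salted
        .repartition("businessUnit", "salt")   # the extra shuffle mentioned above
        .write
        .partitionBy("businessUnit", "salt")   # one directory per pair
        .mode("overwrite")
        .parquet(path)
    )


# With 5 salts, the hot unit's rows are spread over directories like this one:
print(partition_dir("/user/hive/warehouse/dbname.db/tablename", 0, 3))
```

Keep in mind that the salt becomes a real partition column of the table, so it will show up in queries unless you leave it out of the select list.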

In conclusion…

Following these three tips can increase the speed of your jobs and make Hive generally easier for you. I would not recommend blindly using Hive like a database at all, but using external tables, using the base Spark API instead of Spark SQL, and properly partitioning tables on write can dull the pain.
