We have already discussed the Hive bucketing concept in my other post, and the concept is the same in Spark SQL. Bucketing divides a partition into a number of equal clusters (also called clustering), or buckets. The concept is very similar to clustering in relational databases such as Netezza, Snowflake, etc. In this article, we will check Spark SQL bucketing on a DataFrame instead of on tables.
We will use PySpark to demonstrate the bucketing examples; the concept is the same in Scala as well.
Spark SQL Bucketing on DataFrame
Bucketing is an optimization technique in both Spark and Hive that uses buckets (clustering columns) to determine data partitioning and avoid a data shuffle. Rows are assigned to buckets by hashing the value of the bucketing column.
Bucketing is commonly used to optimize the performance of a join query by avoiding shuffles of the tables participating in the join. Bucketing is most beneficial when the pre-shuffled bucketed tables are used more than once in a query.
Apache Spark supports bucketing on tables, but with a different syntax compared to Apache Hive. You can find the syntax in my other post, Apache Spark SQL Bucketing Support. In that article, we concentrated mostly on creating tables using Spark SQL.
How to enable Bucketing in Spark?
Bucketing is enabled by default. You can also set the following property explicitly, either in the Spark shell or in a property file.
SET spark.sql.sources.bucketing.enabled=true
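You can also check or set this property from PySpark through the runtime config. A minimal sketch, assuming the pyspark shell where a SparkSession named spark is already available:

# Check whether bucketing is currently enabled (returns "true" or "false")
spark.conf.get("spark.sql.sources.bucketing.enabled")

# Explicitly enable bucketing for the current session
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")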
Spark DataFrame Bucketing Example
We will create a sample DataFrame, and then apply bucketing on top of it when saving it as a table.
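The snippets below assume an active SparkSession named spark (as in the pyspark shell). If you are running them as a standalone script, a minimal setup would look something like this; the app name is arbitrary:

from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession. Depending on your setup,
# enableHiveSupport() may be needed for saveAsTable() to persist
# tables to a Hive metastore.
spark = SparkSession.builder \
    .appName("spark-bucketing-example") \
    .enableHiveSupport() \
    .getOrCreate()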
# Sample DataFrame with an id column and a d_id column
testDF = spark.createDataFrame(
    [(1, "111"), (2, "111"), (3, "222"), (4, "222"),
     (5, "222"), (6, "111"), (7, "333"), (8, "444")],
    ["id", "d_id"])
The following example buckets the DataFrame into 42 buckets on the id column and sorts the rows within each bucket by d_id.

# Write the DataFrame as a persistent table with 42 buckets on id,
# with rows inside each bucket sorted by d_id
testDF.write.bucketBy(42, "id").sortBy("d_id").saveAsTable("test_bucketed")
Note that we have tested the above code on Spark version 2.3.x.
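To confirm that the table was actually written with buckets, you can inspect its metadata. In the DESCRIBE FORMATTED output, look for the Num Buckets, Bucket Columns and Sort Columns entries:

# Inspect the table metadata; the bucketing details appear as
# "Num Buckets", "Bucket Columns" and "Sort Columns" rows
spark.sql("DESCRIBE FORMATTED test_bucketed").show(50, truncate=False)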
Advantages of Bucketing the Tables in Spark
Below are some of the advantages of bucketing (clustering) in Spark:
- Optimized tables, since the data is pre-shuffled and pre-sorted at write time.
- Optimized joins when you use pre-shuffled bucketed tables (see the join sketch after this list).
- More efficient queries when you have predicates defined on a bucketed column.
- Optimized access to the table data: you minimize the table scan for a given query when using a WHERE condition on the bucket column.
- Evenly distributes the data across different buckets, hence providing optimal access to the table data.
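To illustrate the join benefit from the list above, here is a sketch that writes a second bucketed table (the other_bucketed table and its sample data are made up for this example) and joins it with test_bucketed. Since both tables are bucketed on the join key with the same bucket count, the physical plan printed by explain() should show a SortMergeJoin without an Exchange (shuffle) step on either side:

# Hypothetical second table, bucketed on the same column with the same bucket count
otherDF = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "name"])
otherDF.write.bucketBy(42, "id").sortBy("id").saveAsTable("other_bucketed")

t1 = spark.table("test_bucketed")
t2 = spark.table("other_bucketed")

# With both sides pre-shuffled into matching buckets, no Exchange
# should appear in the plan for this join
t1.join(t2, "id").explain()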
Hope this helps 🙂