SQL Merge Operation Using Pyspark – UPSERT Example

  • Post author:
  • Post last modified:January 27, 2020
  • Post category:Apache Spark
  • Reading time:5 mins read

In the relational databases such as Snowflake, Netezza, Oracle, etc, Merge statement is used to manipulate the data stored in the table. In this article, we will check how to SQL Merge operation simulation using Pyspark. The method is same in Scala with little modification.

SQL Merge Statement

The MERGE command in relational databases, allows you to update old records and insert new records simultaneously. This command is sometimes called UPSERT (UPdate and inSERT command).

Following is the sample merge statement available in RDBMS.

merge into merge_test
  using merge_test2 on merge_test.a = merge_test2.a
when matched then 
  update set merge_test.b = merge_test2.b
when not matched then 
  insert (a, b) values (merge_test2.a, merge_test2.b);

Test Data

Following data frames are used to demonstrate the merge statement alternative in pyspark.

In these dataframes, id column is the primary key on that we are going to merge the two data frames.

testDFOld = sqlContext.createDataFrame([(1,"111"), (2,"222"), (3,"333"), (4,"444"), (5,"555")], ["id", "d_id"])
+---+----+
| id|d_id|
+---+----+
|  1| 111|
|  2| 222|
|  3| 333|
|  4| 444|
|  5| 555|
+---+----+

testDFNew = sqlContext.createDataFrame([(1,"100"), (2,"200"), (6,"666"), (7,"777")], ["id", "d_id"])
+---+----+
| id|d_id|
+---+----+
|  1| 100|
|  2| 200|
|  6| 666|
|  7| 777|
+---+----+

We need to update the value for ID 1 and 2. Add new id 6 and 7.

SQL Merge Operation Using Pyspark

Apache Spark does not support the merge operation function yet. We can simulate the MERGE operation using window function and unionAll functions available in Spark.

Following steps can be use to implement SQL merge command in Apache Spark.

  • Merge Statement involves two data frames. Use unionALL function to combine the two DF’s and create new merge data frame which has data from both data frames. Note that, you can use union function if your Spark version is 2.0 and above.

For example, following example uses unionAll.

df_merge = testDFOld.unionAll(testDFNew)
df_merge.show()

+---+----+
| id|d_id|
+---+----+
|  1| 111|
|  2| 222|
|  3| 333|
|  4| 444|
|  5| 555|
|  1| 100|
|  2| 200|
|  6| 666|
|  7| 777|
+---+----+
  • Secondly, assign a row number to the each row (_row_number). You can use a window function to group and partition records.

For example, following example with the primary key ‘id’ grouped together and ordered by d_id in ascending order.

df_merge = df_merge.withColumn("_row_number", row_number().over(Window.partitionBy (df_merge['id']).orderBy('d_id')))

df_merge.show()
+---+----+-----------+
| id|d_id|_row_number|
+---+----+-----------+
|  1| 100|          1|
|  1| 111|          2|
|  2| 200|          1|
|  2| 222|          2|
|  3| 333|          1|
|  4| 444|          1|
|  5| 555|          1|
|  6| 666|          1|
|  7| 777|          1|
+---+----+-----------+
  • Finally, filter the DataFrame, keeping only _row_number = 1, since it represents a new record. Also remove _row_number column as it is no longer required.
df_merge = df_merge.where(df_merge._row_number == 1).drop("_row_number")

df_merge.orderBy('id').show()
+---+----+
| id|d_id|
+---+----+
|  1| 100|
|  2| 200|
|  3| 333|
|  4| 444|
|  5| 555|
|  6| 666|
|  7| 777|
+---+----+

As you can see, we have desired data frame.

Related Article,

Hope this helps 🙂

This Post Has One Comment

  1. Kicha

    I am surprised by the result. what if testDFNew.d_id = 112 for id=1 then in that case only the old value (111) will be retained and not 112

Comments are closed.