You can use Spark Dataset join operators to join multiple DataFrames in Spark. Two or more DataFrames are joined to perform specific tasks, such as getting the data common to both. In this article, we will check how to perform a Spark SQL DataFrame self join using PySpark.
Spark SQL DataFrame Self Join using PySpark
Spark DataFrame supports various join types, as mentioned in Spark Dataset join operators. A self join is a join in which a DataFrame is joined to itself. It is typically used to resolve a parent-child relationship stored in a single table, such as employees and their managers.
In Spark, you can perform a self join using two methods:
- Use DataFrame to join
- Write Hive Self Join Query and Execute using Spark SQL
Let us check these two methods in detail.
Spark SQL DataFrame Self Join
In this method, we will use the DataFrame API to perform the self join, that is, join the DataFrame to itself.
For example, consider a DataFrame that holds employee information. An employee can be a manager too, and the mid column holds the eid of each employee's manager. We will use a self join to pair each employee with his or her supervisor.
scala> val emp_data = Seq((100,"AAA",101),(101,"BBB",0),(102,"CCC",101),(103,"DDD",102),(104,"EEE",103),(104,"EEE",103),(10,"FFF",101)).toDF("eid","name","mid")
scala> emp_data.show
+---+----+---+
|eid|name|mid|
+---+----+---+
|100| AAA|101|
|101| BBB|  0|
|102| CCC|101|
|103| DDD|102|
|104| EEE|103|
|104| EEE|103|
| 10| FFF|101|
+---+----+---+
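Now, perform the self join to identify manager names. In the join below, df1 acts as the manager side and df2 as the employee side, so the first two output columns are the manager's eid and name, and the third column is the name of an employee who reports to that manager.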
emp_data.as("df1").join(emp_data.as("df2"), $"df1.eid" === $"df2.mid").select($"df1.eid",$"df1.name",$"df2.name").show()
+---+----+----+
|eid|name|name|
+---+----+----+
|101| BBB| AAA|
|101| BBB| CCC|
|101| BBB| FFF|
|102| CCC| DDD|
|103| DDD| EEE|
|103| DDD| EEE|
+---+----+----+
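The row for EEE appears twice because the input contains that record twice. You can remove such duplicates by calling distinct() on the result. The join condition is written slightly differently if you are using PySpark.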
Hive Self Join Query Using Spark SQL
In this method, you write the self join as a Hive query and execute it using Spark SQL.
Spark must be connected to Hive for this method to work, which means hive-site.xml should be present in your Spark conf folder.
For example, consider an employee table that contains details about the employees, where an employee can be a manager too. You can use a self join to identify each employee and supervisor.
sqlContext.sql("select MANAGER.EID, MANAGER.NAME, EMPL.NAME from employee_manager MANAGER, employee_manager EMPL where MANAGER.eid = EMPL.mid").show()
+------+-------+-------+
| EID  | NAME  | NAME  |
+------+-------+-------+
| 101  | BBB   | FFF   |
| 101  | BBB   | CCC   |
| 101  | BBB   | AAA   |
| 102  | CCC   | DDD   |
| 103  | DDD   | EEE   |
+------+-------+-------+
Hope this helps 🙂