You can create a Spark DataFrame with duplicate records; there are no built-in checks that prevent duplicates from being added. Applications such as ETL processes may therefore produce DataFrames with duplicate records. Spark SQL supports several methods to de-duplicate them. In this article, we will check how to identify and remove duplicate records from a Spark SQL DataFrame.
Remove Duplicate Records from Spark DataFrame
There are many methods that you can use to identify and remove duplicate records from a Spark SQL DataFrame. For example, you can use functions such as distinct() or dropDuplicates() to remove duplicates while creating another DataFrame.
You can use any of the following methods to identify and remove duplicate rows from a Spark SQL DataFrame.
- Remove Duplicate using distinct() Function
- Remove Duplicate using dropDuplicates() Function
- Identify Spark DataFrame Duplicate records using groupBy method
- Identify Spark DataFrame Duplicate records using row_number window Function
Test Data
We will use the following sample DataFrame across all subsequent examples.
Pyspark:
schema = 'id int, name string'
sampleDF = spark.createDataFrame(
[[1,'Scott'],
[2,'Tiger'],
[3,'Jane'],
[4,'Jenny'],
[5,'Judy'],
[3,'Jane'],
[2,'Tiger']], schema=schema)
Scala:
scala> val sampleDF = spark.createDataFrame(Seq(
(1,"Scott"),
(2,"Tiger"),
(3,"Jane"),
(4,"Jenny"),
(5,"Judy"),
(3,"Jane"),
(2,"Tiger")
)).toDF("id", "name")
Remove Duplicate from DataFrame using distinct() Function
The distinct()
function on a DataFrame returns a new DataFrame containing only the distinct rows. The method takes no arguments, so all columns are taken into account when dropping duplicates.
Consider the following PySpark example, which removes duplicates from the DataFrame using the distinct() function.
Pyspark:
>>> newDF = sampleDF.distinct()
>>> newDF.sort('id').show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
The following is the equivalent Scala example using the distinct() function.
Scala:
scala> val newDF = sampleDF.distinct()
scala> newDF.sort("id").show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
Remove Duplicate from DataFrame using dropDuplicates() Function
The dropDuplicates()
function on the DataFrame returns a new DataFrame with duplicate rows removed, optionally considering only a subset of columns.
Consider the following PySpark example, which removes duplicates from the DataFrame using the dropDuplicates() function.
Pyspark:
>>> newDF2 = sampleDF.dropDuplicates()
>>> newDF2.sort('id').show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
The following is the equivalent Scala example using the dropDuplicates() function.
Scala:
scala> val newDF2 = sampleDF.dropDuplicates()
scala> newDF2.sort("id").show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
Identify Spark DataFrame Duplicate records using groupBy method
The GROUP BY
clause is used to group rows based on a set of specified grouping columns and to compute aggregations over each group using one or more aggregate functions. You can use groupBy with the count aggregate function to collapse duplicate rows into one.
Consider the following PySpark example, which removes duplicates from the DataFrame using the groupBy method.
Pyspark:
>>> newDF3 = sampleDF.groupBy("id", "name").count().select("id", "name").sort("id")
>>> newDF3.show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
The following is the equivalent Scala example using the groupBy method.
Scala:
scala> val newDF3 = sampleDF.groupBy("id", "name").count().select("id", "name").sort("id")
scala> newDF3.show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
Identify Spark DataFrame Duplicate records using row_number window Function
Spark window functions calculate results such as the rank or row number over a range of input rows. The row_number() window function returns a sequential number starting from 1 within each window partition. All duplicate rows receive a row number other than 1, so keeping only the rows where the row number equals 1 removes the duplicates.
Consider the following PySpark example, which removes duplicates from the DataFrame using the row_number() window function.
Pyspark:
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import row_number
>>> newDF4 = sampleDF.withColumn("row_number", row_number().over(Window.partitionBy("id", "name").orderBy("id"))).where("row_number = 1").sort("id").select("id", "name")
>>> newDF4.show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
The following is the equivalent Scala example using the row_number() window function.
Scala:
scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions.Window
scala> val newDF4 = sampleDF.withColumn("row_number", row_number().over(Window.partitionBy("id", "name").orderBy("id"))).where("row_number = 1").sort("id").select("id", "name")
scala> newDF4.show()
+---+-----+
| id| name|
+---+-----+
| 1|Scott|
| 2|Tiger|
| 3| Jane|
| 4|Jenny|
| 5| Judy|
+---+-----+
Related Articles
- Spark SQL Recursive DataFrame – Pyspark and Scala
- Replace Pyspark DataFrame Column Value – Methods
- Spark SQL Count Distinct Window Function
- Rename PySpark DataFrame Column – Methods and Examples
- Spark SQL to_date() Function – Pyspark and Scala
Hope this helps 🙂