How to Remove Duplicate Records from Spark DataFrame – Pyspark and Scala

Spark does not prevent you from loading duplicate records into a DataFrame; there is no method that rejects duplicates at creation time, so applications such as ETL pipelines can easily produce DataFrames containing duplicate rows. Spark SQL supports several methods to de-duplicate such data. In this article, we will check how to identify and remove duplicate records from a Spark SQL DataFrame.

Remove Duplicate Records from Spark DataFrame

There are several methods you can use to identify and remove duplicate records from a Spark SQL DataFrame. For example, you can use functions such as distinct() or dropDuplicates() to remove the duplicates while creating another DataFrame. The following sections walk through each approach.

Test Data

We will use the following sample DataFrame across all subsequent examples. Note that it contains duplicate rows for (2, 'Tiger') and (3, 'Jane').

Pyspark:

schema = 'id int, name string'
sampleDF = spark.createDataFrame(
    [[1, 'Scott'],
     [2, 'Tiger'],
     [3, 'Jane'],
     [4, 'Jenny'],
     [5, 'Judy'],
     [3, 'Jane'],
     [2, 'Tiger']], schema=schema)

Scala:

scala> val sampleDF = spark.createDataFrame(Seq(
         (1, "Scott"),
         (2, "Tiger"),
         (3, "Jane"),
         (4, "Jenny"),
         (5, "Judy"),
         (3, "Jane"),
         (2, "Tiger")
       )).toDF("id", "name")

Remove Duplicate from DataFrame using distinct() Function

The distinct() function returns a new DataFrame containing only the distinct rows of this DataFrame. The method takes no arguments, so all columns are taken into account when dropping duplicates.

Consider the following PySpark example, which removes duplicates from the DataFrame using the distinct() function.

Pyspark:

>>> newDF = sampleDF.distinct()
>>> newDF.sort('id').show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+

And the following is the Scala example that removes duplicates using the distinct() function.

Scala:

scala> val newDF = sampleDF.distinct()
scala> newDF.sort("id").show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+
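
Because distinct() takes no arguments, de-duplicating on a subset of columns with it requires selecting those columns first; the non-selected columns are dropped from the result. A minimal PySpark sketch using the same sampleDF (nameDF is just an illustrative name):

>>> # distinct() over a single selected column; non-selected columns are dropped
>>> nameDF = sampleDF.select("name").distinct()
>>> nameDF.sort("name").show()
+-----+
| name|
+-----+
| Jane|
|Jenny|
| Judy|
|Scott|
|Tiger|
+-----+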

Remove Duplicate from DataFrame using dropDuplicates() Function

The dropDuplicates() function returns a new DataFrame with duplicate rows removed, optionally considering only a subset of the columns.

Consider the following PySpark example, which removes duplicates from the DataFrame using the dropDuplicates() function.

Pyspark:

>>> newDF2 = sampleDF.dropDuplicates()
>>> newDF2.sort('id').show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+

And the following is the Scala example that removes duplicates using the dropDuplicates() function.

Scala:

scala> val newDF2 = sampleDF.dropDuplicates()
scala> newDF2.sort("id").show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+
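
Unlike distinct(), dropDuplicates() optionally accepts a list of column names and de-duplicates based only on those columns while keeping all columns in the result. When rows share the same key but differ in other columns, which row is retained is not guaranteed. A minimal PySpark sketch on the same sampleDF (newDF5 is just an illustrative name):

>>> # de-duplicate based on the name column only; all columns are kept
>>> newDF5 = sampleDF.dropDuplicates(["name"])
>>> newDF5.sort("id").show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+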

Identify Spark DataFrame Duplicate records using groupBy method

The GROUP BY clause groups rows based on a set of specified grouping columns and computes aggregations on each group of rows using one or more aggregate functions. You can use groupBy on all columns with the count aggregate function to collapse duplicate rows into one.

Consider the following PySpark example, which removes duplicates from the DataFrame using the groupBy function.

Pyspark:

>>> newDF3 = sampleDF.groupBy("id", "name").count().select("id", "name").sort("id")
>>> newDF3.show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+

And the following is the Scala example that removes duplicates using the groupBy function.

Scala:

scala> val newDF3 = sampleDF.groupBy("id", "name").count().select("id", "name").sort("id")
scala> newDF3.show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+
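
The groupBy approach is also handy for identifying which records are duplicated rather than just removing them: filter the grouped result for counts greater than 1. A minimal PySpark sketch on the same sampleDF (dupDF is just an illustrative name):

>>> # rows with count > 1 appear more than once in the DataFrame
>>> dupDF = sampleDF.groupBy("id", "name").count().where("count > 1")
>>> dupDF.sort("id").show()
+---+-----+-----+
| id| name|count|
+---+-----+-----+
|  2|Tiger|    2|
|  3| Jane|    2|
+---+-----+-----+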

Identify Spark DataFrame Duplicate records using row_number window Function

Spark window functions calculate results such as rank and row number over a range of input rows. The row_number() window function returns a sequential number starting from 1 within each window partition. Every duplicate row gets a row number greater than 1, so keeping only the rows where the row number equals 1 removes the duplicates.

Consider the following PySpark example, which removes duplicates from the DataFrame using the row_number() window function.

Pyspark:

>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import row_number

>>> newDF4 = sampleDF.withColumn("row_number", row_number().over(Window.partitionBy("id", "name").orderBy("id"))).where("row_number = 1").sort("id").select("id", "name")
>>> newDF4.show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+

And the following is the Scala example that removes duplicates using the row_number() window function.

Scala:

scala> import org.apache.spark.sql.functions._
scala> import org.apache.spark.sql.expressions.Window

scala> val newDF4 = sampleDF.withColumn("row_number", row_number().over(Window.partitionBy("id", "name").orderBy("id"))).where("row_number = 1").sort("id").select("id", "name")
scala> newDF4.show()
+---+-----+
| id| name|
+---+-----+
|  1|Scott|
|  2|Tiger|
|  3| Jane|
|  4|Jenny|
|  5| Judy|
+---+-----+
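
The same window also lets you inspect the duplicate rows themselves: keep the rows whose row number is greater than 1 instead of equal to 1. A minimal PySpark sketch on the same sampleDF, reusing the Window and row_number imports from the example above (dupDF2 is just an illustrative name):

>>> # every occurrence after the first gets row_number > 1
>>> dupDF2 = sampleDF.withColumn("row_number", row_number().over(Window.partitionBy("id", "name").orderBy("id"))).where("row_number > 1").select("id", "name").sort("id")
>>> dupDF2.show()
+---+-----+
| id| name|
+---+-----+
|  2|Tiger|
|  3| Jane|
+---+-----+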

Hope this helps 🙂