Everyday data dedupe with Spark

Eshan Haval
Oct 24, 2020


Photo by Matthew Henry on Unsplash

Recently, I was given a task to dedupe a dataset using Apache Spark. There are several ways to accomplish this, some of them pretty well known and straight out of the box. In this article I would like to walk through a few of them and show how each could be used depending on the use case. I will use Scala for all of the code. Let's start with a small sample DataFrame of app installs:

import spark.implicits._

val installsDF = Seq(
  ("01/01/2020", "a", 123),
  ("01/02/2020", "b", 234),
  ("01/03/2020", "a", 345),
  ("01/04/2020", "b", 234),
  ("01/01/2020", "a", 123),
  ("01/05/2020", "x", 567)
).toDF("installDate", "userId", "appId")

installsDF.show(false)
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/02/2020 |b |234 |
|01/03/2020 |a |345 |
|01/04/2020 |b |234 |
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
+-----------+------+-----+

Depending on the scenario, one may need to eliminate duplicates differently.

distinct()

Using .distinct() to de-dupe is the very first way that may come to anyone’s mind. It is a standard and quick way to eliminate duplicates when fully identical rows have to be dropped: .distinct() (or simply .distinct) de-dupes the DataFrame by considering all columns, as in the one-liner below.
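A minimal call on the installsDF defined above (.show(false) just prints the result):

installsDF.distinct().show(false)

This gives us something like the following: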

+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
|01/03/2020 |a |345 |
|01/04/2020 |b |234 |
|01/02/2020 |b |234 |
+-----------+------+-----+

dropDuplicates() / dropDuplicates(colNames: Seq[String])

Another way is to use .dropDuplicates, which, as far as I know, has been available since Apache Spark 1.4. Calling .dropDuplicates() without any arguments behaves just like .distinct(). Calling .dropDuplicates(colNames: Seq[String]) (or its String* overload) gives us the flexibility of using only particular columns as the condition for eliminating partially identical rows, as shown in the snippet below.
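De-duping on just the userId and appId columns of the same DataFrame:

installsDF.dropDuplicates("userId", "appId").show(false)

This will give us one of the two results below: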

+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/03/2020 |a |345 |
|01/05/2020 |x |567 |
|01/02/2020 |b |234 |
+-----------+------+-----+

In this run we don’t get the following result, since user b’s day-4 entry comes after the day-2 entry and the data is small enough to fit into one partition; with more data spread across partitions, which of the two rows survives is not guaranteed.

+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/03/2020 |a |345 |
|01/05/2020 |x |567 |
|01/04/2020 |b |234 |
+-----------+------+-----+

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Let’s say I want to find the apps installed by each user along with the first install date. Using dropDuplicates seemed to give a similar result, but if the records arrive out of order it fails to return the right row. In this case, I can use reduceByKey. For this I first have to convert the DataFrame into a pair RDD, with userId and appId as the key and installDate as the value, and then keep the earlier of any two dates seen for the same key.

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

installsDF
  // key by (userId, appId); the value is the install date string
  .map(r => ((r.getAs[String]("userId"), r.getAs[Int]("appId")), r.getAs[String]("installDate")))
  .rdd
  // for each key, keep the earlier of the two install dates
  .reduceByKey { (x, y) =>
    val pattern = "MM/dd/yyyy"
    val installDateMillis1 = DateTime.parse(x, DateTimeFormat.forPattern(pattern)).getMillis
    val installDateMillis2 = DateTime.parse(y, DateTimeFormat.forPattern(pattern)).getMillis
    if (installDateMillis1 < installDateMillis2) x else y
  }
  .map { case ((userId, appId), installDate) => (installDate, userId, appId) }
  .toDF("installDate", "userId", "appId")
  .show(false)
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/02/2020 |b |234 |
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
|01/03/2020 |a |345 |
+-----------+------+-----+

Using reduceByKey should help in many dedup cases, but there is one condition under which it becomes unusable: if the key or the value needs to carry more than 22 columns, Scala’s 22-element tuple limit kicks in. This can be mitigated with a change in the design of the RDD, either by reducing the columns or by wrapping them in a case class instead of a tuple, as sketched below.
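A rough sketch of the case-class variant, assuming the key really does need many fields (InstallKey is a made-up name for illustration, not part of the original example):

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// A case class is not limited to 22 fields, unlike a tuple.
case class InstallKey(userId: String, appId: Int /*, ...more key columns... */)

installsDF
  .rdd // drop to the RDD API first, so no Dataset encoder is needed for the case class
  .map(r => (InstallKey(r.getAs[String]("userId"), r.getAs[Int]("appId")),
             r.getAs[String]("installDate")))
  .reduceByKey { (x, y) =>
    // same "keep the earlier date" logic as above
    val fmt = DateTimeFormat.forPattern("MM/dd/yyyy")
    if (DateTime.parse(x, fmt).getMillis < DateTime.parse(y, fmt).getMillis) x else y
  }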

Window Function

In cases where the 22-column limit cannot be avoided, one can resort to Spark SQL’s window functions to perform the dedupe. To tackle the same problem of finding the apps installed by a user along with the first install date, one partitions the DataFrame by userId and appId, sorts within each partition by installDate ascending (by converting installDate to a unix timestamp), assigns a row number to each row (aliased here as rank) and then filters for rows with rank 1.

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions.{row_number, unix_timestamp}

val windowConf = Window
  .partitionBy("userId", "appId")
  .orderBy(unix_timestamp($"installDate", "MM/dd/yyyy").cast("timestamp").asc)

installsDF
  .withColumn("rank", row_number().over(windowConf))
  .filter($"rank" === 1)
  .show(false)
+-----------+------+-----+----+
|installDate|userId|appId|rank|
+-----------+------+-----+----+
|01/01/2020 |a |123 |1 |
|01/03/2020 |a |345 |1 |
|01/05/2020 |x |567 |1 |
|01/02/2020 |b |234 |1 |
+-----------+------+-----+----+
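The rank column is only a helper; if the output should match the original schema, it can be dropped at the end (a small optional addition to the snippet above):

installsDF
  .withColumn("rank", row_number().over(windowConf))
  .filter($"rank" === 1)
  .drop("rank")
  .show(false)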

collect_set(e: Column)

Another way of performing a dedupe is the collect_set function from Spark SQL. Strictly speaking it is not de-duping but collapsing records: it performs a groupBy and collects the unique values of a column for each group.

import org.apache.spark.sql.functions.collect_set

installsDF
  .groupBy("userId", "appId")
  .agg(collect_set("installDate").alias("installDate"))
  .show(false)
+------+-----+------------------------+
|userId|appId|installDate |
+------+-----+------------------------+
|a |123 |[01/01/2020] |
|a |345 |[01/03/2020] |
|x |567 |[01/05/2020] |
|b |234 |[01/02/2020, 01/04/2020]|
+------+-----+------------------------+
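As an aside (my own variation, not from the snippets above), the same “earliest install date per user and app” question can also be answered with a plain groupBy and min over the parsed date, which avoids collecting arrays altogether:

import org.apache.spark.sql.functions.{from_unixtime, min, unix_timestamp}

// Keeps the earliest install date per (userId, appId), formatted back to MM/dd/yyyy.
installsDF
  .groupBy("userId", "appId")
  .agg(from_unixtime(min(unix_timestamp($"installDate", "MM/dd/yyyy")), "MM/dd/yyyy")
    .alias("installDate"))
  .show(false)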
