Everyday data dedupe with Spark
Recently, I was given a task to dedupe a dataset using Apache Spark. There were several ways I considered to accomplish it, some of them pretty well known and available straight out of the box. In this article I would like to walk through a few of them and how they can be used depending on the use case. All of the code is written in Scala.
import spark.implicits._

val installsDF = Seq(
  ("01/01/2020", "a", 123),
  ("01/02/2020", "b", 234),
  ("01/03/2020", "a", 345),
  ("01/04/2020", "b", 234),
  ("01/01/2020", "a", 123),
  ("01/05/2020", "x", 567)
).toDF("installDate", "userId", "appId")

installsDF.show(false)
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/02/2020 |b |234 |
|01/03/2020 |a |345 |
|01/04/2020 |b |234 |
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
+-----------+------+-----+
Depending on the scenario, one may need to eliminate duplicates differently.
distinct()
Using .distinct() to de-dupe is the very first way that may come to anyone’s mind. It is a standard and quick way to eliminate duplicates when fully identical rows need to be dropped: .distinct() (or simply .distinct) de-dupes the DataFrame by considering all columns. Calling installsDF.distinct() will give us something like this:
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
|01/03/2020 |a |345 |
|01/04/2020 |b |234 |
|01/02/2020 |b |234 |
+-----------+------+-----+
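For reference, here is the call spelled out as a small snippet (a sketch reusing the installsDF built above; the count comparison is just a quick way to see how many duplicate rows were dropped):

val distinctInstallsDF = installsDF.distinct()
distinctInstallsDF.show(false)

// Quick sanity check: 6 rows before, 5 rows after
println(s"before: ${installsDF.count()}, after: ${distinctInstallsDF.count()}")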
dropDuplicates() / dropDuplicates(Seq<String> colNames)
Another way is to use .dropDuplicates, which was introduced in Apache Spark 1.4. Calling .dropDuplicates() without any arguments behaves like calling .distinct(). Calling .dropDuplicates(Seq<String> colNames) gives us the flexibility of using only particular columns as the condition for eliminating partially identical rows. Calling installsDF.dropDuplicates("userId", "appId") will give us one of the two results below:
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/03/2020 |a |345 |
|01/05/2020 |x |567 |
|01/02/2020 |b |234 |
+-----------+------+-----+
Here we don’t get the following result, since user b’s 01/04 entry comes after the 01/02 entry and the data is small enough to fit into one partition; in general, though, dropDuplicates makes no guarantee about which of the duplicate rows is kept.
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/01/2020 |a |123 |
|01/03/2020 |a |345 |
|01/05/2020 |x |567 |
|01/04/2020 |b |234 |
+-----------+------+-----+
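Spelled out as a snippet, both the varargs form and the Seq form from the heading are available and behave the same (a sketch reusing installsDF from above):

// Varargs form and Seq form are equivalent; which duplicate row survives is not guaranteed.
installsDF.dropDuplicates("userId", "appId").show(false)
installsDF.dropDuplicates(Seq("userId", "appId")).show(false)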
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
Let’s say I want to find the apps installed by each user along with their first install date. Using dropDuplicates seemed to give a similar result, but if the records arrive out of order it fails to return the right one. In this case, I can use reduceByKey. For this, I first have to convert the DataFrame into a pair RDD, with (userId, appId) as the key and installDate as the value.
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

installsDF
  .map(r => ((r.getAs[String]("userId"), r.getAs[Int]("appId")), r.getAs[String]("installDate")))
  .rdd
  .reduceByKey((x, y) => {
    // Keep whichever install date is earlier
    val pattern = "MM/dd/yyyy"
    val installDateMillis1 = DateTime.parse(x, DateTimeFormat.forPattern(pattern)).getMillis
    val installDateMillis2 = DateTime.parse(y, DateTimeFormat.forPattern(pattern)).getMillis
    if (installDateMillis1 < installDateMillis2) x else y
  })
  .map { case ((userId, appId), installDate) => (installDate, userId, appId) }
  .toDF("installDate", "userId", "appId")
  .show(false)
+-----------+------+-----+
|installDate|userId|appId|
+-----------+------+-----+
|01/02/2020 |b |234 |
|01/01/2020 |a |123 |
|01/05/2020 |x |567 |
|01/03/2020 |a |345 |
+-----------+------+-----+
Using reduceByKey should help in many dedup cases, but there is one condition under which it becomes unusable: if the key or the value needs to carry more than 22 columns, Scala’s 22-element tuple limit kicks in. This can be mitigated by redesigning the RDD to reduce the number of columns.
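Another possible way around the limit, offered here only as a hypothetical sketch and not something the approach above relies on, is to use a case class instead of a tuple as the key, since case classes (in Scala 2.11+) are not bound by the 22-element limit:

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Hypothetical key type; only two fields are shown, but a case class
// can carry more than 22 fields where a tuple cannot.
case class InstallKey(userId: String, appId: Int)

def earlier(a: String, b: String): String = {
  val fmt = DateTimeFormat.forPattern("MM/dd/yyyy")
  if (DateTime.parse(a, fmt).getMillis < DateTime.parse(b, fmt).getMillis) a else b
}

installsDF.rdd
  .map(r => (InstallKey(r.getAs[String]("userId"), r.getAs[Int]("appId")),
             r.getAs[String]("installDate")))
  .reduceByKey(earlier)

The result can then be mapped back to a tuple of columns and converted to a DataFrame exactly as in the tuple-keyed version above.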
Window Function
In cases where the 22-column limit cannot be avoided, one can resort to Spark SQL’s window functions to perform the dedupe. To tackle the same problem of finding the apps installed by a user along with their first install date, one needs to partition the data by userId and appId, sort each partition by installDate in ascending order (by converting installDate to a unix timestamp), assign a row number to each row (named rank here), and then filter for rows where it equals 1.
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

val windowConf = Window
  .partitionBy("userId", "appId")
  .orderBy(unix_timestamp($"installDate", "MM/dd/yyyy").cast("timestamp").asc)

installsDF
  .withColumn("rank", row_number().over(windowConf))
  .filter($"rank" === 1)
  .show(false)
+-----------+------+-----+----+
|installDate|userId|appId|rank|
+-----------+------+-----+----+
|01/01/2020 |a |123 |1 |
|01/03/2020 |a |345 |1 |
|01/05/2020 |x |567 |1 |
|01/02/2020 |b |234 |1 |
+-----------+------+-----+----+
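If the helper column is not wanted in the final result, it can be dropped after the filter (a small follow-up to the snippet above, reusing the same windowConf):

installsDF
  .withColumn("rank", row_number().over(windowConf))
  .filter($"rank" === 1)
  .drop("rank")
  .show(false)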
collect_set(Column e)
Another way of performing dedup is to use the collect_set function from Spark SQL. It is not actually deduping but rather collapsing records: performing a groupBy and collecting the unique values of a column for each group.
import org.apache.spark.sql.functions.collect_set

installsDF
  .groupBy("userId", "appId")
  .agg(collect_set("installDate").alias("installDate"))
  .show(false)
+------+-----+------------------------+
|userId|appId|installDate |
+------+-----+------------------------+
|a |123 |[01/01/2020] |
|a |345 |[01/03/2020] |
|x |567 |[01/05/2020] |
|b |234 |[01/02/2020, 01/04/2020]|
+------+-----+------------------------+
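If only the first install date per group is actually wanted, rather than the whole set, the same groupBy shape can be combined with a plain aggregate. This is a hedged sketch, not part of the original approach; it parses the string date so that the minimum is chronological rather than lexical:

import org.apache.spark.sql.functions.{date_format, min, to_date}

installsDF
  .groupBy("userId", "appId")
  .agg(date_format(min(to_date($"installDate", "MM/dd/yyyy")), "MM/dd/yyyy").alias("installDate"))
  .show(false)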