
I have a dataframe with an array column like:

  import spark.implicits._

  val df = Seq(
    Array("abc", "abc", "null", "null"),
    Array("bcd", "bc", "bcd", "null"),
    Array("ijk", "abc", "bcd", "ijk")).toDF("col")

And it looks like:

col:
["abc","abc","null","null"]
["bcd","bc","bcd","null"]
["ijk","abc","bcd","ijk"]

I am trying to get the duplicate values of each array in Scala:

col_1:
['abc']
['bcd']
['ijk']

I tried to get the duplicate values from a plain list, but I have no clue how to do this with an array column:

 val list = List("bcd", "bc", "bcd", "null")
 list.groupBy(identity).collect { case (x, List(_, _, _*)) => x }
  • You have to explode the array and then execute the groupBy as you already did. Commented Apr 2, 2020 at 12:44
  • Do you have Spark 2.4? Commented Apr 2, 2020 at 14:19
  • Yes, I am using Spark 2.4.5. Commented Apr 2, 2020 at 14:37

3 Answers

df.withColumn("id", monotonically_increasing_id())
  .withColumn("col", explode(col("col")))
  .groupBy("id", "col")
  .count()
  .filter(col("count") > 1 /*&& col("col") =!= "null"*/)
  .select("col")
  .show()
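
If you also want one array of duplicates per input row (the col_1 shape asked for in the question), here is a minimal follow-up sketch along the same lines; the value column and the withDups name are just illustrative:

import org.apache.spark.sql.functions.{col, collect_list, explode, monotonically_increasing_id}

// Keep the id, find per-row duplicates as above, then gather them
// back into one array per original row.
val withDups = df
  .withColumn("id", monotonically_increasing_id())
  .withColumn("value", explode(col("col")))
  .groupBy("id", "value")
  .count()
  .filter(col("count") > 1 /*&& col("value") =!= "null"*/)
  .groupBy("id")
  .agg(collect_list("value").as("col_1"))

withDups.select("col_1").show(false)

Note that rows whose array contains no duplicates drop out entirely at the last groupBy; if you need to keep them with an empty array, join the result back to the id-tagged dataframe.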



You can simply use a custom UDF:

import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._

// Group the elements of each array and keep the ones that occur more than once.
def findDuplicate = udf((in: Seq[String]) =>
  in.groupBy(identity)
    .filter(_._2.length > 1)
    .keys
    .toArray
)

df.withColumn("col_1", explode(findDuplicate($"col")))
  .show()

If you are willing to skip null values (as in your example), just add .filterNot(_ == "null") before .groupBy, as in the sketch below.
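
For concreteness, a sketch of the same UDF with that filterNot in place (the findDuplicateNonNull name is just illustrative):

import org.apache.spark.sql.functions.udf

// Same idea, but literal "null" strings are dropped before grouping,
// so they can never be reported as duplicates.
def findDuplicateNonNull = udf((in: Seq[String]) =>
  in.filterNot(_ == "null")
    .groupBy(identity)
    .filter(_._2.length > 1)
    .keys
    .toArray
)

It drops in exactly where findDuplicate is used above.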

Comments:

  • I am getting an error when I run this on the DataFrame in Spark: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$findDuplicate$1: (array<string>) => array<string>)
  • If you are going to call .groupBy on the in arg, then Seq[String] might be the wrong type.
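
On that error: Seq[String] itself is normally fine for an array<string> column (the UDF receives a WrappedArray, which is a Seq). A more likely culprit, though this is only a guess since the failing input isn't shown, is a null array or null elements reaching groupBy. A null-tolerant sketch, with findDuplicateSafe as an illustrative name:

import org.apache.spark.sql.functions.udf

// Guard against a null array and null elements before grouping
// (an assumption about the cause of the SparkException above).
def findDuplicateSafe = udf((in: Seq[String]) =>
  Option(in).getOrElse(Seq.empty[String])
    .filter(_ != null)
    .groupBy(identity)
    .filter(_._2.length > 1)
    .keys
    .toArray
)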

The duplicate values of an array column can be obtained by assigning a monotonically increasing id to each array, exploding the array, grouping by id and value to count occurrences, and then keeping, within each id, the values whose count equals the maximum count (computed with a window partitioned by id).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, explode, max, monotonically_increasing_id}
import spark.implicits._
val df = spark.sparkContext.parallelize(Seq(
  Array("abc", "abc", null, null),
  Array("bcd", "bc", "bcd", null),
  Array("ijk", "abc", "bcd", "ijk"))).toDF("col")
df.show(10)

val idfDF = df.withColumn("id", monotonically_increasing_id)
val explodeDF = idfDF.select(col("id"), explode(col("col")))

val countDF = explodeDF.groupBy("id", "col").count()

// Window partitioned by id, used to find the maximum count within each array
val byId = Window.partitionBy("id")
val maxDF = countDF.withColumn("max", max("count") over byId)

val finalDf = maxDF.where("max == count").where("col is not null").select("col")
finalDf.show(10)

+---+
|col|
+---+
|abc|
|ijk|
|bcd| 
+---+

