I have two dataframes as below:

df1: contains a few tag values, as below. Its contents are dynamic.

+--------------+
|tags          |
+--------------+
|first_name    |
|last_name     |
|primary_email |
|other_email   |
+--------------+

df2: The second dataframe has a few predefined combinations, as below:

+---------------------------------------------------------------------------------------------+
|combinations                                                                                 |
+---------------------------------------------------------------------------------------------+
|last_name, first_name, primary_email                                                         |
|last_name, first_name, other_email                                                           |
|last_name, primary_email, primary_phone                                                      |
|last_name, primary_email, secondary_phone                                                    |
|last_name, address_line1, address_line2,city_name, state_name,postal_code, country_code, guid|
+---------------------------------------------------------------------------------------------+

Expected result DF: I want to find out which of the predefined combinations can be formed from the tags in my dataframe. The result should contain every row of the combinations dataframe whose fields are all present in df1.

resultDF:

+---------------------------------------------------------------------------------------------+
|combinations                                                                                 |
+---------------------------------------------------------------------------------------------+
|last_name, first_name, primary_email                                                         |
|last_name, first_name, other_email                                                           |
+---------------------------------------------------------------------------------------------+

I tried converting both dataframes into lists and comparing them, but I always get 0 combinations.

The Scala code I tried:

val combinationList = combinations.map(r => r.getString(0)).collect.toList

var combList: Seq[Seq[String]] = Seq.empty

for (comb <- combinationList) {
  var tmp: Seq[String] = Seq.empty
  tmp = tmp :+ comb
  combList = combList :+ tmp
}

val result = combList.filter(
  list => df1.filter(df1.col("tags").isin(list: _*)).count == list.size
)

println(result.size)

This always returns 0. The answer should be 2.

Can someone guide me on the best approach?

1 Answer

Try this: collect df1 to the driver and add its values to df2 as a new array column. If you are on Spark 2.4, compare the two arrays with array_except, which returns the elements of the first array that are not in the second. Then keep only the rows where the size of that result is 0.

scala> val df1 = Seq(
     |   "first_name",
     |   "last_name",
     |   "primary_email",
     |   "other_email" 
     | ).toDF("tags")
df1: org.apache.spark.sql.DataFrame = [tags: string]

scala> val df2 = Seq(
     | Seq("last_name", "first_name", "primary_email"),                                                         
     | Seq("last_name", "first_name", "other_email"),
     | Seq("last_name", "primary_email", "primary_phone"),                                                      
     | Seq("last_name", "primary_email", "secondary_phone"),
     | Seq("last_name", "address_line1", "address_line2", "city_name", "state_name", "postal_code", "country_code", "guid")
     | ).toDF("combinations")
df2: org.apache.spark.sql.DataFrame = [combinations: array<string>]

scala> df1.show(false)
+-------------+
|tags         |
+-------------+
|first_name   |
|last_name    |
|primary_email|
|other_email  |
+-------------+


scala> df2.show(false)
+-------------------------------------------------------------------------------------------------+
|combinations                                                                                     |
+-------------------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email]                                                           |
|[last_name, first_name, other_email]                                                             |
|[last_name, primary_email, primary_phone]                                                        |
|[last_name, primary_email, secondary_phone]                                                      |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+


scala> val df1tags = df1.collect.map(r => r.getString(0))
df1tags: Array[String] = Array(first_name, last_name, primary_email, other_email)

scala> val df3 = df2.withColumn("tags", lit(df1tags))
df3: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string>]

scala> df3.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+
|combinations                                                                                     |tags                                               |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+
|[last_name, first_name, primary_email]                                                           |[first_name, last_name, primary_email, other_email]|
|[last_name, first_name, other_email]                                                             |[first_name, last_name, primary_email, other_email]|
|[last_name, primary_email, primary_phone]                                                        |[first_name, last_name, primary_email, other_email]|
|[last_name, primary_email, secondary_phone]                                                      |[first_name, last_name, primary_email, other_email]|
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+


scala> val df4 = df3.withColumn("combMinusTags", array_except($"combinations", $"tags"))
df4: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string> ... 1 more field]

scala> df4.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|combinations                                                                                     |tags                                               |combMinusTags                                                                         |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email]                                                           |[first_name, last_name, primary_email, other_email]|[]                                                                                    |
|[last_name, first_name, other_email]                                                             |[first_name, last_name, primary_email, other_email]|[]                                                                                    |
|[last_name, primary_email, primary_phone]                                                        |[first_name, last_name, primary_email, other_email]|[primary_phone]                                                                       |
|[last_name, primary_email, secondary_phone]                                                      |[first_name, last_name, primary_email, other_email]|[secondary_phone]                                                                     |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|[address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+


scala> df4.filter(size($"combMinusTags") === 0).show(false)
+--------------------------------------+---------------------------------------------------+-------------+
|combinations                          |tags                                               |combMinusTags|
+--------------------------------------+---------------------------------------------------+-------------+
|[last_name, first_name, primary_email]|[first_name, last_name, primary_email, other_email]|[]           |
|[last_name, first_name, other_email]  |[first_name, last_name, primary_email, other_email]|[]           |
+--------------------------------------+---------------------------------------------------+-------------+
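
The walkthrough above builds df2 with an array column, whereas the question shows combinations as comma-separated strings. Here is a minimal sketch of the same steps condensed into one function, assuming Spark 2.4+, a small and non-empty df1 that can be collected to the driver, and a string-typed combinations column. The object name ValidCombinations and the helper column combArray are made up for illustration; they are not from the original post.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, array_except, col, lit, size, split, trim}

object ValidCombinations {
  // Keeps the rows of combosDf whose comma-separated fields all occur in tagsDf.
  // Assumes tagsDf has the tag in its first column and is small enough to collect.
  def validCombinations(tagsDf: DataFrame, combosDf: DataFrame): DataFrame = {
    val tags = tagsDf.collect().map(_.getString(0))
    // Literal array column holding every tag, repeated on each row.
    val tagsCol = array(tags.map(t => lit(t)): _*)

    combosDf
      // Split on commas, tolerating uneven spacing such as "a, b,c".
      .withColumn("combArray", split(trim(col("combinations")), "\\s*,\\s*"))
      // An empty difference means every field of the combination is a known tag.
      .filter(size(array_except(col("combArray"), tagsCol)) === 0)
      .select("combinations")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("valid-combinations").getOrCreate()
    import spark.implicits._

    val df1 = Seq("first_name", "last_name", "primary_email", "other_email").toDF("tags")
    val df2 = Seq(
      "last_name, first_name, primary_email",
      "last_name, first_name, other_email",
      "last_name, primary_email, primary_phone"
    ).toDF("combinations")

    validCombinations(df1, df2).show(false)
    spark.stop()
  }
}
```

With the sample data from the question, this should keep only the first two combinations.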


Spark 2.3

array_except is not available on Spark 2.3, so write your own equivalent as a UDF.

scala> def array_expt[T](a: Seq[T], b:Seq[T]):Seq[T] = {
     |   a.diff(b)
     | } 
array_expt: [T](a: Seq[T], b: Seq[T])Seq[T]

scala> val myUdf = udf { array_expt[String] _ }
myUdf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function2>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true), ArrayType(StringType,true))))

scala> val df4 = df3.withColumn("combMinusTags", myUdf($"combinations", $"tags"))
df4: org.apache.spark.sql.DataFrame = [combinations: array<string>, tags: array<string> ... 1 more field]

scala> df4.show(false)
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|combinations                                                                                     |tags                                               |combMinusTags                                                                         |
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+
|[last_name, first_name, primary_email]                                                           |[first_name, last_name, primary_email, other_email]|[]                                                                                    |
|[last_name, first_name, other_email]                                                             |[first_name, last_name, primary_email, other_email]|[]                                                                                    |
|[last_name, primary_email, primary_phone]                                                        |[first_name, last_name, primary_email, other_email]|[primary_phone]                                                                       |
|[last_name, primary_email, secondary_phone]                                                      |[first_name, last_name, primary_email, other_email]|[secondary_phone]                                                                     |
|[last_name, address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|[first_name, last_name, primary_email, other_email]|[address_line1, address_line2, city_name, state_name, postal_code, country_code, guid]|
+-------------------------------------------------------------------------------------------------+---------------------------------------------------+--------------------------------------------------------------------------------------+


scala> df4.filter(size($"combMinusTags") === 0).show(false)
+--------------------------------------+---------------------------------------------------+-------------+
|combinations                          |tags                                               |combMinusTags|
+--------------------------------------+---------------------------------------------------+-------------+
|[last_name, first_name, primary_email]|[first_name, last_name, primary_email, other_email]|[]           |
|[last_name, first_name, other_email]  |[first_name, last_name, primary_email, other_email]|[]           |
+--------------------------------------+---------------------------------------------------+-------------+
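
As a side note on why the attempt in the question prints 0: its loop appends each combination string whole to a one-element Seq, so isin is asked to match a tag equal to the entire string "last_name, first_name, primary_email", and no tag does. If both dataframes are small enough to collect, a plain Scala check that splits each combination first might look like the sketch below. The names SubsetCheck, fields and validCombinations are made up for illustration.

```scala
object SubsetCheck {
  // Split one comma-separated combination into trimmed field names.
  def fields(combination: String): Seq[String] =
    combination.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  // Keep the combinations whose fields all appear in the tag set.
  def validCombinations(tags: Set[String], combinations: Seq[String]): Seq[String] =
    combinations.filter(c => fields(c).forall(tags.contains))

  def main(args: Array[String]): Unit = {
    // In the question these would come from df1.collect and combinations.collect.
    val tags = Set("first_name", "last_name", "primary_email", "other_email")
    val combinations = Seq(
      "last_name, first_name, primary_email",
      "last_name, first_name, other_email",
      "last_name, primary_email, primary_phone",
      "last_name, primary_email, secondary_phone"
    )
    val result = validCombinations(tags, combinations)
    result.foreach(println)
    println(result.size)
  }
}
```

This runs entirely on the driver, so it only suits the case where both inputs are small; the array_except approach keeps the filtering inside Spark.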



4 Comments

Hi @C.S.Reddy Gadipally, I am trying your approach and am stuck at array_except. I am on Spark 2.3 and this function does not seem to be available. I tried a few other ways to do the except, but none of them worked.
Write your own array_except UDF; see my answer. I am editing it to include Spark 2.3.
Hi @C.S.Reddy Gadipally, I tried the UDF approach and it is not giving me the correct results. I think diff compares the two arrays and returns the elements of the first array that are missing from the second. But the goal is to find whether the combinations are a subset of tags.
If the diff returns an empty list, then combinations is a subset of tags, which is what you need. Post the input, the incorrect result, and your expected result.
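
To make the last point concrete, here is a small plain Scala sketch of how Seq.diff behaves for the subset check. One caveat it shows: diff is a multiset difference, so a combination that repeats a field can leave a leftover element, whereas array_except returns its result without duplicates. Whether that applies to the commenter's data is unknown; the names DiffCheck, missing and missingDistinct are made up for illustration.

```scala
object DiffCheck {
  // Same logic as the UDF in the answer: elements of combination not cancelled by tags.
  def missing(combination: Seq[String], tags: Seq[String]): Seq[String] =
    combination.diff(tags)

  // Set-style variant: ignores repeated fields in the combination.
  def missingDistinct(combination: Seq[String], tags: Seq[String]): Seq[String] = {
    val tagSet = tags.toSet
    combination.distinct.filterNot(tagSet.contains)
  }

  def main(args: Array[String]): Unit = {
    val tags = Seq("first_name", "last_name", "primary_email", "other_email")

    // Empty result: every field is a known tag, so the combination is a subset.
    println(missing(Seq("last_name", "first_name", "primary_email"), tags))
    // Non-empty result: primary_phone is not a tag.
    println(missing(Seq("last_name", "primary_email", "primary_phone"), tags))
    // A repeated field survives diff but not the set-style variant.
    println(missing(Seq("last_name", "last_name"), tags))
    println(missingDistinct(Seq("last_name", "last_name"), tags))
  }
}
```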
