
Assume I have a Spark DataFrame d1 with two columns, elements_1 and elements_2, that contain sets of integers of size k, and two columns, value_1 and value_2, that each contain an integer value. For example, with k = 3:

d1 = 
+------------+------------+
| elements_1 | elements_2 |
+------------+------------+
| (1, 4, 3)  | (3, 4, 5)  |
| (2, 1, 3)  | (1, 0, 2)  |
| (4, 3, 1)  | (3, 5, 6)  |
+------------+------------+

I need to create a new column combinations that contains, for each pair of sets in elements_1 and elements_2, a list of the sets obtained from all possible combinations of their elements. These sets must have the following properties:

  1. Their size must be k+1
  2. They must contain either the set in elements_1 or the set in elements_2

For example, from (1, 2, 3) and (3, 4, 5) we obtain [(1, 2, 3, 4), (1, 2, 3, 5), (3, 4, 5, 1), (3, 4, 5, 2)]. The list does not contain (1, 2, 5), because it is not of size 3+1, and it does not contain (1, 2, 4, 5), because it contains neither of the original sets.

1 Answer


You need to create a custom user-defined function to perform the transformation, create a Spark-compatible UserDefinedFunction from it, then apply it using withColumn. So really, there are two questions here: (1) how to do the set transformation you described, and (2) how to create a new column in a DataFrame using a user-defined function.

Here's a first shot at the set logic, let me know if it does what you're looking for:

// For each element of a that is not in b, add it to b, and vice versa.
// Every resulting set has size k+1 and contains a or b as a subset.
def combo[A](a: Set[A], b: Set[A]): Set[Set[A]] = 
    a.diff(b).map(b + _) ++ b.diff(a).map(a + _)
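
As a quick sanity check (a hypothetical REPL session, not part of the original answer; the ordering of elements inside a Set may differ on your machine), combo applied to the sets from your example yields exactly the four sets you listed:

scala> combo(Set(1, 2, 3), Set(3, 4, 5))
res0: Set[Set[Int]] = Set(Set(3, 4, 5, 1), Set(3, 4, 5, 2), Set(1, 2, 3, 4), Set(1, 2, 3, 5))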

Now create the UDF wrapper. Note that under the hood these sets are all represented by WrappedArrays, so we need to handle this. There's probably a more elegant way to deal with this by defining some implicit conversions, but this should work:

import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.functions.udf

// Spark hands array columns to the UDF as WrappedArrays, so convert
// to Sets on the way in and back to Arrays on the way out.
val comboWrap: (WrappedArray[Int], WrappedArray[Int]) => Array[Array[Int]] = 
    (x, y) => combo(x.toSet, y.toSet).map(_.toArray).toArray
val comboUDF = udf(comboWrap)
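
If you'd rather not import WrappedArray, an alternative sketch (untested; it relies on the fact that Spark passes ArrayType columns to Scala UDFs as a Seq, and the names comboWrapSeq and comboUDFSeq are just for illustration) is to declare the parameters as Seq[Int]:

// Assumption: Spark delivers array columns as Seq[Int], which
// WrappedArray satisfies, so no explicit WrappedArray import is needed.
val comboWrapSeq: (Seq[Int], Seq[Int]) => Array[Array[Int]] = 
    (x, y) => combo(x.toSet, y.toSet).map(_.toArray).toArray
val comboUDFSeq = udf(comboWrapSeq)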

Finally, apply it to the DataFrame by creating a new column:

import org.apache.spark.sql.functions.col
import spark.implicits._  // for toDF on local Seqs (available in spark-shell)

val data = Seq((Set(1, 2, 3), Set(3, 4, 5))).toDF("elements_1", "elements_2")
val result = data.withColumn("result", 
    comboUDF(col("elements_1"), col("elements_2")))
result.show
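
If everything is wired up correctly, result.show should print something roughly like the following (illustrative only: Spark truncates wide cells by default, and the order of elements inside each nested array may vary):

+----------+----------+--------------------+
|elements_1|elements_2|              result|
+----------+----------+--------------------+
| [1, 2, 3]| [3, 4, 5]|[[3, 4, 5, 1], [3...|
+----------+----------+--------------------+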

Comments

Thank you! I get a type mismatch saying that the expected type of the inputs should be Set but it is instead org.apache.spark.sql.Column. What could be the issue?
Did you make sure to use comboUDF(), and not combo(), in the withColumn call? You use org.apache.spark.sql.functions.udf to create a wrapper around your function so that it can operate on entire columns instead of single values.
I still have another issue though. I get the error scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Set. It looks like the types in my dataset are automatically transformed into arrays, even though the dataset is built from a Seq[Set].
Oops, I didn't realize the DataFrame/Dataset API doesn't have a native representation for Sets; under the hood it's all arrays. The basic approach should still work but needs to account for this. I'll update the answer in a few minutes.
I think the issue is deeper than that: I tried to create a function that takes an Array[Int] and returns the same array, performing no operation. Still, I get the same error (this time with the type Array).
