5

Suppose I have the following case

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("ev", ArrayType(StringType()), True),
    StructField("ev2", ArrayType(StringType()), True),])
df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]},
                            {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]},
                            {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]},
                            {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}],
                           schema=schema)

Which gives me:

df.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se1|[ev11, ev12]|      [ev11]|
|se2|      [ev11]|[ev11, ev12]|
|se3|      [ev21]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

I want to create a new column of boolean (or select only the true cases) for the rows where the contents of the "ev" column are inside the "ev2" column, returning:

df_target.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se2|      [ev11]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

or:

df_target.show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

I tried using the isin method:

df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|  false|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

But it looks like it only checks if it's the same array.

I also tried the array_contains function from pyspark.sql.functions but only accepts one object and not an array to check.

I am having difficulties even searching for this due to phrasing the correct problem.

Thanks!

4 Answers 4

11

One more implementation for Spark >= 2.4.0 avoiding UDF and using the built-in array_except:

from pyspark.sql.functions import size, array_except

def is_subset(a, b):
  return size(array_except(a, b)) == 0
  
df.withColumn("is_subset", is_subset(df.ev, df.ev2))

Output:

+---+------------+------------+---------+
| id|          ev|         ev2|is_subset|
+---+------------+------------+---------+
|se1|[ev11, ev12]|      [ev11]|    false|
|se2|      [ev11]|[ev11, ev12]|     true|
|se3|      [ev21]|[ev11, ev12]|    false|
|se4|[ev21, ev22]|[ev21, ev22]|     true|
+---+------------+------------+---------+
Sign up to request clarification or add additional context in comments.

1 Comment

Better option then creating UDF
6

Here's an option using a udf, where we check the length of the difference between the columns ev and ev2. When the length of the resulting array is 0 , or all elements of ev are contained within ev2, we return True; otherwise False.

def contains(x,y):
  z = len(set(x) - set(y))
  if z == 0:
    return True
  else:
    return False

contains_udf = udf(contains)
df.withColumn("evInEv2", contains_udf(df.ev,df.ev2)).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

1 Comment

It worked! Thanks! I didn't even think about using udfs
1

Alternatively, you can use

subsetOf=udf(lambda A,B: set(A).issubset(set(B)))
df.withColumn("evInEv2", subsetOf(df.ev,df.ev2)).show()

Comments

0

I created a Spark UDF:

from pyspark.sql.types import BooleanType
antecedent_inside_predictions = udf(lambda antecedent,prediction: all(elem in prediction for elem in antecedent), BooleanType()) 

and then use it in a join as so:

fp_predictions =  filtered_rules.join(personal_item_recos,antecedent_inside_predictions("antecedent", "item_predictions") )

Note that I needed to enable crossJoins:

spark.conf.set('spark.sql.crossJoin.enabled', True)

(Finally, I extract the particular item I want from the item as follows:

fp_predictions = fp_predictions.withColumn("ITEM_SK", fp_predictions.consequent.getItem(0))

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.