
I am not even sure how best to phrase the title for this question.

I have the following dataset

df = spark.createDataFrame([
    (["1", "2", "3", "4"], ),
    (["1", "2", "3"], ),
    (["2", "1", "3"], ),
    (["2", "3", "4", "1"], ),
    (["6", "7"], )
], ['cycle'])
df.show()

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|   [1, 2, 3]|
|   [2, 1, 3]|
|[2, 3, 4, 1]|
|      [6, 7]|
+------------+

What I would like to have at the end is:

  1. remove the permutations (rows that contain the same elements in a different order)
  2. keep only the row whose array contains all the elements of the other rows, i.e. drop every row that is a subset of another row

I can use sort_array() and distinct() to get rid of the permutations:

from pyspark.sql import functions as f

df.select(f.sort_array("cycle").alias("cycle")).distinct().show()
+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
|   [1, 2, 3]|
+------------+

What I would like to end up with, using PySpark, is:

+------------+
|       cycle|
+------------+
|[1, 2, 3, 4]|
|      [6, 7]|
+------------+

So I need to somehow check that [1, 2, 3] is contained in [1, 2, 3, 4] and keep only the latter; essentially Python's A.issubset(B), applied the PySpark/Spark way over a column.
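
For reference, this is the plain-Python relation I mean (just a toy illustration, not Spark code):

a = {"1", "2", "3"}
b = {"1", "2", "3", "4"}
print(a.issubset(b))           # True  -> the row [1, 2, 3] should be dropped
print({"6", "7"}.issubset(b))  # False -> the row [6, 7] should be kept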

The only approach I can currently think of is a horrible iterative loop over every row, which would kill performance.

  • Just as a hint in a direction: did you have a look at rolling window functions? Commented Nov 19, 2019 at 16:03
  • What is your Spark version, 2.4+ or below? Commented Nov 20, 2019 at 4:08
  • My Spark version is 2.4+. Commented Nov 20, 2019 at 7:39
  • I have tried window functions for aggregating, but have not found a way so far. Commented Nov 20, 2019 at 7:41

1 Answer


One way you might try is to first find all cycles that have at least one superset (excluding themselves), by using a self join to find every d2.cycle that satisfies both of the following conditions:

  • size(array_except(d2.cycle, d1.cycle)) == 0: no element of d2.cycle is missing from d1.cycle, i.e. d2.cycle is a subset of d1.cycle (an EMPTY array also satisfies this)
  • size(d2.cycle) < size(d1.cycle): d2.cycle is strictly smaller than d1.cycle, which excludes self matches and permutations of d1.cycle

and then use a left_anti join to exclude the cycles found above from the original dataframe; finally run sort_array and drop_duplicates (or distinct) to remove the remaining permutations:

from pyspark.sql.functions import expr

# all cycles that are a strict subset of at least one other cycle
df_sub = df.alias('d1').join(
      df.alias('d2')
    , expr('size(array_except(d2.cycle, d1.cycle))==0 AND size(d2.cycle) < size(d1.cycle)')
).select('d2.cycle').distinct()

df_sub.show()
#+---------+
#|    cycle|
#+---------+
#|[1, 2, 3]|
#|[2, 1, 3]|
#+---------+

# drop the subsets, then sort the remaining cycles and dedupe the permutations
df.join(df_sub, on=['cycle'], how='left_anti') \
  .withColumn('cycle', expr('sort_array(cycle)')) \
  .distinct() \
  .show()
#+------------+                                                                  
#|       cycle|
#+------------+
#|[1, 2, 3, 4]|
#|      [6, 7]|
#+------------+
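
If you prefer to normalize first, the same idea can also be written as a single left_anti self join on the deduplicated data. This is an untested sketch of the same approach (df_norm is just an illustrative name):

from pyspark.sql.functions import expr

# normalize first: sort each cycle and drop exact duplicates (removes permutations)
df_norm = df.withColumn('cycle', expr('sort_array(cycle)')).distinct()

# keep only the cycles that are not a strict subset of another normalized cycle
df_norm.alias('d1').join(
    df_norm.alias('d2'),
    expr('size(array_except(d1.cycle, d2.cycle))==0 AND size(d1.cycle) < size(d2.cycle)'),
    'left_anti'
).show()
# expected result: [1, 2, 3, 4] and [6, 7]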

