df = spark.createDataFrame([("1gh","25g","36h"),("2gf","3ku","4we"),("12w","53v","c74"),("1a2","3d4","4c5"),("232","3df","4rt")], ["a","b","c"])
filter_df = spark.createDataFrame([("2gf","3ku"),("12w","53v"), ("12w","53v")], ["a","b"])
I took column "a" of filter_df, converted it to an RDD, and then collected it into a list with the following code:
unique_list = filter_df.select("a").rdd.flatMap(lambda x: x).distinct().collect()
This gives me:
unique_list = [u'2gf', u'12w']
I tried converting the RDD into a list with the collect operation and filtering the DataFrame with it, but this gives me the GC allocation failures shown below:
from pyspark.sql import functions as F

final_df = df.filter(F.col("a").isin(unique_list))
118.255: [GC (Allocation Failure) [PSYoungGen: 1380832K->538097K(1772544K)] 2085158K->1573272K(3994112K), 0.0622847 secs] [Times: user=2.31 sys=1.76, real=0.06 secs]
122.540: [GC (Allocation Failure) [PSYoungGen: 1772529K->542497K(2028544K)] 2807704K->1581484K(4250112K), 0.3217980 secs] [Times: user=11.16 sys=13.15, real=0.33 secs]
127.071: [GC (Allocation Failure) [PSYoungGen: 1776929K->542721K(2411008K)] 2815916K->1582011K(4632576K), 0.8024852 secs] [Times: user=58.43 sys=4.85, real=0.80 secs]
133.284: [GC (Allocation Failure) [PSYoungGen: 2106881K->400752K(2446848K)] 3146171K->1583953K(4668416K), 0.4198589 secs] [Times: user=18.31 sys=12.58, real=0.42 secs]
139.050: [GC (Allocation Failure) [PSYoungGen: 1964912K->10304K(2993152K)] 3148113K->1584408K(5214720K), 0.0712454 secs] [Times: user=2.92 sys=0.88, real=0.08 secs]
146.638: [GC (Allocation Failure) [PSYoungGen: 2188864K->12768K(3036160K)] 3762968K->1588544K(5257728K), 0.1212116 secs] [Times: user=3.05 sys=3.74, real=0.12 secs]
154.153: [GC (Allocation Failure) [PSYoungGen: 2191328K->12128K(3691008K)] 3767104K->1590112K(5912576K), 0.1179030 secs] [Times: user=6.94 sys=0.11, real=0.12 secs]
Required output:
final_df
+---+---+---+
| a| b| c|
+---+---+---+
|2gf|3ku|4we|
|12w|53v|c74|
+---+---+---+
What is an effective way to filter a Spark DataFrame using another RDD, a list, or a different DataFrame? The data above is only a sample; my real dataset is much larger.
Use a `left_semi` join between `df` and `filter_df` if you don't want to see any duplicates. Use an `inner` join if you don't mind duplicates. If you still want an inner join instead of a semi join but need to avoid duplicates, call `.distinct()` on `filter_df` before performing the join.