
I have two dataframes, df1 and df2.

df1 has a column Name with values like a, b, c, etc.; df2 has a column Id with values like a, b.

If the Name column in df1 has a match in the Id column of df2, the match status should be 0; if there is no match, it should be 1. I know I can put df2's Id column into a local collection using collect and then check whether the Name column in df1 has a matching entry.

val df1 = Seq("Rey", "John").toDF("Name")
val df2 = Seq("Rey").toDF("Id")

val collect = df2.select("Id").map(r => r.getString(0)).collect.toList 

something like,

    val df3 =
    df1.withColumn("match_sts", when(df1("Name").isin(collect: _*), 0).otherwise(1))

Expected output
+----+---------+
|Name|match_sts|
+----+---------+
| Rey|        0|
|John|        1|
+----+---------+

But I don't want to use collect here. Is there any alternate approach available.

  • Maybe share an example and your own try? You can use Spark SQL with EXISTS or an outer join... Commented Jul 15, 2019 at 14:24
  • As thebluephantom said, please share your attempts or at least examples of your dataframes. Commented Jul 15, 2019 at 14:43
  • @thebluephantom shared the approach; I don't want to do a join of df1 with df2 Commented Jul 15, 2019 at 19:10
  • Indeed, but you need some sort of action to get a so-called side effect. Aha, but how big is your collect? Commented Jul 15, 2019 at 19:13
  • @thebluephantom should be less than 1 lakh (100,000) rows Commented Jul 15, 2019 at 19:25

1 Answer


collect is not what you want, but it is the well-known idiom for the DF column --> list conversion. If the list is not huge, the following works; you can also broadcast the inlist:

import org.apache.spark.sql.functions._

val df1 = Seq("Rey", "John", "Donald", "Trump").toDF("Name")
val df2 = Seq("Rey", "Donald").toDF("Id")

val inlist = df2.select("Id").map(r => r.getString(0)).collect.toList

// 0 = match, 1 = no match, per the expected output in the question
val df3 = df1.withColumn("match_status", when(df1("Name").isin(inlist: _*), 0).otherwise(1))
df3.show(false)
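As a small variation (a sketch, assuming Spark 2.4+ where Column.isInCollection is available), you can pass the collected list directly instead of expanding it to varargs. It reuses df1 and df2 from above:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// Same collected list as before; isInCollection takes the Iterable as-is,
// so no ": _*" varargs expansion is needed.
val inlist = df2.select("Id").map(r => r.getString(0)).collect.toList

val df3b = df1.withColumn(
  "match_status",
  when(df1("Name").isInCollection(inlist), 0).otherwise(1) // 0 = match, 1 = no match
)
df3b.show(false)
```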

Even the classic examples that use stop words from a file to filter output do this:

// stopWordsInput: e.g. an RDD[String] read from a text file
val stopWords = stopWordsInput.flatMap(x => x.split(" ")).map(_.trim).collect.toSet

and broadcast it to the workers if it is too big. (1 lakh is 100,000 rows, so your list is still fairly small.)
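A minimal sketch of the broadcast variant (note that isin with a literal list is already embedded in the plan, so an explicit broadcast mainly helps when you check membership inside a UDF):

```scala
import org.apache.spark.sql.functions._

// Ship one read-only copy of the list to each executor instead of
// serializing it with every task.
val bcInlist = spark.sparkContext.broadcast(inlist)

val matchStatus = udf((name: String) => if (bcInlist.value.contains(name)) 0 else 1)
val df3c = df1.withColumn("match_status", matchStatus(df1("Name")))
df3c.show(false)
```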

Another approach is Spark SQL, relying on Catalyst to produce an optimized plan when EXISTS is used:

import spark.implicits._ 
import org.apache.spark.sql.functions._

val df1 = Seq("Rey", "John", "Donald", "Trump").toDF("Name")
val df2 = Seq("Rey", "Donald").toDF("Id") // This can be read from file and split etc.

// Optimizer converts to better physical plan for performance in general
df1.createOrReplaceTempView("searchlist") 
df2.createOrReplaceTempView("inlist")    
val df3 = spark.sql("""SELECT Name, 0 AS match_status
                     FROM searchlist A
                    WHERE EXISTS (SELECT B.Id FROM inlist B WHERE B.Id = A.Name)
                                   UNION
                   SELECT Name, 1 AS match_status
                     FROM searchlist A
                    WHERE NOT EXISTS (SELECT B.Id FROM inlist B WHERE B.Id = A.Name)
                """)
df3.show(false)
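For comparison, the same result can be sketched in the DataFrame API with a left outer join, which is roughly what Catalyst rewrites the EXISTS/NOT EXISTS pair into; a null Id after the join means no match:

```scala
import org.apache.spark.sql.functions._

// Left outer join keeps every row of df1; rows without a partner in df2
// come back with Id = null, which we turn into match_status = 1.
val df4 = df1
  .join(df2, df1("Name") === df2("Id"), "left")
  .withColumn("match_status", when(col("Id").isNull, 1).otherwise(0))
  .drop("Id")
df4.show(false)
```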

3 Comments

@thebluephantom I tried your first approach, but I don't understand something: the OP had said he wanted 1 if there was NO match, but your solution provides 1 if there IS a match, correct?
It was a while ago; I will look at it tonight.
You could well be right; they are interchangeable, an easy change.
