I'm trying to filter df1 by joining it with df2 on a column and then removing some rows from df1 based on a condition.

df1:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|     green|
|Rapid Cash Plus|     green|
|        DOTOPAL|     green|
|     RAPID CASH|     green|
+---------------+----------+

df2:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|            STS|      blue|
|Rapid Cash Plus|      blue|
|        DOTOPAL|      blue|
+---------------+----------+

Sample code is:

df1.join(df2, df1.col("channel") === df2.col("channel"), "leftouter")
      .filter(not(df1.col("rag_status") === "green"))
      .select(df1.col("channel"), df1.col("rag_status")).show

It's not returning any records.

I expect the output below, which is df1 after removing records based on the channel and green status condition. If a channel is also present in df2 and its rag_status in df1 is green, that record should be removed from df1, and only the remaining df1 records should be returned.

Expected output is:

+---------------+----------+
|        channel|rag_status|
+---------------+----------+
|     RAPID CASH|     green|
+---------------+----------+
  • You don't have any rag_status other than green in your first dataframe, so filter(not(df1.col("rag_status") === "green")) works as expected: you ask for the rows of df1 where rag_status is NOT green, and there are none. Commented Nov 29, 2016 at 9:38

2 Answers

You can do something like this:

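// toDF and $"..." need spark.implicits._, which spark-shell imports automatically
val df1 = sc.parallelize(Seq(("STS", "green"), ("Rapid Cash Plus", "green"), ("RAPID CASH", "green")))
  .toDF("channel", "rag_status").where($"rag_status" === "green")
val df2 = sc.parallelize(Seq(("STS", "blue"), ("Rapid Cash Plus", "blue"), ("DOTOPAL", "blue")))
  .toDF("channel", "rag_status").withColumnRenamed("rag_status", "rag_status2")
// The left join keeps every df1 row; the inner join keeps only rows whose channel is in df2
val leftJoinResult = df1.join(df2, Array("channel"), "left")
val innerJoinResult = df1.join(df2, "channel")
// Rows in the left join but not in the inner join are the df1 rows with no match in df2
val resultDF = leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF.show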

Spark-shell Output:

scala> val df1=sc.parallelize(Seq(("STS","green"),("Rapid Cash Plus","green"),("RAPID CASH","green"))).toDF("channel","rag_status").where($"rag_status"==="green")
df1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [channel: string, rag_status: string]

scala> val df2=sc.parallelize(Seq(("STS","blue"),("Rapid Cash Plus","blue"),("DOTOPAL","blue"))).toDF("channel","rag_status").withColumnRenamed("rag_status","rag_status2")
df2: org.apache.spark.sql.DataFrame = [channel: string, rag_status2: string]

scala> val leftJoinResult=df1.join(df2,Array("channel"),"left")
leftJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val innerJoinResult=df1.join(df2,"channel")
innerJoinResult: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string ... 1 more field]

scala> val resultDF=leftJoinResult.except(innerJoinResult).drop("rag_status2")
resultDF: org.apache.spark.sql.DataFrame = [channel: string, rag_status: string]

scala> resultDF.show
+----------+----------+                                                         
|   channel|rag_status|
+----------+----------+
|RAPID CASH|     green|
+----------+----------+
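
The left-join-minus-inner-join result above can also be sketched as a single anti join. This is a minimal sketch, not the answerer's method: it assumes Spark 2.0 or later (where the "left_anti" join type is available), and the names AntiJoinSketch and dropGreenMatches are hypothetical. Putting the green check inside the join condition means a df1 row is dropped only when its channel exists in df2 and its own rag_status is green; with the question's data that should leave only the RAPID CASH row.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, named here only for illustration.
object AntiJoinSketch {

  // Keep every row of `left` except those that are green AND whose
  // channel also appears in `right`. A left anti join returns only
  // columns from `left`, so there is no ambiguous rag_status column.
  def dropGreenMatches(left: DataFrame, right: DataFrame): DataFrame =
    left.join(
      right,
      left("channel") === right("channel") && left("rag_status") === "green",
      "left_anti")

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; in spark-shell, use the existing `spark` instead.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("anti-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Data copied from the question.
    val df1 = Seq(
      ("STS", "green"),
      ("Rapid Cash Plus", "green"),
      ("DOTOPAL", "green"),
      ("RAPID CASH", "green")
    ).toDF("channel", "rag_status")

    val df2 = Seq(
      ("STS", "blue"),
      ("Rapid Cash Plus", "blue"),
      ("DOTOPAL", "blue")
    ).toDF("channel", "rag_status")

    dropGreenMatches(df1, df2).show()
    spark.stop()
  }
}
```

Unlike except, an anti join does not de-duplicate rows, so duplicate df1 records without a match in df2 would all be kept.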

5 Comments

Your code does not work if I remove DOTOPAL from df1. I will edit my question to make it simpler.
As you have completely changed the question, the answer has been completely updated.
The output contains two rag_status columns, one of which is null, so when I try to select rag_status it says the column is ambiguous.
See, I have renamed the column.
@Shankar: Good to know :) Accept it then :P

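You can use the code below to get the expected output:

// Joined columns are channel (0), df1's rag_status (1), df2's rag_status (2); index 2 is null for unmatched rows
df1.join(df2, Seq("channel"), "leftouter").filter(row => row(2) != "blue")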
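
The positional row(...) lookup above depends on the column order of the joined result. A hedged alternative is to rename df2's status column and filter on it by name, which also avoids the ambiguous rag_status column mentioned in the comments on the other answer. This is only a sketch: NamedColumnFilterSketch, dropBlueMatches, and rag_status2 are names chosen for illustration, and Spark 2.0 or later is assumed for the =!= operator.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper object, named here only for illustration.
object NamedColumnFilterSketch {

  // Left-join on channel, then keep rows whose status from `right` is
  // missing (no match) or anything other than "blue".
  def dropBlueMatches(left: DataFrame, right: DataFrame): DataFrame = {
    // Rename so the two status columns can be told apart after the join.
    val renamed = right.withColumnRenamed("rag_status", "rag_status2")
    left.join(renamed, Seq("channel"), "left_outer")
      .filter(col("rag_status2").isNull || col("rag_status2") =!= "blue")
      .drop("rag_status2")
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; in spark-shell, use the existing `spark` instead.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("named-column-filter-sketch")
      .getOrCreate()
    import spark.implicits._

    // Data copied from the question.
    val df1 = Seq(
      ("STS", "green"),
      ("Rapid Cash Plus", "green"),
      ("DOTOPAL", "green"),
      ("RAPID CASH", "green")
    ).toDF("channel", "rag_status")

    val df2 = Seq(
      ("STS", "blue"),
      ("Rapid Cash Plus", "blue"),
      ("DOTOPAL", "blue")
    ).toDF("channel", "rag_status")

    dropBlueMatches(df1, df2).show()
    spark.stop()
  }
}
```

The explicit isNull check is needed because a comparison against null evaluates to null in Spark SQL, so unmatched rows would otherwise be filtered out.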
