
I have two DataFrames, each with two columns:

  • df1 with schema (key1: Long, Value)

  • df2 with schema (key2: Array[Long], Value)

I need to join these DataFrames on the key columns, i.e. match rows where key1 appears among the values of key2. The problem is that the keys do not have the same type. Is there a way to do this?

2 Comments
  • key2 from df2 must contain key1 from df1? Commented Jan 11, 2017 at 15:53
  • One way is to explode the Array[Long] and then join with the df1 DataFrame, as sketched below. Commented Jan 11, 2017 at 17:49
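
A minimal sketch of that explode-based approach, assuming the df1 and df2 defined in the answers below (the variable names here are illustrative):

import org.apache.spark.sql.functions.{col, explode}

// Explode key2 into one row per array element, then do a plain equi-join on the Long keys.
// Arrays containing duplicate values will produce duplicate matches,
// so deduplicate afterwards if that matters.
val exploded = df2.withColumn("key", explode(col("key2")))
val joinedByExplode = df1.join(exploded, col("key1") === col("key"))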

2 Answers


The best way to do this (and the one that doesn't require any casting or exploding of DataFrames) is to use the array_contains Spark SQL expression, as shown below.

import org.apache.spark.sql.functions.expr
import spark.implicits._

val df1 = Seq((1L, "one.df1"), (2L, "two.df1"), (3L, "three.df1")).toDF("key1", "Value")

val df2 = Seq((Array(1L, 1L), "one.df2"), (Array(2L, 2L), "two.df2"), (Array(3L, 3L), "three.df2")).toDF("key2", "Value")

// Keep rows where key1 appears in the key2 array
val joined = df1.join(df2, expr("array_contains(key2, key1)"))
joined.show()

+----+---------+------+---------+
|key1|    Value|  key2|    Value|
+----+---------+------+---------+
|   1|  one.df1|[1, 1]|  one.df2|
|   2|  two.df1|[2, 2]|  two.df2|
|   3|three.df1|[3, 3]|three.df2|
+----+---------+------+---------+

Please note that you cannot use the org.apache.spark.sql.functions.array_contains function directly as it requires the second argument to be a literal as opposed to a column expression.
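
If you prefer plain SQL, the same array_contains expression also works through temp views; a minimal sketch reusing the df1 and df2 from above (the view names are illustrative). As an aside, newer Spark releases have relaxed the function's restriction so it accepts a Column as the second argument, but check your version before relying on that.

df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// Same join condition, expressed in SQL
val joinedSql = spark.sql(
  "SELECT * FROM t1 JOIN t2 ON array_contains(t2.key2, t1.key1)")
joinedSql.show()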


2 Comments

Thanks. The code worked in PySpark. But what is the purpose of import spark.implicits._? I am not able to find this module in PySpark.
import spark.implicits._ is used in Scala; you don't need it in PySpark.
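
For context, a minimal sketch of what that import does in Scala (assuming a SparkSession named spark): it brings the session's implicit encoders and conversions into scope, which is what makes Seq(...).toDF(...) and the $"column" shorthand compile.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").getOrCreate()
import spark.implicits._ // enables Seq(...).toDF(...) and $"col" syntax

val df = Seq((1L, "a"), (2L, "b")).toDF("key", "value")
df.select($"key").show()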

You can cast key1 and key2 to strings and then use the contains function, as follows.

val df1 = sc.parallelize(Seq((1L, "one.df1"),
                             (2L, "two.df1"),
                             (3L, "three.df1"))).toDF("key1", "Value")

DF1:
+----+---------+
|key1|Value    |
+----+---------+
|1   |one.df1  |
|2   |two.df1  |
|3   |three.df1|
+----+---------+

val df2 = sc.parallelize(Seq((Array(1L, 1L), "one.df2"),
                             (Array(2L, 2L), "two.df2"),
                             (Array(3L, 3L), "three.df2"))).toDF("key2", "Value")

DF2:
+------+---------+
|key2  |Value    |
+------+---------+
|[1, 1]|one.df2  |
|[2, 2]|two.df2  |
|[3, 3]|three.df2|
+------+---------+

import org.apache.spark.sql.functions.col

val joined = df1.join(df2, col("key2").cast("string").contains(col("key1").cast("string")))
joined.show()

JOIN:
+----+---------+------+---------+
|key1|Value    |key2  |Value    |
+----+---------+------+---------+
|1   |one.df1  |[1, 1]|one.df2  |
|2   |two.df1  |[2, 2]|two.df2  |
|3   |three.df1|[3, 3]|three.df2|
+----+---------+------+---------+

1 Comment

The string "123" contains the strings "23", "12", "1", etc. Casting to strings is going to join things that shouldn't be joined.
