I have the following data in Scala in a Spark environment:
val abc = Seq(
(Array("A"),0.1),
(Array("B"),0.11),
(Array("C"),0.12),
(Array("A","B"),0.24),
(Array("A","C"),0.27),
(Array("B","C"),0.30),
(Array("A","B","C"),0.4)
).toDF("channel_set", "rate")
abc.show(false)
abc.createOrReplaceTempView("abc")
val df = abc.withColumn("totalChannels", size(col("channel_set")))
df.show()
+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|        [A]| 0.1|            1|
|        [B]|0.11|            1|
|        [C]|0.12|            1|
|     [A, B]|0.24|            2|
|     [A, C]|0.27|            2|
|     [B, C]| 0.3|            2|
|  [A, B, C]| 0.4|            3|
+-----------+----+-------------+
val oneChannelDF = df.filter($"totalChannels" === 1)
oneChannelDF.show()
oneChannelDF.createOrReplaceTempView("oneChannelDF")
+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|        [A]| 0.1|            1|
|        [B]|0.11|            1|
|        [C]|0.12|            1|
+-----------+----+-------------+
val twoChannelDF = df.filter($"totalChannels" === 2)
twoChannelDF.show()
twoChannelDF.createOrReplaceTempView("twoChannelDF")
+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|     [A, B]|0.24|            2|
|     [A, C]|0.27|            2|
|     [B, C]| 0.3|            2|
+-----------+----+-------------+
I want to join the oneChannelDF and twoChannelDF dataframes so that the result looks like this:
+-----------+----+-------------+-----------+----+
|channel_set|rate|totalChannels|channel_set|rate|
+-----------+----+-------------+-----------+----+
|        [A]| 0.1|            1|     [A, B]|0.24|
|        [A]| 0.1|            1|     [A, C]|0.27|
|        [B]|0.11|            1|     [A, B]|0.24|
|        [B]|0.11|            1|     [B, C]|0.30|
|        [C]|0.12|            1|     [A, C]|0.27|
|        [C]|0.12|            1|     [B, C]|0.30|
+-----------+----+-------------+-----------+----+
Basically, I need every pair of rows in which the channel from a oneChannelDF record is present in the channel_set of a twoChannelDF record.
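To make the matching rule concrete, here is a minimal plain-Scala sketch (no Spark) of the pairing I am after. The object name `ChannelPairs`, the `Row` alias, and the helper `pairs` are hypothetical, introduced only for illustration.

```scala
// Plain Scala collections only (no Spark); all names here are illustrative.
object ChannelPairs {
  // A row is a channel set together with its rate.
  type Row = (Seq[String], Double)

  val oneChannel: Seq[Row] = Seq(
    (Seq("A"), 0.1),
    (Seq("B"), 0.11),
    (Seq("C"), 0.12)
  )

  val twoChannel: Seq[Row] = Seq(
    (Seq("A", "B"), 0.24),
    (Seq("A", "C"), 0.27),
    (Seq("B", "C"), 0.30)
  )

  // Keep a (left, right) pair when every channel of the left row
  // also appears in the right row's channel set.
  def pairs(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l._1.forall(ch => r._1.contains(ch))
    } yield (l, r)

  def main(args: Array[String]): Unit =
    pairs(oneChannel, twoChannel).foreach { case ((c1, r1), (c2, r2)) =>
      println(s"${c1.mkString("[", ", ", "]")} $r1 | ${c2.mkString("[", ", ", "]")} $r2")
    }
}
```

The join condition I am looking for in Spark should express the same containment test.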
I have tried:
spark.sql("""select * from oneChannelDF one inner join twoChannelDF two on array_contains(one.channel_set,two.channel_set)""").show()
However, I get this error:
org.apache.spark.sql.AnalysisException: cannot resolve 'array_contains(one.`channel_set`, two.`channel_set`)' due to data type mismatch: Arguments must be an array followed by a value of same type as the array members; line 1 pos 62;
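Reading the error message, `array_contains` expects the array first and a single element of the array's type second, whereas my query passes two arrays. Below is a minimal sketch of one possible rewrite, assuming every row in oneChannelDF holds exactly one channel, so that `channel_set[0]` is that channel. The object name `ChannelJoinSketch`, the method `joinRows`, and the local SparkSession settings are assumptions for illustration only; in spark-shell the `spark` session already exists.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, size}

// Illustrative sketch only; names are hypothetical.
object ChannelJoinSketch {

  def joinRows(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Same sample data as in the question.
    val df = Seq(
      (Array("A"), 0.1),
      (Array("B"), 0.11),
      (Array("C"), 0.12),
      (Array("A", "B"), 0.24),
      (Array("A", "C"), 0.27),
      (Array("B", "C"), 0.30),
      (Array("A", "B", "C"), 0.4)
    ).toDF("channel_set", "rate")
      .withColumn("totalChannels", size(col("channel_set")))

    df.filter($"totalChannels" === 1).createOrReplaceTempView("oneChannelDF")
    df.filter($"totalChannels" === 2).createOrReplaceTempView("twoChannelDF")

    // Array first, element second. Assumes one.channel_set has exactly
    // one element, so channel_set[0] is the single channel to look up.
    spark.sql(
      """select one.*, two.channel_set, two.rate
        |from oneChannelDF one
        |inner join twoChannelDF two
        |  on array_contains(two.channel_set, one.channel_set[0])""".stripMargin)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("channel-join-sketch")
      .getOrCreate()
    joinRows(spark).show(false)
    spark.stop()
  }
}
```

This only covers the single-channel case. If the left side could hold more than one channel, indexing with `[0]` would no longer be a full containment check, and a different condition would be needed.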