
I have below data in Scala in Spark environment -

val abc = Seq(
  (Array("A"),0.1),
  (Array("B"),0.11),
  (Array("C"),0.12),
  (Array("A","B"),0.24),
  (Array("A","C"),0.27),
  (Array("B","C"),0.30),
  (Array("A","B","C"),0.4)
).toDF("channel_set", "rate")

abc.show(false)
abc.createOrReplaceTempView("abc")

val df = abc.withColumn("totalChannels", size(col("channel_set")))
df.show()
+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|        [A]| 0.1|            1|
|        [B]|0.11|            1|
|        [C]|0.12|            1|
|     [A, B]|0.24|            2|
|     [A, C]|0.27|            2|
|     [B, C]| 0.3|            2|
|  [A, B, C]| 0.4|            3|
+-----------+----+-------------+



val oneChannelDF = df.filter($"totalChannels" === 1)
oneChannelDF.show()
oneChannelDF.createOrReplaceTempView("oneChannelDF")

+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|        [A]| 0.1|            1|
|        [B]|0.11|            1|
|        [C]|0.12|            1|
+-----------+----+-------------+


val twoChannelDF = df.filter($"totalChannels" === 2)
twoChannelDF.show()
twoChannelDF.createOrReplaceTempView("twoChannelDF")

+-----------+----+-------------+
|channel_set|rate|totalChannels|
+-----------+----+-------------+
|     [A, B]|0.24|            2|
|     [A, C]|0.27|            2|
|     [B, C]| 0.3|            2|
+-----------+----+-------------+

I want to join the oneChannelDF and twoChannelDF DataFrames so that the result looks like this -

+-----------+----+-------------+------------+-------+
|channel_set|rate|totalChannels|channel_set | rate  |
+-----------+----+-------------+------------+-------+
|        [A]| 0.1|            1|     [A,B]  |  0.24 |
|        [A]| 0.1|            1|     [A,C]  |  0.27 |
|        [B]|0.11|            1|     [A,B]  |  0.24 |
|        [B]|0.11|            1|     [B,C]  |  0.30 |
|        [C]|0.12|            1|     [A,C]  |  0.27 |
|        [C]|0.12|            1|     [B,C]  |  0.30 |
+-----------+----+-------------+------------+-------+

Basically, I need all the rows where the channel from a oneChannelDF record is present in the channel_set of a twoChannelDF record.

I have tried -

spark.sql("""select * from oneChannelDF one inner join twoChannelDF two on array_contains(one.channel_set,two.channel_set)""").show()

However, I am facing this error -

org.apache.spark.sql.AnalysisException: cannot resolve 'array_contains(one.`channel_set`, two.`channel_set`)' due to data type mismatch: Arguments must be an array followed by a value of same type as the array members; line 1 pos 62;
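For context, `array_contains` expects an array column followed by a single value (or column) of the array's *element* type; passing two array columns triggers exactly this mismatch. A minimal sketch of the working signature, assuming the same spark-shell session and the `abc` view defined above:

```scala
// array_contains(arrayCol, value): the second argument must match the
// element type of the array (here a String), not another array.
spark.sql(
  "select channel_set, array_contains(channel_set, 'A') as has_A from abc"
).show()
```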

1 Answer

I figured out the error: array_contains() expects an array followed by a single value of the array's element type, not two arrays. Since every array in the channel_set column of oneChannelDF has exactly one element, passing channel_set[0] as the second argument gets me the correct data frame.

scala> spark.sql("""select * from oneChannelDF one inner join twoChannelDF two where array_contains(two.channel_set,one.channel_set[0])""").show()
+-----------+----+-------------+-----------+----+-------------+
|channel_set|rate|totalChannels|channel_set|rate|totalChannels|
+-----------+----+-------------+-----------+----+-------------+
|        [A]| 0.1|            1|     [A, B]|0.24|            2|
|        [A]| 0.1|            1|     [A, C]|0.27|            2|
|        [B]|0.11|            1|     [A, B]|0.24|            2|
|        [B]|0.11|            1|     [B, C]| 0.3|            2|
|        [C]|0.12|            1|     [A, C]|0.27|            2|
|        [C]|0.12|            1|     [B, C]| 0.3|            2|
+-----------+----+-------------+-----------+----+-------------+
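The `channel_set[0]` trick relies on every array in oneChannelDF having exactly one element. If the left side could hold multi-element arrays, Spark 2.4+ provides `arrays_overlap`, which returns true when two arrays share at least one non-null element. A sketch of that more general join condition, reusing the temp views defined above:

```scala
// arrays_overlap(a1, a2) is true when the arrays share a non-null element,
// so it generalizes the channel_set[0] lookup to left-side arrays of any size.
spark.sql("""
  select *
  from oneChannelDF one
  inner join twoChannelDF two
  on arrays_overlap(one.channel_set, two.channel_set)
""").show()
```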