
Let's say I loaded a JSON file into Spark 1.6 via

sqlContext.read.json("/hdfs/")

It gives me a DataFrame with the following schema:

root
 |-- id: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- checked: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- color: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = true)

The DF has only one row, with an array of all my items in each column.

+--------------------+--------------------+--------------------+
|                id_e|           checked_e|             color_e|
+--------------------+--------------------+--------------------+
|[0218797c-77a6-45...|[false, true, tru...|[null, null, null...|
+--------------------+--------------------+--------------------+

I want to have a DF with the arrays exploded into one item per line.

+--------------------+-----+-------+
|                  id|color|checked|
+--------------------+-----+-------+
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
|0218797c-77a6-45f...| null|  false|
...

So far I have achieved this by creating a temporary table from the array DF and running SQL with lateral view explode against it:

val results = sqlContext.sql("""
SELECT id, color, checked from temptable
lateral view explode(checked_e) temptable as checked
lateral view explode(id_e) temptable as id
lateral view explode(color_e) temptable as color
""")

Is there any way to achieve this directly with DataFrame functions, without using SQL? I know there is something like df.explode(...), but I could not get it to work with my data.

EDIT: It seems explode isn't what I really wanted in the first place. I want a new DataFrame that has each item of the arrays line by line. The explode function actually returns far more lines than my initial dataset has.

2 Answers

The following solution should work.

import org.apache.spark.sql.functions._
import sqlContext.implicits._ // enables the $"..." column syntax

val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
val df = sqlContext.createDataFrame(data)

// Zip the three arrays element-wise into one array of (Int, Int, Int) structs.
val udf3 = udf[Seq[(Int, Int, Int)], Seq[Int], Seq[Int], Seq[Int]]{
    case (a, b, c) => (a, b, c).zipped.toSeq
}

val df3 = df.select(udf3($"_1", $"_2", $"_3").alias("udf3"))
// One output row per zipped struct.
val exploded = df3.select(explode($"udf3").alias("col3"))

// Pull the struct fields out into separate columns.
exploded.withColumn("first", $"col3".getItem("_1"))
    .withColumn("second", $"col3".getItem("_2"))
    .withColumn("third", $"col3".getItem("_3")).show

It is more straightforward, though, to do this in plain Scala directly. It might be more efficient too, since Spark cannot parallelize the work anyway when there is only one row.

val data = Seq((Seq(1,2,3),Seq(4,5,6),Seq(7,8,9)))
// Zip the three sequences element-wise; each tuple becomes one row.
val seqExploded = data.flatMap{
    case (a, b, c) => (a, b, c).zipped.toSeq
}
val dfTheSame = sqlContext.createDataFrame(seqExploded)
dfTheSame.show
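
Applied to the question's string arrays, the same element-wise zip can produce named fields instead of `_1`, `_2`, `_3`. The following is a minimal sketch in plain Scala, assuming the three arrays have equal length. The `Item` case class and the sample values are hypothetical, made up for illustration, and chained `zip` calls stand in for `.zipped`.

```scala
// Hypothetical row type; field names follow the columns wanted in the question.
case class Item(id: String, color: String, checked: String)

// Made-up sample values standing in for the single row of arrays.
val ids     = Seq("a1", "b2", "c3")
val colors  = Seq[String](null, "red", null)
val checked = Seq("false", "true", "true")

// Pair up the i-th elements of each array. The result has as many
// entries as the shortest input, not the product of the lengths.
val items: Seq[Item] = ids.zip(colors).zip(checked).map {
  case ((id, color, ), isChecked) => Item(id, color, isChecked)
}

items.foreach(println)
// In a Spark shell, sqlContext.createDataFrame(items) should then give
// columns named id, color and checked (assumption: not run here).
```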

It should be as simple as this:

df.withColumn("id", explode(col("id_e")))
  .withColumn("checked", explode(col("checked_e")))
  .withColumn("color", explode(col("color_e")))

2 Comments

Well, your code seems to do what I did with the SQL statement, but when I checked it, I figured out that this explode isn't really what I need. My initial arrays have ~600 items each. After the explode I have ~180 million lines. What I actually want is just to take the elements out of the arrays line by line to create a new DataFrame.
You're right, my answer produces a Cartesian product. It looks like @Rockie Yang's solution does it the right way.
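
The row counts discussed in these comments can be reproduced without Spark. Here is a minimal sketch in plain Scala, using three made-up arrays of length 3: exploding each array in turn pairs every element with every element of the others, while zipping keeps one row per position.

```scala
// Made-up sample arrays, all of length 3.
val ids     = Seq("a", "b", "c")
val checked = Seq("false", "true", "true")
val colors  = Seq("red", "green", "blue")

// What three chained explodes compute: every combination of elements.
val crossProduct = for {
  id <- ids
  ch <- checked
  co <- colors
} yield (id, co, ch)

// What the question asks for: the i-th elements side by side.
val zipped = ids.zip(colors).zip(checked).map {
  case ((id, co), ch) => (id, co, ch)
}

println(crossProduct.size) // 27 = 3 * 3 * 3
println(zipped.size)       // 3
```

With three arrays of length n, the chained version produces n * n * n rows, which is why the exploded DataFrame ends up so much larger than the input.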
