35

I'm working through a Databricks example. The schema for the dataframe looks like:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)

In the example, they show how to explode the employees column into 4 additional columns:

val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) => employee.map{ employee =>
  val firstName = employee(0).asInstanceOf[String]
  val lastName = employee(1).asInstanceOf[String]
  val email = employee(2).asInstanceOf[String]
  val salary = employee(3).asInstanceOf[Int]
  Employee(firstName, lastName, email, salary)
 }
}.cache()
display(explodeDF)

How would I do something similar with the department column (i.e. add two additional columns to the dataframe called "id" and "name")? The methods aren't exactly the same, and I can only figure out how to create a brand new data frame using:

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

If I try:

val explodeDF = parquetDF.explode($"department") { 
  case Row(dept: Seq[String]) => dept.map{dept => 
  val id = dept(0) 
  val name = dept(1)
  } 
}.cache()
display(explodeDF)

I get the warning and error:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.map{dept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department") { 
                                   ^

3 Answers 3

51

In my opinion the most elegant solution is to star expand a Struct using a select operator as shown below:

var explodedDf2 = explodedDf.select("department.*","*")

https://docs.databricks.com/spark/latest/spark-sql/complex-types.html

Sign up to request clarification or add additional context in comments.

Comments

25

You could use something like that:

var explodeDF = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDeptDF = explodeDeptDF.withColumn("name", explodeDeptDF("department.name"))

which you helped me into and these questions:

5 Comments

A stage failure: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage 41.0 (TID 1403, 10.81.214.49): scala.MatchError: [[789012,Mechanical Engineering]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
@Feynman27 does this help? It seems to match your attempt. I think the problem with my answer is that the employees has also an element, while department has not.
Yeah, the employees example creates new rows, whereas the department example should only create two new columns.
Can we do this for all nested columns with renaming at once? For example, department.id -> inner_id, department.name -> inner_name, ...
3

This seems to work (though maybe not the most elegant solution).

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))

1 Comment

you could val explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id")).withColumn("name", explodeDF2("department.name"))

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.