
I have read a JSON file into Spark. This file has the following structure:

scala> tweetBlob.printSchema
root
 |-- related: struct (nullable = true)
 |    |-- next: struct (nullable = true)
 |    |    |-- href: string (nullable = true)
 |-- search: struct (nullable = true)
 |    |-- current: long (nullable = true)
 |    |-- results: long (nullable = true)
 |-- tweets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- cde: struct (nullable = true)
...
...
 |    |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |    |-- message: struct (nullable = true)
...
...

What I would ideally like is a DataFrame with the columns "cde", "cdeInternal", "message", ... as shown below:

root
|-- cde: struct (nullable = true)
...
...
|-- cdeInternal: struct (nullable = true)
...
...
|-- message: struct (nullable = true)
...
...

I have managed to use "explode" to extract the elements of the "tweets" array into a column called "tweets":

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets"))
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime...
scala> tweets.printSchema
root
 |-- tweets: struct (nullable = true)
 |    |-- cde: struct (nullable = true)
...
...
 |    |-- cdeInternal: struct (nullable = true)
...
...
 |    |-- message: struct (nullable = true)
...
...

How can I select all the fields inside the struct as columns and create a DataFrame from them? If I understand correctly, explode does not work on a struct.

Any help is appreciated.

Comment: Question also asked on the Databricks forum, but with no response. (Commented Nov 23, 2015 at 5:42)

3 Answers


One possible way to handle this is to extract the required information from the schema. Let's start with some dummy data:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col  // used by the helper below
import org.apache.spark.sql.types._


case class Bar(x: Int, y: String)
case class Foo(bar: Bar)

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF

df.printSchema

// root
//  |-- bar: struct (nullable = true)
//  |    |-- x: integer (nullable = false)
//  |    |-- y: string (nullable = true)

and a helper function:

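// Returns one Column per field of the struct column `colname`
// (an empty array if `colname` is not a struct).
def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  // e.g. "bar" and field "x" become col("bar.x")
  fields.map(x => col(s"$colname.${x.name}"))
}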

Finally, the result:

df.select(children("bar", df): _*).printSchema

// root
// |-- x: integer (nullable = true)
// |-- y: string (nullable = true)
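A minimal end-to-end sketch applying the same helper to the shape in the question, written as a script (paste into spark-shell or run as a Scala script). It assumes Spark 2.2+ (for spark.read.json on a Dataset[String]) and a local SparkSession; the JSON record is a hypothetical, heavily trimmed stand-in for the real tweet data that keeps only a few field names from the question's schema.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{StructField, StructType}

val spark = SparkSession.builder().master("local[1]").appName("struct-children").getOrCreate()
import spark.implicits._

// Same helper as in the answer: one Column per field of the struct column `colname`
def children(colname: String, df: DataFrame) = {
  val parent = df.schema.fields.filter(_.name == colname).head
  val fields = parent.dataType match {
    case x: StructType => x.fields
    case _ => Array.empty[StructField]
  }
  fields.map(x => col(s"$colname.${x.name}"))
}

// Hypothetical, trimmed stand-in for the tweet JSON in the question (one record, no newlines)
val json =
  """{"search":{"current":1,"results":2},"tweets":[""" +
  """{"cde":{"author":{"gender":"female"}},"cdeInternal":{"compliance":{"isActive":true}},"message":{"actor":{"displayName":"a"}}},""" +
  """{"cde":{"author":{"gender":"male"}},"cdeInternal":{"compliance":{"isActive":false}},"message":{"actor":{"displayName":"b"}}}""" +
  """]}"""

val tweetBlob = spark.read.json(Seq(json).toDS)

// One row per array element, held in a single struct column called "tweets"
val tweets = tweetBlob.select(explode($"tweets").as("tweets"))

// Promote every field of that struct to a top-level column
val flat = tweets.select(children("tweets", tweets): _*)
flat.printSchema()
```

The result has the top-level columns cde, cdeInternal and message that the question asks for, one row per tweet.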

4 Comments

Hi @zero323, it might be a silly question, but what does the children("bar", df): _* syntax mean?
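children("bar", df) is just a call that returns a collection of Columns (an Array[Column] here). : _* performs varargs unpacking, so each element is passed as a separate argument to select(cols: Column*).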
Is there a way to do this in SparkR?
@nate It should be. SparkR schema is equivalent to the Scala one, and dot syntax is universal for all Spark SQL implementations.
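The : _* unpacking discussed in these comments is plain Scala, not something Spark-specific. A minimal sketch with a hypothetical varargs method, describe, standing in for select(cols: Column*):

```scala
// Hypothetical varargs method standing in for DataFrame.select(cols: Column*)
def describe(parts: String*): String = parts.mkString("[", ", ", "]")

// A collection built elsewhere, like the Array[Column] returned by children(...)
val names: Array[String] = Array("x", "y")

// describe(names) would not compile: an Array[String] is not a single String.
// `: _*` passes the elements of the collection as the individual varargs.
val unpacked = describe(names: _*)
println(unpacked) // [x, y]
```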

You can use:

df
  .select(explode(col("path_to_collection")).as("collection"))
  .select(col("collection.*"))

Example:

scala> import org.apache.spark.sql.functions.{col, explode}

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

scala> val inline = sqlContext.read.json(sc.parallelize(json :: Nil)).select(explode(col("schools")).as("collection")).select(col("collection.*"))

scala> inline.printSchema
root
 |-- sname: string (nullable = true)
 |-- year: long (nullable = true)

scala> inline.show
+--------+----+
|   sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+
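A sketch of the same explode-then-star pattern that also keeps a parent column next to the expanded fields, assuming Spark 2.2+ with a SparkSession in place of sqlContext. Selecting name alongside explode in the first select is what carries it through to the final result.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder().master("local[1]").appName("explode-star").getOrCreate()
import spark.implicits._

val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
val people = spark.read.json(Seq(json).toDS)

// Keep "name" next to the exploded struct, then expand the struct with ".*"
val schools = people
  .select(col("name"), explode(col("schools")).as("collection"))
  .select(col("name"), col("collection.*"))

schools.show()
```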

Alternatively, you can use the SQL function inline:

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""

scala> sqlContext.read.json(sc.parallelize(json :: Nil)).registerTempTable("tmp")

scala> val inline = sqlContext.sql("SELECT inline(schools) FROM tmp")

scala> inline.printSchema
root
 |-- sname: string (nullable = true)
 |-- year: long (nullable = true)

scala> inline.show
+--------+----+
|   sname|year|
+--------+----+
|stanford|2010|
|berkeley|2012|
+--------+----+

2 Comments

Updating your answer for Spark 2.x (reading JSON from a Dataset[String] requires Spark 2.2+): spark.read.json(spark.createDataset(json :: Nil)).createOrReplaceTempView("tmp")
This SQL query may be a useful addition to the above solution, since it also keeps the name column: """select name, inline(schools) from tmp"""
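A sketch combining the two comments, assuming Spark 2.2+: the JSON is loaded through a Dataset[String], registered as the view tmp, and queried with the suggested SQL. selectExpr is shown as a DataFrame-API route to the same inline generator; mixing a generator with a regular column in one projection is supported in Spark 2.x as far as I know, but treat that part as an assumption to verify on your version.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").appName("inline-sql").getOrCreate()
import spark.implicits._

val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}"""
spark.read.json(Seq(json).toDS).createOrReplaceTempView("tmp")

// The query from the comment: inline() turns each struct in the array into a row of columns
val viaSql = spark.sql("select name, inline(schools) from tmp")
viaSql.show()

// The same generator used through a SQL expression string in the DataFrame API
val viaExpr = spark.table("tmp").selectExpr("name", "inline(schools)")
viaExpr.show()
```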
scala> import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrame

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> case class Bar(x: Int, y: String)
defined class Bar

scala> case class Foo(bar: Bar)
defined class Foo

scala> val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF
df: org.apache.spark.sql.DataFrame = [bar: struct<x: int, y: string>]


scala> df.printSchema
root
 |-- bar: struct (nullable = true)
 |    |-- x: integer (nullable = false)
 |    |-- y: string (nullable = true)


scala> df.select("bar.*").printSchema
root
 |-- x: integer (nullable = true)
 |-- y: string (nullable = true)



1 Comment

This method is way easier.
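The star selector from the last answer is also available in SQL text, in line with the earlier comment that the dot syntax is shared across Spark SQL front ends. A sketch assuming Spark 2.x; the struct column is built with struct() from a tuple DataFrame instead of the Foo/Bar case classes, so the snippet does not rely on encoders for case classes defined inside a script.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

val spark = SparkSession.builder().master("local[1]").appName("sql-star").getOrCreate()
import spark.implicits._

// Same shape as df in the answers: a single struct column bar with fields x and y
val df = Seq((1, "first"), (2, "second")).toDF("x", "y").select(struct($"x", $"y").as("bar"))
df.createOrReplaceTempView("foo")

// "bar.*" in SQL expands the struct the same way as df.select("bar.*")
val viaSql = spark.sql("SELECT bar.* FROM foo")
viaSql.printSchema()
```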
