
I am using Spark 2.1 and Zeppelin 0.7 to do the following (this is inspired by the Databricks tutorial: https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html).

I have created the following schema

import org.apache.spark.sql.types._

val jsonSchema = new StructType()
  .add("Records", ArrayType(new StructType()
    .add("Id", IntegerType)
    .add("eventDt", StringType)
    .add("appId", StringType)
    .add("userId", StringType)
    .add("eventName", StringType)
    .add("eventValues", StringType)
  ))

to read in the following JSON 'array' file, which I have in my 'inputPath' directory:

{
"Records": [{
    "Id": 9550,
    "eventDt": "1491810477700",
    "appId": "dandb01",
    "userId": "985580",
    "eventName": "OG: HR: SELECT",
    "eventValues": "985087"
    },
    ... other records
]}

val rawRecords = spark.read.schema(jsonSchema).json(inputPath)

I then want to explode these records to get to the individual events

val events = rawRecords.select(explode($"Records").as("record"))

But rawRecords.show() and events.show() both show nothing but nulls.
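
A quick check that points at parsing as the culprit (a hedged diagnostic sketch, assuming the same inputPath): when Spark 2.1 cannot parse a JSON document line-by-line and no schema is given, it exposes the raw text as a _corrupt_record column:

// Diagnostic sketch: with no explicit schema, Spark 2.1 surfaces
// unparseable multi-line JSON as a single _corrupt_record string column
spark.read.json(inputPath).printSchema()
// root
//  |-- _corrupt_record: string (nullable = true)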

Any idea what I am doing wrong? I know that in the past I should have been using JSONL for this, but the Databricks tutorial suggests that the latest version of Spark should now support JSON arrays.

  • Actually, your code works; the problem is your JSON file. Spark does not like pretty-printed ("formatted") JSON. Reformat each document as a one-liner and it will work. Commented Apr 11, 2017 at 14:33
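
For reference, support for pretty-printed JSON arrived later: Spark 2.2 added a multiLine option to the JSON reader, so on 2.2+ the original file would be readable as-is (a sketch; this option does not exist on the Spark 2.1 used in the question):

// Works on Spark 2.2+ only; multiLine is not available in 2.1
val rawRecords = spark.read
  .option("multiLine", true)
  .schema(jsonSchema)
  .json(inputPath)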

1 Answer


I did the following:

  1. I have a file foo.txt with the data below:

{"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]} {"Records":[{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"},{"Id":9550,"eventDt":"1491810477700","appId":"dandb01","userId":"985580","eventName":"OG:HR:SELECT","eventValues":"985087"}]}

  2. I have the following code:

    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    val df = sqlContext.read.json("foo.txt")
    df.printSchema()
    df.select(explode($"Records").as("record")).show

  3. I get the following output:

root
 |-- Records: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- appId: string (nullable = true)
 |    |    |-- eventDt: string (nullable = true)
 |    |    |-- eventName: string (nullable = true)
 |    |    |-- eventValues: string (nullable = true)
 |    |    |-- userId: string (nullable = true)

+--------------------+
|              record|
+--------------------+
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
|[9550,dandb01,149...|
+--------------------+
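
To go from the exploded record struct to ordinary columns, star expansion on the struct column works (a minimal follow-up sketch using the same df as above):

// Flatten each exploded struct into top-level columns (Id, appId, eventDt, ...)
df.select(explode($"Records").as("record"))
  .select($"record.*")
  .show()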