
I have a complex nested JSON data file (shown below) and I am trying to consume the data and map it to the following case class:

case class DeviceData (id: Int, device: String)

where id = 0 and

device = "{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"

But I am stuck at the very first step: reading the data into a simple DataFrame gives me a _corrupt_record error. What mistake have I made? I am using Spark version 2.4.5.

export1.json

0,"{""device_id"": 0, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""68.161.225.1"", ""cca3"": ""USA"", ""cn"": ""United States"", ""temp"": 25, ""signal"": 23, ""battery_level"": 8, ""c02_level"": 917, ""timestamp"" :1475600496 }"
1,"{""device_id"": 1, ""device_type"": ""sensor-igauge"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""213.161.254.1"", ""cca3"": ""NOR"", ""cn"": ""Norway"", ""temp"": 30, ""signal"": 18, ""battery_level"": 6, ""c02_level"": 1413, ""timestamp"" :1475600498 }"
2,"{""device_id"": 2, ""device_type"": ""sensor-ipad"",""battery"":[{""type"": ""electrical""} ,{""type"": ""solar""}], ""ip"": ""88.36.5.1"", ""cca3"": ""ITA"", ""cn"": ""Italy"", ""temp"": 18, ""signal"": 25, ""battery_level"": 5, ""c02_level"": 1372, ""timestamp"" :1475600500 }"

and my Spark code is as below:

package sparkWCExample.spWCExample

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._     // Spark types to define our schema
import org.apache.spark.sql.functions._ // Spark helper functions (includes to_timestamp)

case class DeviceData (id: Int, device: String)

object DatasetExample {

  def main(args: Array[String]) {
    println("Start now")
    val conf = new SparkConf().setAppName("Spark Scala WordCount Example").setMaster("local[1]")
    val spark = SparkSession.builder().config(conf).appName("CsvExample").master("local").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    import spark.implicits._

    val readJSONDF = spark.read.json(sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json").values).toDF()
    readJSONDF.show()
  }
}

Instead of the parsed data, all I get is a _corrupt_record column:

+--------------------+
|     _corrupt_record|
+--------------------+
|0,"{""device_id""...|
+--------------------+

1 Answer

sc.wholeTextFiles creates a PairRDD whose key is the file name and whose value is the content of the whole file, so spark.read.json tries to parse the entire file as a single JSON document. More details can be found in the Spark API documentation.
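A minimal sketch of what that means for this file (the path is taken from the question):

val pairs = sc.wholeTextFiles("C:\\Sankha\\Study\\data\\complex-nested-json\\export1.json")
pairs.collect().foreach { case (file, content) =>
  println(file)              // the full file path (the key)
  println(content.take(60))  // 0,"{""device_id"": 0, ... -- the whole file as one string
}
// spark.read.json then has to parse that single string as JSON; since it
// starts with 0,"{""... it is not valid JSON and ends up in _corrupt_record.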

You might want to use spark.read.text and then split the lines afterwards:

// Read each line as plain text, then split at the first comma:
// everything before it is the id, everything after it is the raw JSON string.
// This assumes import spark.implicits._ is in scope for the DeviceData encoder.
val df = spark.read.text("export1.json")
  .map(row => {
    val s = row.getAs[String](0)
    val index = s.indexOf(',')
    DeviceData(s.substring(0, index).toInt, s.substring(index + 1))
  })
df.show

prints

+---+--------------------+
| id|              device|
+---+--------------------+
|  0|"{""device_id"": ...|
|  1|"{""device_id"": ...|
|  2|"{""device_id"": ...|
+---+--------------------+
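If you then want the nested JSON as real columns rather than a raw string, here is a hedged sketch. The device string still carries the CSV-style quoting (outer quotes and doubled ""), so it has to be cleaned up first, and the schema below is only my reading of the sample records:

import org.apache.spark.sql.functions.{col, from_json, regexp_replace}
import org.apache.spark.sql.types._

// Schema written by eye from the sample rows -- an assumption, adjust as needed.
val deviceSchema = new StructType()
  .add("device_id", LongType)
  .add("device_type", StringType)
  .add("battery", ArrayType(new StructType().add("type", StringType)))
  .add("ip", StringType)
  .add("cca3", StringType)
  .add("cn", StringType)
  .add("temp", LongType)
  .add("signal", LongType)
  .add("battery_level", LongType)
  .add("c02_level", LongType)
  .add("timestamp", LongType)

val parsed = df
  // strip the outer quotes left over from the CSV quoting ...
  .withColumn("device", regexp_replace(col("device"), "^\"|\"$", ""))
  // ... and turn the doubled "" back into a single "
  .withColumn("device", regexp_replace(col("device"), "\"\"", "\""))
  // now the string is valid JSON and can be parsed against the schema
  .withColumn("device", from_json(col("device"), deviceSchema))

parsed.select("id", "device.device_type", "device.cn").show()

Alternatively, since the file is effectively CSV with a quoted JSON column, Spark's CSV reader should be able to undo the doubled quotes directly with spark.read.option("quote", "\"").option("escape", "\"").csv(...), which would spare you the manual cleanup.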