
How do I convert CSV data to a custom object in Spark? Below is my code snippet:

val sparkSession = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .master("local[2]")
      .getOrCreate()

    val citiData = sparkSession.read.option("header", "true").option("inferSchema", "true").csv(filePath) // use first row as header and infer column types

    //citiData.describe().show()
    import sparkSession.implicits._
    val s: Dataset[CityData] = citiData.as[CityData]

  }
  //Date,Open,High,Low,Close,Volume
  case class CityData(processingDate: java.util.Date, Open: Double, High: Double, Low: Double, Volume: Double)

Sample DataSet:

Date,Open,High,Low,Close,Volume
2006-01-03,490.0,493.8,481.1,492.9,1537660
2006-01-04,488.6,491.0,483.5,483.8,1871020
2006-01-05,484.4,487.8,484.0,486.2,1143160
2006-01-06,488.8,489.0,482.0,486.2,1370250

I changed the CityData case class input parameter types to String, but then it causes a "cannot resolve 'processingDate' given input columns: [Volume, Close, High, Date, Low, Open];" exception.

  1. How can I create a custom object?
  2. Another tricky part here: how do I convert the date column to a Date object?

How can I do this? Please share your ideas.

  • After doing some research, I changed my dataset header "Date" to "MyDate" and changed my case class to case class CityData(MyDate: Date, Open: String, High: String, Low: String, Volume: String). Now how can I change back to the original data types like Date, Double, and so on? Commented Oct 9, 2018 at 12:23

1 Answer


In your case, if you do not set the header option to true, Spark will read every column as String type. With the header and inferSchema options set, you can see:

val df = sparkSession.read.option("header", true).option("inferSchema", true).csv("pathToFile")
df.printSchema
//Prints
root
|-- Date: timestamp (nullable = true)
|-- Open: double (nullable = true)
|-- High: double (nullable = true)
|-- Low: double (nullable = true)
|-- Close: double (nullable = true)
|-- Volume: integer (nullable = true)

If you try to convert the rows into CityData, you will get the following error:

java.lang.UnsupportedOperationException: No Encoder found for java.util.Date

This means you cannot convert TimestampType directly into java.util.Date. Here are the type mappings:

  • TimestampType => java.sql.Timestamp
  • DateType => java.sql.Date

After changing the type of processingDate from java.util.Date to java.sql.Timestamp, you will still get an error saying cannot resolve 'processingDate'. You also need to rename the field processingDate to Date in CityData so that it matches the column name. Then you can convert your DataFrame into a Dataset[CityData] by using df.as[CityData]. I hope it helps!
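Putting both fixes together, a minimal sketch might look like the following. The file path "city.csv" is a placeholder, and the case class here includes the Close column from the sample header; adjust both to your data.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}

// Field names must match the CSV header exactly, and the types must follow
// Spark's mappings: TimestampType => java.sql.Timestamp, DoubleType => Double,
// IntegerType => Int.
case class CityData(Date: Timestamp, Open: Double, High: Double, Low: Double,
                    Close: Double, Volume: Int)

object CsvToDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv to Dataset example")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // "city.csv" is a placeholder path for the sample data above.
    val ds: Dataset[CityData] = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("city.csv")
      .as[CityData]

    ds.show()
    spark.stop()
  }
}
```

If you prefer to keep the field name processingDate in the case class, the other direction also works: rename the column instead, e.g. with df.withColumnRenamed("Date", "processingDate"), before calling .as[CityData].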


4 Comments

How can I print the list of values from the Open column using select?
Do you want to select the Open column after converting the rows into CityData?
Yes, is it possible?
Sure, since you have CityData rows, you can use the map function on the Dataset to select only the Open column. It is much the same as applying map on a Scala list.
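The suggestion in the last comment can be sketched as follows. A small in-memory Seq stands in for the CSV-backed Dataset[CityData] obtained via df.as[CityData], and the case class fields here (including Close) are assumptions based on the sample header.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}

case class CityData(Date: Timestamp, Open: Double, High: Double, Low: Double,
                    Close: Double, Volume: Int)

object SelectOpen {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("select Open column")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Tiny in-memory stand-in for the CSV-backed Dataset[CityData].
    val ds: Dataset[CityData] = Seq(
      CityData(Timestamp.valueOf("2006-01-03 00:00:00"), 490.0, 493.8, 481.1, 492.9, 1537660),
      CityData(Timestamp.valueOf("2006-01-04 00:00:00"), 488.6, 491.0, 483.5, 483.8, 1871020)
    ).toDS()

    // Typed: map over the rows, just like map on a Scala list.
    val opens: Dataset[Double] = ds.map(_.Open)
    opens.show()

    // Untyped alternative using the column API.
    ds.select("Open").show()

    spark.stop()
  }
}
```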
