
I'm loading a CSV file with Spark's CSV reader and converting it into a typed Dataset by supplying a schema derived from a case class and calling .as[T]:

spark.read
  .option("header", "false")
  .option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
  .schema(schemaOf[T])
  .csv(filePath)
  .as[T]
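
(Here schemaOf[T] is a helper, not part of the Spark API; a minimal sketch of one common way to write it, assuming it derives the StructType from the case class via its product encoder:)

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Derive a StructType from a case class T via its product encoder.
def schemaOf[T <: Product : TypeTag]: StructType =
  Encoders.product[T].schema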

My question: I have more than one system sending the same file. If one system sends a file that is missing, say, two of the columns from my defined schema, I would like to load all the other columns and just put null for those two.

And for all the other systems, load all the fields when the file conforms to the schema.

How do I do this efficiently? I don't want to create a case class for each system.

  • You can load the data if the columns are defined as nullable in the schema. Commented Mar 26, 2019 at 0:24
  • Do you mean that the input CSV file itself will not have some columns, or that it will have empty data for the respective columns? Commented Mar 26, 2019 at 12:24
  • It wouldn't even have some columns. For example, if the case class/schema has 25 columns, the file might come with only 23 columns (i.e., only 22 commas). Commented Mar 26, 2019 at 13:09

1 Answer


You can process your CSV data as a DataFrame first, before converting it to a Dataset. That way you can easily add or remove columns to match your case class, using a utility function like:

import org.apache.spark.sql.{Column, DataFrame}

implicit class DataFrameOps(df: DataFrame) {
  // Add the column only if it's absent; otherwise return the DataFrame unchanged.
  def withColumnIfNotExists(colName: String, col: Column): DataFrame = {
    if (df.columns.contains(colName)) df
    else df.withColumn(colName, col)
  }
}

// then use it like this (??? stands for your reader chain from the question, before .as[T]):
import org.apache.spark.sql.functions.lit

???.csv(filePath).withColumnIfNotExists("missing_col", lit(null).cast("string"))
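
Putting it together, here is a minimal end-to-end sketch (not from the original answer). It assumes the schemaOf[T] helper and the DataFrameOps implicit class above are in scope, that a system dropping columns always drops trailing ones (the 25-column/23-column case from the comments), so positional renaming is safe, and that files never carry extra columns:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
import org.apache.spark.sql.functions.{col, lit}

def loadAs[T <: Product : TypeTag : Encoder](spark: SparkSession, filePath: String): Dataset[T] = {
  val schema = schemaOf[T]

  // Read without forcing the schema so a short file parses cleanly;
  // columns arrive positionally as _c0, _c1, ... (all strings).
  val raw = spark.read
    .option("header", "false")
    .csv(filePath)

  // Rename the columns we did get to their schema names (positional,
  // so this assumes only trailing columns can be missing).
  val named = raw.toDF(schema.fieldNames.take(raw.columns.length): _*)

  // Pad any schema columns the file was missing with typed nulls.
  val padded = schema.fields.foldLeft(named) { (df, field) =>
    df.withColumnIfNotExists(field.name, lit(null).cast(field.dataType))
  }

  // Reorder and cast to the declared types, then convert to Dataset[T].
  padded
    .select(schema.fields.map(f => col(f.name).cast(f.dataType).alias(f.name)): _*)
    .as[T]
}

// usage, e.g.: val ds = loadAs[MyRecord](spark, filePath)

Because the file is read without a fixed schema, every column arrives as a string; that's why the final select casts each field to its declared type (a plain cast handles yyyy-MM-dd HH:mm:ss.SSS timestamps) before calling .as[T].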
