
I am new to Scala and am trying to build a custom schema from an array of elements so that I can read files using that schema.
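
For context, a simplified sample of what the input JSON looks like (the structure matches the schema printed below; the values are illustrative):

import spark.implicits._

// Simplified sample input: one JSON document whose "columns" array holds
// one object per field of the target table (values are made up)
val otherPeopleDataset = Seq(
  """{ "columns": [
       { "column_id": "1", "data_sensitivty": "low", "datatype": "varchar", "length": "30", "name": "object_number" },
       { "column_id": "2", "data_sensitivty": "low", "datatype": "varchar", "length": "10", "name": "function_type" }
     ] }"""
).toDS()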

I read the arrays from the JSON file and used the explode method to create one row per element of the columns array:

import org.apache.spark.sql.functions.explode

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

The output is:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_name = column_values.select("name", "datatype")

column_name: org.apache.spark.sql.DataFrame = [name: string, datatype: string]

column_name.show()


+-----------------+--------+
|             name|datatype|
+-----------------+--------+
|    object_number| varchar|
|    function_type| varchar|
|            hof_1| varchar|
|            hof_2| varchar|
|           region| varchar|
|          country| varchar|
+-----------------+--------+

Now, for all the values listed above, I want to build a schema val dynamically.

Example:

val schema = new StructType()
      .add("object_number",StringType,true)
      .add("function_type",StringType,true)
      .add("hof_1",StringType,true)
      .add("hof_2",StringType,true)
      .add("region",StringType,true)
      .add("Country",StringType,true)

I want to build the above struct dynamically once I have the columns DataFrame. I read that I first need to create a map of the datatype for each element and then build the struct in a loop. Can someone help here, as I have limited knowledge of Scala?

Comment: Can you show the printSchema of the otherPeople DataFrame? Also post sample data for otherPeople.

2 Answers


The DataFrame of field definitions can be collected to the driver; then each row's field is added to a StructType:

import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Map the source type names onto Spark SQL types
def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}

// Collect the (name, datatype) rows and fold them into a StructType
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (acc, columnRow) => acc.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), nullable = true)
)
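
Once built, the schema can be applied when reading the data files. A minimal usage sketch (the path and CSV format are placeholders, not taken from the question):

// Hypothetical usage: read the data files with the dynamically built schema
val dataDF = spark.read
  .schema(schema)
  .csv("/path/to/data")  // placeholder path and format
dataDF.printSchema()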


Comment: Hi Pasha, can you help me extend the above solution with a decimal type? Link to the modified question: stackoverflow.com/questions/64013805/…
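
For the decimal case, one possible extension of getFieldType (a sketch, assuming the source encodes types as strings like "decimal(10,2)"):

import org.apache.spark.sql.types.{DataType, DecimalType, StringType}

// Assumed format: "decimal(precision,scale)", e.g. "decimal(10,2)"
val DecimalPattern = """decimal\((\d+),\s*(\d+)\)""".r

def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  case DecimalPattern(precision, scale) => DecimalType(precision.toInt, scale.toInt)
  case _ => StringType
}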

You can also follow this approach; it should work fine for your example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The schema is encoded in a string
val schemaString = "object_number function_type hof_1 hof_2 region country"
// Generate the schema based on the string
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert each record of the text file to a Row
val rowRDD = sc.textFile("dir")
  .map(line => line.split(","))
  .map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4), attributes(5)))
// Apply the schema to the RDD of Rows
val perDF = spark.createDataFrame(rowRDD, schema)
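
A quick sanity check of the result:

perDF.printSchema()  // should list all six fields as nullable strings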
