
I have a Sequence of maps. Each map contains column names as keys and column values as values, so one map describes one row. I do not know in advance how many entries each map will have, so I can't build a fixed-length tuple in my code. I want to convert the sequence to a dataframe. I tried the code below:

val mapRDD= sc.parallelize(Seq(
  Map("col1" -> "10", "col2" -> "Rohan", "col3" -> "201"),
  Map("col1" -> "13", "col2" -> "Ross", "col3" -> "201")
  ))

val columns = mapRDD.take(1).flatMap(a => a.keys)

val resultantDF = mapRDD.map { value => // Exception is thrown from this block
  value.values.toList
}.toDF(columns: _*)

resultantDF.show()

But it threw the exception below:

java.lang.ClassCastException: org.apache.spark.sql.types.ArrayType cannot be cast to org.apache.spark.sql.types.StructType
at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:414)
at org.apache.spark.sql.SQLImplicits.rddToDataFrameHolder(SQLImplicits.scala:155)
...

I tried a few other approaches, but nothing worked.
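(For context: the exception arises because `toDF` on an `RDD[List[String]]` produces a single array-typed column, not one column per map entry, so Spark cannot treat it as a struct of named fields. One more direct route, sketched here for the Spark 1.6 shell where `sqlContext` is in scope, and untested, is to build `Row`s yourself and pass an explicit schema to `sqlContext.createDataFrame`. Note that map iteration order is not guaranteed for all map sizes, so the values are looked up by column name rather than taken with `.values`:)

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Fix a deterministic column order once, from the first map.
    val columns = mapRDD.take(1).flatMap(_.keys).toSeq

    // Build the schema dynamically: one nullable string field per column name.
    val schema = StructType(columns.map(StructField(_, StringType, nullable = true)))

    // Look each value up by its column name so row order always matches the schema;
    // missing keys become null.
    val rowRDD = mapRDD.map(m => Row.fromSeq(columns.map(c => m.getOrElse(c, null))))

    val resultantDF = sqlContext.createDataFrame(rowRDD, schema)
    resultantDF.show()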

  • Which version of Spark are you using? Commented Sep 4, 2018 at 11:40
  • @merenptah Spark 1.6.2 (Scala 2.10) Commented Sep 4, 2018 at 11:50

1 Answer


You can try the approach below.

  1. Extract the column names and build an intermediate dataframe from the given RDD:

    val columns=mapRDD.take(1).flatMap(a=>a.keys).toSeq
    val df=mapRDD.map(_.values.toList).toDF
    
    //df looks like below
    +----------------+
    |           value|
    +----------------+
    |[10, Rohan, 201]|
    | [13, Ross, 201]|
    +----------------+
    
  2. Now create your schema dynamically, along with a user-defined function, like below:

    //Required imports
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.Row
    import scala.collection.mutable.WrappedArray
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StructType
    
    //UDF that will return a Row conforming to your schema
    def getRow(arr:WrappedArray[String]):Row=Row.fromSeq(arr.toSeq)
    
    //Creating schema
    val udfSchema=StructType(columns.map(x=>StructField(x,StringType,true)))
    
    //Registering udf along with schema
    val getRowUDF=udf(getRow _,udfSchema)
    
    //Now calling udf and generating one new column
    val df2=df.withColumn("temp",getRowUDF(df.col("value")))
    
    //df2 will look like
    +----------------+--------------+
    |           value|          temp|
    +----------------+--------------+
    |[10, Rohan, 201]|[10,Rohan,201]|
    | [13, Ross, 201]| [13,Ross,201]|
    +----------------+--------------+
    
  3. Now get your final dataframe from df2 using your column list:

    val query=columns.map(x=>df2.col("temp."+x))
    df2.select(query:_*).show
    
    //output
    +----+-----+----+
    |col1| col2|col3|
    +----+-----+----+
    |  10|Rohan| 201|
    |  13| Ross| 201|
    +----+-----+----+
    

2 Comments

This solution assumes that there are exactly 3 entries in each map. I can't assume that. I do not know, at compile time, how many entries will be present in each map.
@madhusudhan Please find the updated answer. It is long, but it does what you want.
