
I have created a schema with the following code:

val schema = new StructType().add("city", StringType, true).add("female", IntegerType, true).add("male", IntegerType, true)

Created an RDD from:

val data = spark.sparkContext.textFile("cities.txt")

Converted it to an RDD of Row to apply the schema:

val cities = data.map(line => line.split(";")).map(row => Row.fromSeq(row.zip(schema.toSeq)))

val citiesRDD = spark.sqlContext.createDataFrame(cities, schema)

This gives me an error:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of string



You don't need the schema to create a Row; you only need it when you create the DataFrame. The error comes from row.zip(schema.toSeq): zipping pairs each value with its StructField, so the Row ends up holding Tuple2s instead of plain values, which Spark cannot encode as a string. You also need some logic to convert your split line (which produces three strings) into integers.

Here is a minimal solution without exception handling:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val data = sc.parallelize(Seq("Bern;10;12")) // mock for real data

val schema = new StructType().add("city", StringType, true).add("female", IntegerType, true).add("male", IntegerType, true)

val cities = data.map(line => {
  val Array(city, female, male) = line.split(";")
  Row(
    city,
    female.toInt,
    male.toInt
  )
})

val citiesDF = sqlContext.createDataFrame(cities, schema)

I normally use case classes to create a DataFrame, because Spark can infer the schema from the case class:

// "schema" for dataframe, define outside of main method
case class MyRow(city:Option[String],female:Option[Int],male:Option[Int]) 

val data = sc.parallelize(Seq("Bern;10;12")) // mock for real data

import sqlContext.implicits._

val citiesDF = data.map(line => {
val Array(city,female,male) = line.split(";")
  MyRow(
    Some(city),
    Some(female.toInt),
    Some(male.toInt)
  )
}
).toDF()
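
To check what Spark inferred, one can print the schema; assuming Spark's usual printSchema formatting, the Option fields should show up as nullable columns:

citiesDF.printSchema()
// root
//  |-- city: string (nullable = true)
//  |-- female: integer (nullable = true)
//  |-- male: integer (nullable = true)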

Comments

Thanks for the solution. What if the number of columns is more than 300? What would be the best way?
@ShankarKoirala Not sure about that; I just know there are certain limitations in Scala, e.g. prior to Scala 2.11, case classes were limited to 22 fields (same with tuples). The first approach should still work, but maybe it's worthwhile to think about using more complex column types (structs, arrays, maps) to reduce the number of columns...
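
Following up on the comment above: for very wide files it can be easier to build the StructType programmatically than to chain hundreds of .add() calls. A minimal sketch of that idea, assuming the same semicolon-separated format; the column names (count1 ... count300) and the mock data are invented for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// mock for a very wide file: a city name followed by 300 integer counts
val wideData = sc.parallelize(Seq(("Bern" +: (1 to 300).map(_.toString)).mkString(";")))

// build the 301-field schema in a loop instead of chaining .add() 300 times
val wideSchema = StructType(
  StructField("city", StringType, nullable = true) +:
    (1 to 300).map(i => StructField(s"count$i", IntegerType, nullable = true))
)

val wideRows = wideData.map { line =>
  val parts = line.split(";")
  // keep the first field as a string, parse the rest as integers
  Row.fromSeq(parts.head +: parts.tail.map(_.toInt))
}

val wideDF = sqlContext.createDataFrame(wideRows, wideSchema)

This sidesteps the case-class field limit entirely, since neither Row nor StructType has a fixed arity.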
