I am running the following code to create a DataFrame from a text file.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StructType, StringType, StructField}


/**
  * Created by PSwain on 6/19/2016.
  */
object RddToDataframe extends App {

  val scnf=new SparkConf().setAppName("RddToDataFrame").setMaster("local[1]")
  val sc = new SparkContext(scnf)
  val sqlContext = new SQLContext(sc)

  val employeeRdd=sc.textFile("C:\\Users\\pswain\\IdeaProjects\\test1\\src\\main\\resources\\employee")

  //Creating schema

  val employeeSchemaString="id name age"
  val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))

  //Creating  RowRdd
  val rowRdd= employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim.toInt,row(1),row(2).trim.toInt))

  //Creating dataframe = RDD[rowRdd] + schema
  val employeeDF=sqlContext.createDataFrame(rowRdd,schema). registerTempTable("Employee")

  sqlContext.sql("select * from Employee").show()


}

But when I run it in IntelliJ I get the type mismatch error shown below. I cannot see why this error occurs, since I am only converting strings to integers. The employee file has the following input, one record per line:

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

16/06/19 15:18:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.MatchError: 1201 (of class java.lang.Integer)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
  • Why are you splitting employeeSchemaString with "," (employeeSchemaString.split(",")) when the string "id name age" is separated by spaces? Commented Jun 19, 2016 at 10:20

1 Answer

The schema defines every column as StringType:

val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))

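But the rows in rowRdd hold values of type Int, String, and Int. That is why the stack trace shows StringConverter failing with a MatchError on the Integer 1201.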

Here is the working code:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Schema whose column types match the values placed in each Row
val structType = {
    val id = StructField("id", IntegerType)
    val name = StructField("name", StringType)
    val age = StructField("age", IntegerType)
    new StructType(Array(id, name, age))
}

// Parse each line into a Row(Int, String, Int)
val rowRdd = employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim().toInt, row(1), row(2).trim().toInt))

sqlContext.createDataFrame(rowRdd, structType).registerTempTable("Employee")

sqlContext.sql("select * from Employee").show()
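
As a side check on the point raised in the comment, the following plain-Scala sketch (no Spark needed) shows what splitting the schema string on "," versus " " produces, and how one input line parses into typed fields. The object SchemaSplitCheck and the helpers columnNames and parseEmployee are hypothetical names introduced only for this illustration. Unlike the code above, it also trims the name field, which is an assumption about the desired output.

```scala
object SchemaSplitCheck {
  // Hypothetical helper: split a schema string on the given delimiter.
  def columnNames(schemaString: String, delimiter: String): Array[String] =
    schemaString.split(delimiter).map(_.trim)

  // Hypothetical helper: parse one "id, name, age" line into typed fields.
  // Trimming before toInt avoids a NumberFormatException on " 25".
  def parseEmployee(line: String): (Int, String, Int) = {
    val fields = line.split(",").map(_.trim)
    (fields(0).toInt, fields(1), fields(2).toInt)
  }

  def main(args: Array[String]): Unit = {
    // Splitting on "," leaves the whole string as a single column name.
    println(columnNames("id name age", ",").length)
    // Splitting on " " gives the three intended column names.
    println(columnNames("id name age", " ").mkString(", "))
    println(parseEmployee("1201, satish, 25"))
  }
}
```

With the original split(","), the schema string "id name age" yields a single column name, so the original schema would not have had three columns even with the types corrected.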