
I'm trying to create a DataFrame with Spark, using the code below.

First, I imported the following:

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._
import org.apache.spark.sql.types.IntegerType

Then I created the Rows and the schema for the DataFrame:

val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))

val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))

val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))

Finally, I tried to check that the DataFrame was correct using

EmpDF.show

and I got the errors below:

    ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
    java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: 
    java.lang.String is not a valid external type for schema of int
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, Name), StringType), true, false) AS Name#0
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, City), StringType), true, false) AS City#1
    if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, Salary), IntegerType) AS Salary#2
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:292)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:594)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
        ... 25 more
(the same exception was then thrown for the remaining tasks, e.g. task 0.0 in stage 1.0, TID 1)

How can I fix this problem?

I have already tried adding the imports below:

    import org.apache.spark.serializer.KryoSerializer
    import org.apache.spark.serializer.Serializer

but it still fails with

ERROR Executor: Exception in task 2.0 in stage 5.0 (TID 13)

2 Answers


The error "Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of int" is caused by a type mismatch between the declared schema and the actual data. Each row contains ("Jeong","Daejon","3400000"), i.e. (String, String, String), but the schema declares (String, String, Integer).

Updated code 1, using an integer Salary (pass the salary values as Int to match IntegerType):

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._

val Employee = Seq(Row("Kim","Seoul",1000000),Row("Lee","Busan",2000000),Row("Park","Jeju",3000000),Row("Jeong","Daejon",3400000))

val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", IntegerType, true))

val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
EmpDF.show()
/*+-----+------+-------+
| Name|  City| Salary|
+-----+------+-------+
|  Kim| Seoul|1000000|
|  Lee| Busan|2000000|
| Park|  Jeju|3000000|
|Jeong|Daejon|3400000|
+-----+------+-------+*/
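
You can confirm the resulting column types with printSchema; with the integer schema above it should print:

EmpDF.printSchema()
/*root
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Salary: integer (nullable = true)*/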

Updated code 2, keeping Salary as a string (declare StringType to match the string data):

import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.spark.sql.Row
import scala.collection.mutable.ListBuffer
import org.apache.spark.util._

val Employee = Seq(Row("Kim","Seoul","1000000"),Row("Lee","Busan","2000000"),Row("Park","Jeju","3000000"),Row("Jeong","Daejon","3400000"))

val EmployeeSchema = List(StructField("Name", StringType, true), StructField("City", StringType, true), StructField("Salary", StringType, true))

val EmpDF = spark.createDataFrame(spark.sparkContext.parallelize(Employee),StructType(EmployeeSchema))
EmpDF.show()
/*+-----+------+-------+
| Name|  City| Salary|
+-----+------+-------+
|  Kim| Seoul|1000000|
|  Lee| Busan|2000000|
| Park|  Jeju|3000000|
|Jeong|Daejon|3400000|
+-----+------+-------+*/
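
If you keep the input as strings but still want an integer Salary column, a third option is to cast the column after building the DataFrame. A minimal sketch (EmpDFInt is an illustrative name, not from the answer):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

// Cast the string column to an integer; values that fail to parse become null.
val EmpDFInt = EmpDF.withColumn("Salary", col("Salary").cast(IntegerType))
EmpDFInt.printSchema()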

1 Comment

Thank you so much, the problem is solved! I guess I should study more about schemas and string types. Thanks!
Alternatively, you can skip the explicit schema and let Spark infer it from a Seq of tuples with toDF:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq(("Kim","Seoul",1000000),("Lee","Busan",2000000),("Park","Jeju",3000000),("Jeong","Daejon",3400000))
        .toDF("name","city", "salary")
        .show()
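
If you prefer named, typed columns without writing the schema by hand, a case class works as well. A minimal sketch (the Employee case class is illustrative, not part of the original answer; it reuses the spark session and implicits import above):

case class Employee(name: String, city: String, salary: Int)

// Column names and types are derived from the case class fields.
val empDF = Seq(
  Employee("Kim", "Seoul", 1000000),
  Employee("Lee", "Busan", 2000000)
).toDF()
empDF.show()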

