0

I'm doing some operations between two DataFrames:

val resultRdd = df1.join(df2, df1("column")===df2("column") &&
    df1("column2").contains(df2("column")), "left_outer").rdd

resultRdd.map { t => ... }

But I get this error every time: edited*

Job aborted due to stage failure: Task 114 in stage 40.0 failed 4 times, most recent failure: Lost task 114.3 in stage 40.0 (TID 908, 10.10.10.51, executor 1): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Because of this error, I try to print

I'm researching and I read in others questions that it could be possible that the executors cannot access to the DataFrame instead of the driver. NullPointerException in Spark RDD map when submitted as a spark job

I've tried both coalesce() and collect() but doesn't work for me.

I don't know how to deal with this issue, any help?

I'm using Spark 2.1.0

edited2***************************

After debugging I think I found some clues:

I have my application running in a Server and if I execute once It works, but if I execute again It fails. If I restart the server and SparkContext is created as new It works again.

The log error is:

17/01/25 16:10:40 INFO TaskSetManager: Starting task 86.0 in stage 126.0 (TID 5249, localhost, executor driver, partition 86, ANY, 6678 bytes)
17/01/25 16:10:40 INFO TaskSetManager: Finished task 43.0 in stage 126.0 (TID 5248) in 6 ms on localhost (executor driver) (197/200)
17/01/25 16:10:40 INFO Executor: Running task 86.0 in stage 126.0 (TID 5249)
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 96 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR Executor: Exception in task 86.0 in stage 126.0 (TID 5249)
java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/25 16:10:40 INFO TaskSetManager: Starting task 114.0 in stage 126.0 (TID 5250, localhost, executor driver, partition 114, ANY, 6678 bytes)
17/01/25 16:10:40 INFO Executor: Running task 114.0 in stage 126.0 (TID 5250)
17/01/25 16:10:40 WARN TaskSetManager: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 4 blocks
17/01/25 16:10:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/01/25 16:10:40 ERROR TaskSetManager: Task 86 in stage 126.0 failed 1 times; aborting job
17/01/25 16:10:40 INFO Executor: Finished task 114.0 in stage 126.0 (TID 5250). 3800 bytes result sent to driver
17/01/25 16:10:40 INFO TaskSetManager: Finished task 114.0 in stage 126.0 (TID 5250) in 15 ms on localhost (executor driver) (198/200)
17/01/25 16:10:40 INFO TaskSchedulerImpl: Removed TaskSet 126.0, whose tasks have all completed, from pool 
17/01/25 16:10:40 INFO TaskSchedulerImpl: Cancelling stage 126
17/01/25 16:10:40 INFO DAGScheduler: ResultStage 126 (collect at CategorizationSystem.scala:123) failed in 1.664 s due to Job aborted due to stage failure: Task 86 in stage 126.0 failed 1 times, most recent failure: Lost task 86.0 in stage 126.0 (TID 5249, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.unsafe.types.UTF8String.contains(UTF8String.java:284)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$2.hasNext(WholeStageCodegenExec.scala:396)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$class.foreach(Iterator.scala:750)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1202)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:295)
    at scala.collection.AbstractIterator.to(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:287)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1202)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:274)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1202)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
17/01/25 16:10:40 INFO DAGScheduler: Job 19 failed: collect at CategorizationSystem.scala:123, took 8.517397 s
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116937
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116938
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116939
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116940
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116941
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116942
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116943
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116944
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 116945
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 16
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117330
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117331
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117332
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117333
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_36_piece0 on 192.168.80.136:35004 in memory (size: 20.6 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117334
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117335
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117336
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117337
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117338
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117339
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117340
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 17
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117341
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117342
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117343
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117344
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 117345
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 18
17/01/25 16:10:41 INFO BlockManagerInfo: Removed broadcast_39_piece0 on 192.168.80.136:35004 in memory (size: 23.0 KB, free: 613.8 MB)
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102054
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102055
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102056
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102057
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102058
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102059
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102060
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102061
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 11
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102062
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102063
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102064
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102065
17/01/25 16:10:41 INFO ContextCleaner: Cleaned accumulator 102066
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 12
17/01/25 16:10:41 INFO ContextCleaner: Cleaned shuffle 15

Any help?

7
  • Try df1.show, df2.show and resultRdd.show in order to get some more details about your case Commented Jan 20, 2017 at 12:52
  • NullPointerException will come when you do operation on null value. need complete stack trace & better code snippet to address where exactly you are getting NPE Commented Jan 20, 2017 at 18:03
  • 1
    The result of df1.show and df2.show does not have any null value as I recently checked. To print resultrdd I have to resultRdd.collect.foreach(println(_)) and I get the same error that I edit in the question as edited* Commented Jan 23, 2017 at 9:16
  • 1
    Could be data type error. Update the output of df1.printSchema() & df2.printSchema() Commented Jan 24, 2017 at 4:25
  • 1
    I print both schemas and I find that the df1 "column2" value was nullable true and and df2 "column2" not. It is still failing. Commented Jan 24, 2017 at 15:36

1 Answer 1

5

I solved the problem removing a df1.cache that I had in some part of the code. I don't know why this solve the problem but it works.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.