
Spark Java application throws NotSerializableException on Hadoop Writables

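import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class myAPP {
  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: myAPP <file>");
      System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("myAPP").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration conf = new Configuration();
    // Keys are LongWritable byte offsets, values are Text lines (new mapreduce API input format)
    JavaPairRDD<LongWritable, Text> lines = ctx.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, conf);
    // collect() ships the (LongWritable, Text) pairs back to the driver; the job fails here
    System.out.println(lines.collect().toString());
    ctx.stop();
  }
}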


java.io.NotSerializableException: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (15227295,))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1153163)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/04/26 16:05:05 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (15227295,))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 1153163); not retrying
15/04/26 16:05:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/04/26 16:05:05 INFO TaskSchedulerImpl: Cancelling stage 0
15/04/26 16:05:05 INFO DAGScheduler: Job 0 failed: collect at Parser2.java:60, took 0.460181 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.LongWritable

In a Spark Scala program, I register the Hadoop Writables as below, and it works fine:

sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text]))

Apparently this approach doesn't work with the Spark Java API. This is what I tried:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", LongWritable.class.getName());


Exception in thread "main" org.apache.spark.SparkException: Failed to register classes with Kryo
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
    at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:153)
    at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
    at org.apache.spark.rdd.NewHadoopRDD.<init>(NewHadoopRDD.scala:77)
    at org.apache.spark.SparkContext.newAPIHadoopFile(SparkContext.scala:848)
    at org.apache.spark.api.java.JavaSparkContext.newAPIHadoopFile(JavaSparkContext.scala:488)
    at com.nsn.PMParser.Parser2.main(Parser2.java:56)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.spark.serializer.KryoRegistrator
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
    at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:97)
    ... 13 more

How can I avoid this Hadoop Writables NotSerializableException with the Apache Spark Java API?

2 Answers


As of Spark v1.4.0, you can register classes for Kryo serialization from the Java API with SparkConf.registerKryoClasses (https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkConf.html#registerKryoClasses(java.lang.Class[])), passing in an array of Class objects, each of which can be obtained with Class.forName (http://docs.oracle.com/javase/7/docs/api/java/lang/Class.html#forName(java.lang.String)), for example:

new SparkConf().registerKryoClasses(new Class<?>[]{
    Class.forName("org.apache.hadoop.io.LongWritable"),
    Class.forName("org.apache.hadoop.io.Text")
});
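For reference, here is a minimal sketch of the question's program with that registration applied, assuming Spark 1.4 or later, the Hadoop client on the classpath, and Java 8 lambdas; the class name MyAppKryo and the helpers buildConf and readCopies are hypothetical. registerKryoClasses also sets spark.serializer to KryoSerializer, so no separate serializer setting is needed. The mapToPair copy step is not part of the answer: it is added because the Spark documentation for hadoopFile warns that Hadoop's RecordReader reuses the same Writable object for each record, so collecting the raw pairs can return many references to one object.

```java
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public final class MyAppKryo {
  // registerKryoClasses registers both Writables with Kryo and also
  // switches spark.serializer to KryoSerializer.
  static SparkConf buildConf() {
    return new SparkConf()
        .setAppName("myAPP")
        .setMaster("local")
        .registerKryoClasses(new Class<?>[] {LongWritable.class, Text.class});
  }

  // Reads (byte offset, line) pairs, copies each reused Writable into a fresh
  // object inside the task, then collects the copies on the driver.
  static List<Tuple2<LongWritable, Text>> readCopies(JavaSparkContext ctx, String path) {
    JavaPairRDD<LongWritable, Text> lines = ctx.newAPIHadoopFile(
        path, TextInputFormat.class, LongWritable.class, Text.class, new Configuration());
    return lines
        .mapToPair(t -> new Tuple2<>(new LongWritable(t._1().get()), new Text(t._2())))
        .collect();
  }

  public static void main(String[] args) throws Exception {
    if (args.length < 1) {
      System.err.println("Usage: myAPP <file>");
      System.exit(1);
    }
    JavaSparkContext ctx = new JavaSparkContext(buildConf());
    try {
      System.out.println(readCopies(ctx, args[0]));
    } finally {
      ctx.stop();
    }
  }
}
```

With the copy step in place, Kryo is what ships the fresh LongWritable and Text objects from the executor back to the driver.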

Hope this helps.


1 Comment

If you don't mind an extra import, you can replace Class.forName("org.apache.hadoop.io.LongWritable") with LongWritable.class.

Use:

sparkConf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text")
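Note that spark.kryo.classesToRegister only tells Kryo which classes to register; Kryo is used only when spark.serializer also selects KryoSerializer (the question's code already sets this). A minimal sketch with both settings together, where the class name KryoByNameConf is hypothetical:

```java
import org.apache.spark.SparkConf;

public final class KryoByNameConf {
  // Builds the configuration by fully qualified class name instead of Class objects.
  static SparkConf buildConf() {
    return new SparkConf(false) // false: do not load spark.* system properties
        .setAppName("myAPP")
        .setMaster("local")
        // Select Kryo as the serializer; without this, the list below has no effect.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Comma-separated list of classes to register with Kryo.
        .set("spark.kryo.classesToRegister",
             "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text");
  }

  public static void main(String[] args) {
    SparkConf conf = buildConf();
    System.out.println(conf.get("spark.serializer"));
    System.out.println(conf.get("spark.kryo.classesToRegister"));
  }
}
```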

Or you can simply load the RDD with:

ctx.textFile(args[0]);
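A minimal sketch of that alternative, reusing the question's argument handling; the class name MyAppTextFile and the helper readLines are hypothetical. JavaSparkContext.textFile returns a JavaRDD<String>, so only Strings (which are java.io.Serializable) reach collect() and the default Java serializer is enough. The trade-off is that the byte offsets carried by the LongWritable keys are no longer available.

```java
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public final class MyAppTextFile {
  // Reads the file as plain Strings; no Writable ever reaches collect().
  static List<String> readLines(JavaSparkContext ctx, String path) {
    JavaRDD<String> lines = ctx.textFile(path);
    return lines.collect();
  }

  public static void main(String[] args) {
    if (args.length < 1) {
      System.err.println("Usage: myAPP <file>");
      System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("myAPP").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    try {
      System.out.println(readLines(ctx, args[0]));
    } finally {
      ctx.stop();
    }
  }
}
```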

2 Comments

It looks like the above code is for the Spark Scala API, but the issue is with the Spark Java API...
@VijayInnamuri Yes, in Java use sparkConf.set("spark.kryo.classesToRegister", "org.apache.hadoop.io.LongWritable,org.apache.hadoop.io.Text");. The spark.kryo.registrator setting names a Kryo registrator class, i.e. a class that implements org.apache.spark.serializer.KryoRegistrator and its registerClasses method. Inside registerClasses you register the classes you need, e.g. kryo.register(LongWritable.class);.
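A minimal sketch of the registrator approach described in that comment, assuming Spark's bundled Kryo (package com.esotericsoftware.kryo); the class name WritableRegistrator and the helper buildConf are hypothetical. This is why the question's second attempt fails with a ClassCastException: spark.kryo.registrator must name a KryoRegistrator implementation, not a Writable class.

```java
import com.esotericsoftware.kryo.Kryo;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoRegistrator;

// Public class with an implicit public no-arg constructor, so Spark can
// instantiate it by name.
public class WritableRegistrator implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    // Register the Hadoop Writables that end up in task results.
    kryo.register(LongWritable.class);
    kryo.register(Text.class);
  }

  // Points spark.kryo.registrator at this class instead of at a Writable.
  static SparkConf buildConf() {
    return new SparkConf(false)
        .setAppName("myAPP")
        .setMaster("local")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", WritableRegistrator.class.getName());
  }
}
```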
