
I am running a program which uses Apache Spark to get data from an Apache Kafka cluster and write it to a Hadoop file. My program is below:

import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;
public final class SparkKafkaConsumer {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = "Topic1, Topic2, Topic3".split(",");
        for (String topic: topics) {
            topicMap.put(topic.trim(), 3); // trim the space left after each comma
        }
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap);
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(x.split(",")); // split the line on commas, not the other way round
            }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });
        wordCounts.print();
        wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt");
        jssc.start();
        jssc.awaitTermination();
    }
}

I am using this command to submit the application: C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar

I am getting this error: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)

What is causing this error and how do I solve it?

  • This looks like an issue in Spark: stackoverflow.com/questions/29007085/… Commented Jul 21, 2016 at 12:11
  • Could you try saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class) instead ? Commented Jul 21, 2016 at 12:20
  • @Hawknight What is the full package of Text.class and TextOutputFormat.class? Commented Jul 21, 2016 at 12:40
  • Text comes from org.apache.hadoop.io and TextOutputFormat comes from org.apache.hadoop.mapred Commented Jul 21, 2016 at 12:51
  • @Hawknight That solved it. Thanks! You should add this as an answer. Commented Jul 21, 2016 at 13:12

2 Answers


I agree that the error message is not very informative, but it is generally better to specify the format of the data you want to output in any of the saveAsHadoopFile methods to protect yourself from this type of exception.
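To see why this particular message appears: the exception is thrown by Hadoop's Configuration.setClass, which rejects any class that does not implement the expected interface; when no output format is given, Spark's Scala API infers scala.runtime.Nothing$ for the type parameter, and that class fails the check. A rough sketch of that validation (a simplification for illustration, not Hadoop's actual code):

```java
// Simplified sketch of the check performed by
// org.apache.hadoop.conf.Configuration.setClass: the supplied class must be
// assignable to the expected interface, otherwise a RuntimeException is thrown.
public class SetClassCheck {
    static void setClass(Class<?> theClass, Class<?> xface) {
        if (!xface.isAssignableFrom(theClass)) {
            throw new RuntimeException("class " + theClass.getName() + " not " + xface.getName());
        }
    }

    public static void main(String[] args) {
        setClass(java.util.ArrayList.class, java.util.List.class); // passes
        try {
            // Stand-in for what happens when the inferred scala.runtime.Nothing$
            // is checked against org.apache.hadoop.mapred.OutputFormat.
            setClass(Object.class, java.util.List.class);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // class java.lang.Object not java.util.List
        }
    }
}
```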

Here is the signature of the relevant overload in the documentation:

saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass)

In your example, that would correspond to:

wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class)

Based on the types in your wordCounts PairDStream, I chose Text because the key is a String, and IntWritable because the value associated with the key is an Integer.
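This works even though the stream holds plain String/Integer pairs, because the old-API TextOutputFormat renders non-Text keys and values through toString(), tab-separated, one record per line. A hypothetical simplification of that line layout (not Hadoop's actual code):

```java
// Simplified sketch of how org.apache.hadoop.mapred.TextOutputFormat lays out
// a record: key and value are rendered via toString(), separated by a tab.
// This is why String/Integer pairs serialize fine even though Text/IntWritable
// are the declared classes.
public class LineLayout {
    static String renderLine(Object key, Object value) {
        return key.toString() + "\t" + value.toString();
    }

    public static void main(String[] args) {
        System.out.println(renderLine("spark", 3)); // spark	3
    }
}
```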

Use TextOutputFormat if you just want basic plain text files, but you can look into the subclasses of FileOutputFormat for more output options.
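If you did switch to a binary format such as SequenceFileOutputFormat, the records themselves would also need to be Writable, so you would first map the pairs to Hadoop types. A sketch, assuming the same wordCounts stream as above:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;

import scala.Tuple2;

...
// SequenceFileOutputFormat stores keys and values as Writables, so convert
// the String/Integer pairs to Text/IntWritable before saving.
JavaPairDStream<Text, IntWritable> writableCounts = wordCounts.mapToPair(
        new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
            public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> t) {
                return new Tuple2<Text, IntWritable>(new Text(t._1()), new IntWritable(t._2()));
            }
        });
writableCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "seq",
        Text.class, IntWritable.class, SequenceFileOutputFormat.class);
```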

As this was also asked in the comments, the Text class comes from the org.apache.hadoop.io package and TextOutputFormat comes from the org.apache.hadoop.mapred package.



Just for completeness (@Jonathan gave the right answer), here are the required imports:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;

...
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class);

