0

hi guys I'm working with kafka > spark streaming > Elasticsearch. but i dont make spark streaming JavaInputDStream JSON to elasticsearch.

My Code :

    SparkConf conf = new SparkConf()
            .setAppName("Streaming")
            .setMaster("local")
            .set("es.nodes","localhost:9200")
            .set("es.index.auto.create","true");
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "exastax");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("loglar");
    JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value()));
    finisStream.print();
    JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs");
    streamingContext.start();
    streamingContext.awaitTermination();


}

JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs"); >> finisStream isn't working because it's not JavaDStream. How to convert JavaDStream ?

2 Answers 2

1

JavaEsSparkStreaming.saveJsonToEs works with JavaDStream

JavaEsSparkStreaming.saveToEsWithMeta works with JavaPairDStream

To fix your code:

JavaDStream<String> finisStream = stream.map(new Function<Tuple2<String, String>, String>() {
    public String call(Tuple2<String, String> stringStringTuple2) throws Exception {
        return stringStringTuple2._2();
    }
});

JavaEsSparkStreaming.saveJsonToEs(finisStream,"");
Sign up to request clarification or add additional context in comments.

Comments

1

thnx a lot for answer! but i solved this code :

 JavaDStream<String> stream1 = stream.map(
                new Function<ConsumerRecord<String, String>, String>() {
                    @Override
                    public String call(ConsumerRecord<String, String> r) {
                        return r.value();
                    }
                }
        );
           JavaEsSparkStreaming.saveJsonToEs(stream1,"spark/docs");

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.