I am experimenting with Apache Flink for a personal project, and I have been struggling to get the resulting stream to print to stdout and to send it to a Kafka topic, orders-output.
My goal is to compute the sum of the price field per product over a tumbling time window of 3 minutes. The Flink job receives orders as JSON-formatted strings from two Kafka source topics (orders-a and orders-b), unions them, maps each order to a tuple of the shape (product_name, product_price (double)), keys the stream by product, applies the 3-minute tumbling window, and computes the per-product price sum in that window using a ReduceFunction. Here's the code:
FlinkKafkaConsumer<String> consumerA = new FlinkKafkaConsumer<>("orders-a", new SimpleStringSchema(), props);
FlinkKafkaConsumer<String> consumerB = new FlinkKafkaConsumer<>("orders-b", new SimpleStringSchema(), props);

DataStream<String> streamA = env.addSource(consumerA);
DataStream<String> streamB = env.addSource(consumerB);

DataStream<Tuple2<String, Double>> mergedOrders = streamA
        .union(streamB)
        .map(new MapFunction<String, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(String s) throws Exception {
                return DataHelper.getTuple(s);
            }
        });

DataStream<Tuple2<String, Double>> totals = mergedOrders
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.minutes(3)))
        .reduce(new ReduceFunction<Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> reduce(Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
                return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
            }
        });

DataStream<String> result = totals.map(new MapFunction<Tuple2<String, Double>, String>() {
    @Override
    public String map(Tuple2<String, Double> tuple) throws Exception {
        LOG.info(tuple.toString());
        return tuple.toString();
    }
});

result.print();
result.addSink(new FlinkKafkaProducer<>("orders-output", new SimpleStringSchema(), props));

env.execute("Product revenues per 3m");
The code does what I described above; DataHelper is just a custom helper class I wrote to convert the received orders from JSON strings to Tuple2 and other types. The job runs fine once started (I am running everything locally), and I can even see in the Flink UI that data is being received (see the image below).
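For reference, since DataHelper isn't shown in the post, here is a minimal sketch of what getTuple might look like. The JSON field names ("product", "price") and the flat JSON shape are assumptions; Map.Entry stands in for Flink's Tuple2 so the sketch has no Flink dependency:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of the helper. The real DataHelper returns Flink's
// Tuple2<String, Double>; this version returns Map.Entry so it runs standalone.
public class DataHelper {
    // Assumed order shape: {"product":"apple","price":2.5}
    private static final Pattern PRODUCT = Pattern.compile("\"product\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern PRICE = Pattern.compile("\"price\"\\s*:\\s*([0-9.]+)");

    public static Map.Entry<String, Double> getTuple(String json) {
        Matcher product = PRODUCT.matcher(json);
        Matcher price = PRICE.matcher(json);
        if (!product.find() || !price.find()) {
            throw new IllegalArgumentException("Malformed order: " + json);
        }
        // Key: product name; value: price parsed as a double
        return new SimpleEntry<>(product.group(1), Double.parseDouble(price.group(1)));
    }
}
```

In a real job a proper JSON library (e.g. Jackson, which Flink ships with) would be the safer choice over regexes.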
The issue is that I am not seeing any results in stdout (neither in the terminal nor in the Flink UI), nor in the Kafka output topic (I started a separate consumer on orders-output in another terminal and it receives nothing).
I would appreciate some help with this, as I've been stuck on it for two days.