
I am experimenting with Apache Flink for a personal project, and I have been struggling to get the resulting stream to print to stdout and to send it to the Kafka topic orders-output.

My goal is to compute the sum of the price field per product over a tumbling time window of 3 minutes. The Flink job receives orders as JSON-formatted strings from two Kafka source topics (orders-a and orders-b), unions them, maps each order to a tuple of the shape (product_name, product_price (double)), keys the stream by product, applies the 3-minute tumbling window, and computes the sum of the price per product in that window using a ReduceFunction. Here's the code:


    FlinkKafkaConsumer<String> consumerA = new FlinkKafkaConsumer<>("orders-a", new SimpleStringSchema(), props);
    FlinkKafkaConsumer<String> consumerB = new FlinkKafkaConsumer<>("orders-b", new SimpleStringSchema(), props);

    DataStream<String> streamA = env.addSource(consumerA);
    DataStream<String> streamB = env.addSource(consumerB);

    DataStream<Tuple2<String,Double>> mergedOrders = streamA
            .union(streamB)
            .map(new MapFunction<String, Tuple2<String, Double>>() {
                @Override
                public Tuple2<String, Double> map(String s) throws Exception {
                    return DataHelper.getTuple(s);
                }
            });

    DataStream<Tuple2<String, Double>> totals = mergedOrders
            .keyBy(value -> value.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .reduce(new ReduceFunction<Tuple2<String, Double>>() {
                public Tuple2<String, Double> reduce(Tuple2<String, Double> v1, Tuple2<String, Double> v2) {
                    return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                }
            });
    DataStream<String> result = totals.map(new MapFunction<Tuple2<String, Double>, String>() {
        @Override
        public String map(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
            LOG.info(stringDoubleTuple2.toString());
            return stringDoubleTuple2.toString();
        }
    });

    result.print();
    result.addSink(new FlinkKafkaProducer<>("orders-output", new SimpleStringSchema(), props));

    env.execute("Product revenues per 3m");

The code does what I described above; DataHelper is just a custom helper class I wrote to convert the received orders from JSON strings to Tuple2 and other types. The job runs fine once started (I am running everything locally), and I can even see in the Flink UI that data is being received (see the image below).

[Flink UI dashboard screenshot]

The issue is that I am not seeing any results in stdout (neither in the terminal nor in the Flink UI), nor in the Kafka output topic (I started a consumer of orders-output independently in another terminal and I am not receiving anything).

I would appreciate some help on this as I've been stuck on it for two days.

  • Have you tried adding a log4j.properties to your code, so that logs will get created? Commented Apr 30, 2023 at 14:02

2 Answers


I probably won't answer your question directly, but I might help you find what is wrong.

First of all, FlinkKafkaProducer and FlinkKafkaConsumer are deprecated; use KafkaSource + KafkaSink instead (a sketch of the newer API is below). Second, I do not see which time strategy (event or processing time) you are using, though maybe it does not have to be stated explicitly (I'm not sure, as I only use event time).
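
For illustration, a minimal sketch of the newer connector API, assuming a recent Flink version (1.14+), a local broker, and the topic names from your question; the group id is hypothetical, so adjust everything to your setup:

    // Rough sketch of the newer KafkaSource/KafkaSink API.
    // Broker address, group id and topic names are assumptions based on the question.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")           // assumed local broker
            .setTopics("orders-a", "orders-b")               // both input topics in one source
            .setGroupId("orders-job")                        // hypothetical group id
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("orders-output")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();

    // Replaces addSource(...)/addSink(...); the rest of the pipeline stays the same.
    DataStream<String> merged = env.fromSource(source, WatermarkStrategy.noWatermarks(), "orders");
    // ... map / keyBy / window / reduce as before ...
    result.sinkTo(sink);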

To the problem: you can clearly see that data is arriving at your last operator, which does the windowing, mapping and sinking. If you want to identify which of those functions is the problem, you can control chaining yourself so that each function becomes a standalone operator (see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups), for example as sketched below.
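
A rough illustration of that idea, reusing the totals stream from your code: breaking the chain makes the map show up as its own box with its own metrics in the web UI.

    // Debugging sketch: force the final map into its own operator so its
    // numRecordsIn / numRecordsOut metrics are visible separately in the web UI.
    DataStream<String> result = totals
            .map(new MapFunction<Tuple2<String, Double>, String>() {
                @Override
                public String map(Tuple2<String, Double> t) {
                    return t.toString();
                }
            })
            .disableChaining()   // do not chain this map with the window operator or the sink
            .name("totals-to-string");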

I can see that you added a log statement to the last map function before the sink, but nothing is being logged. If your logging configuration is correct, then you know the data is getting stuck in the windowing function. The only reason that comes to mind is that Flink's time doesn't advance, so it never closes any windows, and therefore the data is never emitted. You can add extra functions into your pipeline, just for debugging, so you can log the current time at each step and see where the data is (or use those custom chains).

You could add a function right after keyBy and before the window; after the window you are already logging elements. This way you can identify exactly where data arrives but does not proceed any further. Then log Flink's time. You can also create some timers and override the onTimer method to see whether time moves forward, for example like the sketch below.
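
A debug-only sketch of that timer idea (not part of your job; it assumes a static LOG logger like the one you already use):

    // Debug-only: register an event-time timer per element and log when it fires.
    // If the watermark never advances, onTimer() is never called, which would also
    // explain why the event-time window never closes.
    mergedOrders
            .keyBy(value -> value.f0)
            .process(new KeyedProcessFunction<String, Tuple2<String, Double>, Tuple2<String, Double>>() {
                @Override
                public void processElement(Tuple2<String, Double> value, Context ctx,
                                           Collector<Tuple2<String, Double>> out) {
                    LOG.info("element {} seen, current watermark = {}",
                            value, ctx.timerService().currentWatermark());
                    // fire 5000 ms past the current watermark; stays pending if time never moves
                    ctx.timerService().registerEventTimeTimer(
                            ctx.timerService().currentWatermark() + 5000);
                    out.collect(value);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx,
                                    Collector<Tuple2<String, Double>> out) {
                    LOG.info("event-time timer fired at {}", timestamp);
                }
            })
            .print();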

One last thing: you can check the operators' metrics, which are accessible through the web UI. Check numRecordsOut (or something like that) for the last operator to see whether it produces anything. By the way, BytesSent will be zero for the sink operator, because it is not "sending" data to a next operator, but sinking it.


1 Comment

Thank you for this helpful answer; it was indeed related to two problems: my records did not have any timestamp fields, and my logging configuration was wrong.

You're using an event time window, but I don't see where you're setting the watermark strategy (which also assigns timestamps to your records). If your incoming records don't have any timestamps, then you'd want to use a processing time window instead. Both options are sketched below.
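
For illustration, both options sketched against the variables from the question, using the 3-minute window from its description; the timestamp extractor in option (a) is a hypothetical helper, since the records apparently carry no timestamp field:

    // Option (a): assign timestamps + watermarks before the event-time window.
    // extractTimestampMillis(...) is a hypothetical helper that would read an
    // event time (epoch millis) out of the order JSON.
    DataStream<String> withTimestamps = streamA.union(streamB)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                            .withTimestampAssigner((json, recordTs) -> extractTimestampMillis(json)));

    // Option (b): if there is no usable timestamp in the data, use processing time,
    // so windows close based on wall-clock time instead of watermarks.
    DataStream<Tuple2<String, Double>> totals = mergedOrders
            .keyBy(value -> value.f0)
            .window(TumblingProcessingTimeWindows.of(Time.minutes(3)))
            .reduce((v1, v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));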

Separately, as Dominik21 noted, you should be using the newer KafkaSource and KafkaSink.
