1

I am using Apache Flink trying to get JSON records from Kafka to InfluxDB, splitting them from one JSON record into multiple InfluxDB points in the process.

I found the flatMap transform and it feels like it fits the purpose. Core code looks like this:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
    @Override
    public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
        Iterator<Entry<String, JsonNode>> iterator = //...

        while (iterator.hasNext()) {
            // extract point from input
            InfluxDBPoint point = //...

            out.collect(point);
        }
    }
});

For some reason, I only get one of those collected points streamed into the database.

Even when I print out all mapped entries, it seems to work just fine: dataStream.print() yields:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

Am I misunderstanding flatMap or might there be some bug in the Influx connector?

2
  • 1
    If you see all the results that expect when you print the stream it might be a problem in the InfluxSink. But from what you show here it seems like you are using flatmap in the right way. Commented Jan 10, 2020 at 9:50
  • 1
    @TobiSH yeah, I just created a subclass of InfluxSink and could verify that all the points are arriving in the invoke method. So definitely seems like a sink problem at this point. Commented Jan 10, 2020 at 10:07

1 Answer 1

1

The problem was actually related to the fact that a series (defined by its tagset and measurement as seen here) in Influx can only have one point per time, therefore even though my fields differed, the final point overwrote all previous points with the same time value.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.