
I have a Flink job running FlinkSQL with the following setup:

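import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();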
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

To test this locally with a Kafka source, I sent a few events to the Flink job. The Flink UI shows that it produced one watermark. I then waited 3 minutes without sending any new events (i.e., with idle partitions) to see whether the watermark would advance. It did not.

Note: I run a local Kafka broker with a three-partition topic. My test data is keyed, so all of it goes to the same partition. Even so, the watermark does not advance while the other partitions stay idle, even after I wait 3 minutes.
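A rough model of why data on only one of three partitions can leave the watermark stuck: a source's combined watermark is the minimum over its partitions, so partitions that never receive data hold it at the lowest possible value until they are declared idle and excluded. The Java sketch below is a simplified illustration under those assumptions, not Flink's actual implementation; `IdleWatermarkModel` and its parameters are hypothetical names. It also shows that idleness only removes partitions from the minimum and never pushes time forward: once every partition, including the one that received data, has been quiet for the timeout, no active partition is left and the watermark simply stops moving.

```java
import java.util.OptionalLong;

// Simplified model (NOT Flink's implementation) of combining per-partition
// watermarks while excluding partitions considered idle.
public class IdleWatermarkModel {

    /**
     * @param partitionWatermarks latest watermark per partition (Long.MIN_VALUE if no data yet)
     * @param lastRecordMillis    processing time of each partition's last record
     *                            (for a partition that never got data: the job start time)
     * @param nowMillis           current processing time
     * @param idleTimeoutMillis   idle timeout in ms; 0 disables idleness detection
     * @return minimum watermark over non-idle partitions, or empty if all are idle
     */
    public static OptionalLong combinedWatermark(long[] partitionWatermarks, long[] lastRecordMillis,
                                                 long nowMillis, long idleTimeoutMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < partitionWatermarks.length; i++) {
            boolean idle = idleTimeoutMillis > 0
                    && nowMillis - lastRecordMillis[i] >= idleTimeoutMillis;
            if (!idle) {
                // Only active partitions take part in the minimum.
                anyActive = true;
                min = Math.min(min, partitionWatermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long timeout = 180_000L; // 3 minutes, as in the question
        // Partition 0 received the keyed test data; partitions 1 and 2 never did.
        long[] watermarks = {5_000L, Long.MIN_VALUE, Long.MIN_VALUE};
        long[] lastRecord = {50_000L, 0L, 0L};

        // t = 60 s: nothing is idle yet, so the empty partitions hold the watermark at MIN.
        System.out.println(combinedWatermark(watermarks, lastRecord, 60_000L, timeout));
        // t = 200 s: partitions 1 and 2 are idle and excluded; partition 0 is still active.
        System.out.println(combinedWatermark(watermarks, lastRecord, 200_000L, timeout));
        // t = 240 s: partition 0 is idle too, so there is nothing left to advance the watermark.
        System.out.println(combinedWatermark(watermarks, lastRecord, 240_000L, timeout));
    }
}
```

In this model, with the question's numbers, the combined watermark moves from `Long.MIN_VALUE` to partition 0's watermark once partitions 1 and 2 have been empty for three minutes, provided the idle-timeout setting is actually applied.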

  1. Is there any place in the job UI where I can check whether the 3-minute value I set is actually picked up? Am I using the right units (seconds vs. milliseconds)?

  2. Is there anything else I could check to test this setting?
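On the units question, a quick JDK-only check (it does not use Flink's own config parser): "180000 ms" is exactly three minutes. As far as I know, Flink's duration-typed options such as `table.exec.source.idle-timeout` accept a number followed by a unit label like `ms`, `s` or `min`, so "180000 ms" and "3 min" should mean the same thing. The class name `IdleTimeoutUnits` is made up for illustration.

```java
import java.time.Duration;

// Hypothetical helper, JDK only: confirms that the millisecond value used in
// the question equals three minutes. It does not call Flink's config parser.
public class IdleTimeoutUnits {

    // The idle timeout from the question: "180000 ms".
    public static final Duration CONFIGURED = Duration.ofMillis(180_000L);

    public static boolean isThreeMinutes(Duration d) {
        return d.equals(Duration.ofMinutes(3));
    }

    public static void main(String[] args) {
        System.out.println(isThreeMinutes(CONFIGURED)); // true
        System.out.println(CONFIGURED);                 // PT3M (ISO-8601 form)
    }
}
```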

We are running Flink 1.12.1.

Update: I see the following exception under Exceptions for my Flink SQL job. I wonder whether there is a version mismatch.

2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
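To check the version-mismatch suspicion, one generic JDK technique (not something from the original thread) is to ask the JVM which jar a suspect class is loaded from; classes from the same library resolving to different jars would point to two `kafka-clients` versions on the classpath. `WhichJar` is a hypothetical helper name, and its output is only meaningful when it runs with the same classpath as the job, for example by calling `locate` from inside the job and logging the result.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports where the JVM loads a class from.
public class WhichJar {

    public static String locate(String className) {
        try {
            // initialize = false: look the class up without running its static initializers
            Class<?> c = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return className + " -> no code source (e.g. a JDK class)";
            }
            return className + " -> " + src.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            // NoClassDefFoundError is a LinkageError, as in the stack trace above
            return className + " -> not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        String[] suspects = args.length > 0 ? args : new String[] {
            "org.apache.kafka.clients.consumer.KafkaConsumer",
            "org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient",
            "org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest$PartitionData"
        };
        for (String name : suspects) {
            System.out.println(locate(name));
        }
    }
}
```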

Answer


The issue was that this setting does not work in Flink 1.12.0 or 1.12.1. I had to upgrade to Flink 1.13.2, where the setting was honored and worked as expected.

The exception was a red herring and not consistently reproducible.
