
How can we use query configurations with the SQL client in Flink SQL?

I mean in the same way as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html

I want to use idle state retention time.


Flink's SQL client can read a YAML configuration file on startup, and this file can include idle state retention specifications. For example, you might start the client via

sql-client.sh embedded -e sql-client-config.yaml

where the config file contains

execution:
  planner: old                      # optional: either 'old' (default) or 'blink'
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, values smaller than 1 mean unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
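
As a sketch of non-zero settings, the fragment below retains idle state for between 12 and 24 hours. It assumes that the SQL client interprets both values as milliseconds and that, as with TableConfig.setIdleStateRetentionTime in the Table API, the maximum must exceed the minimum by at least 5 minutes (or both must be 0, which disables cleanup). Check both assumptions against the docs for your Flink version.

```yaml
execution:
  type: streaming
  result-mode: table
  # Assumption: both retention values are given in milliseconds.
  min-idle-state-retention: 43200000   # 12 hours = 12 * 3600 * 1000 ms
  max-idle-state-retention: 86400000   # 24 hours = 24 * 3600 * 1000 ms
```

With these values, state for a key that has been idle for at least 12 hours becomes eligible for cleanup, and it is removed at the latest once the key has been idle for 24 hours.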

See the SQL client documentation for more details.


Thanks, David, for the quick reply. Do min-idle-state-retention and max-idle-state-retention apply to a query like SELECT sourceKafka.* FROM sourceKafka INNER JOIN badips ON sourceKafka.source.ip=badips.ip WHERE sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP? I ask because there is no grouping, and I read that these parameters define how long the state of a key is retained.
For a join like that, each stream will be keyed by the IP, so yes, this will be keyed state that is subject to the state retention policy. But you don't need to bother with this anyway, because with a time-windowed join, Flink SQL automatically clears older records once they have become irrelevant.
"With a time-windowed join, Flink SQL automatically clears older records once they have become irrelevant": that's what I thought too, but when I tried the query above, it ran out of 60+ GB of heap space. I don't know why.
That's potentially a rather complex topic, not well suited for Stack Overflow. I suggest you ask for help on the user mailing list.
Sure, David. Thanks.