
I have data like this:

One Kafka Message:

[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData"}]

Another Kafka Message:

[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData2"}, {"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData3"}]

I want ksql to turn these two messages into a stream of three elements, one per array entry. How can I do this? I already tried EXPLODE, but for that I seem to need a key, which my data does not have.

My current query attempt:

CREATE STREAM ml_stream_raw (
    data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
    KAFKA_TOPIC='testml1withlist',
    VALUE_FORMAT='JSON'
);

CREATE STREAM ml_stream_raw_exploded AS
    SELECT EXPLODE(data) AS message
    FROM ml_stream_raw;

CREATE STREAM ml_stream_raw_processed AS
    SELECT 
        message->source AS source,
        message->numericValue AS numericValue,
        message->created AS created,
        message->textValue AS textValue
    FROM  ml_stream_raw_exploded;

CREATE TABLE ml_table_messages_per_second_list AS
    SELECT 'all_messages' AS message_group,
           WINDOWSTART AS window_start,
           COUNT(*) AS message_count
    FROM ml_stream_raw_processed
    WINDOW TUMBLING (SIZE 1 SECOND)
    GROUP BY 'all_messages';

SELECT TIMESTAMPTOSTRING(window_start, 'dd.MM.yyyy HH:mm:ss') AS window_start_formatted,
       message_count
FROM ml_table_messages_per_second_list
EMIT CHANGES;
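
One possible cause, offered as an untested sketch rather than a confirmed fix: each sample message is a bare JSON array, while the first statement declares a column named data, which by default makes ksqlDB look for a JSON object containing a data field. The WRAP_SINGLE_VALUE property of CREATE STREAM controls this behaviour; setting it to false tells ksqlDB that the single value column was serialized as an anonymous value. The variant below reuses the stream and topic names from the attempt and assumes the topic contains only bare arrays like the samples.

```sql
-- Sketch: read each message as an unwrapped (anonymous) JSON array.
-- Assumption: every message on the topic is a bare array like the samples above.
CREATE STREAM ml_stream_raw (
    data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
    KAFKA_TOPIC='testml1withlist',
    VALUE_FORMAT='JSON',
    WRAP_SINGLE_VALUE=false  -- the value is the array itself, not {"data": [...]}
);
```

If the array is read this way, EXPLODE(data) in the second statement should emit one row per array entry, so the two sample messages would yield three rows. The remaining statements would stay unchanged.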
  • 2
    Can you also show us your current query attempt? Commented Sep 18, 2024 at 14:48
  • 1
    @jarlh Oh sorry, I added it :) Commented Sep 19, 2024 at 3:34
