I have data like this:
One Kafka Message:
[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData"}]
Another Kafka Message:
[{"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData2"}, {"source": "858256_6052+571", "numericValue": null, "created": 1725969039288, "textValue": "mytestData3"}]
I want ksql to create a stream of three elements for this. How can i do this? I already tried it with explode but there i need a key that i do not have in my data.
My current query attempt:
CREATE STREAM ml_stream_raw (
data ARRAY<STRUCT<source VARCHAR, numericValue DOUBLE, created BIGINT, textValue VARCHAR>>
) WITH (
KAFKA_TOPIC='testml1withlist',
VALUE_FORMAT='JSON'
);
CREATE STREAM ml_stream_raw_exploded AS
SELECT EXPLODE(data) AS message
FROM ml_stream_raw;
CREATE STREAM ml_stream_raw_processed AS
SELECT
message->source AS source,
message->numericValue AS numericValue,
message->created AS created,
message->textValue AS textValue
FROM ml_stream_raw_exploded;
CREATE TABLE ml_table_messages_per_second_list AS
SELECT 'all_messages' AS message_group,
WINDOWSTART AS window_start,
COUNT(*) AS message_count
FROM ml_stream_raw_processed
WINDOW TUMBLING (SIZE 1 SECOND)
GROUP BY 'all_messages';
SELECT TIMESTAMPTOSTRING(window_start, 'dd.MM.yyyy HH:mm:ss') AS window_start_formatted,
message_count
FROM ml_table_messages_per_second_list
EMIT CHANGES;