4,017 questions
Best practices
0
votes
0
replies
22
views
Throw message on DLT/DLQ from GlobalKTable using Kafka Streams
I am reading data from a Kafka topic using Kafka Streams and more specifically GlobalKTable to populate my store. In the case of corrupt data that can not successfully be parsed I wish to throw a ...
Best practices
0
votes
6
replies
36
views
How to populate two stores from one topic using Kafka Streams GlobalKTable
I am using Kafka Streams, and trying to figure out the best practice for handling multiple stores that are populated using the same data without having to read it twice.
My Kafka Streams service is ...
0
votes
0
answers
73
views
Kafka Streams main consumer fetch rate stays low after GlobalKTable (RocksDB) restore
I’m running a Kafka Streams application (tested with versions 3.9.1 and 4.1.0) that uses a GlobalKTable backed by RocksDB. There are multiple instances of the application, each with 4 stream threads.
...
2
votes
1
answer
64
views
Unknown magic byte with spring cloud stream aggregation
I'm developing a Spring Cloud Stream app that accepts messages in Avro format. After setting up everything, I start my app, then get an Unknown magic byte error. I'm thinking this is because I'm using ...
0
votes
1
answer
32
views
Can the statestore be shared across two processors?
Here is a simple topology:
stream1
.merge(stream2)
.merge(stream3)
.to("topic1")
stream2 = builder.stream("topic1")
stream2.
process(() -> new Processor<>() { process() { Read ...
2
votes
1
answer
57
views
Kafka Streams application does not purge repartition topic records when manual commit
I'm currently developing a Kafka Streams application. Due to internal requirements, I've disabled auto-commits by setting commit.interval.ms to a very long duration (e.g. Long.MAX). Instead, I'm using ...
0
votes
1
answer
47
views
Wrong partitions are assigned to KStream threads after topic selectKey / repartition
I need to consume two topics A with 40 partitions and B with 10 partitions and keep some info in a shared persistent state store with social security number SSN of type string as its key and a custom ...
1
vote
0
answers
28
views
When migrating Kafka Streams to Spring Cloud Stream how to resolve InconsistentGroupProtocolException
The prior application is on kafka-streams (KStream) and has been rewritten to use Spring Cloud Stream. Both apps are on kafka-clients-3.1.2.
When starting the upgraded application it gives:
java.lang....
-2
votes
1
answer
35
views
Linear processing a kafka topic based on a constraint
I have a kafka topic producing product for multiple companies
{"company": "c1", "productIds": ["p1", "p2", "p3"]}
{"company": "...
0
votes
1
answer
38
views
Kafka Streams KTable Race Condition: Multiple Concurrent Updates See Same Stale State
I'm building a conference system using Kafka Streams where users can join/leave rooms.
I'm experiencing a race condition where multiple concurrent leave requests see the same stale room state, causing ...
0
votes
1
answer
51
views
Does Kafka Streams StreamTask process records from multiple co-partitioned topics sequentially or in parallel?
I have read the explanation written here that one StreamTask is handling all messages from co-partitioned topics: How do co-partitioning ensure that partition from 2 different topics end up assigned ...
3
votes
0
answers
77
views
Kafka Stream .checkpoint not created
I’m comparing how GlobalKTable Kafka Streams handles checkpointing in version 2.5.0 versus 3.7.0, and I’ve identified a regression: in 3.7.0 the .checkpoint file isn’t created the first time you drain ...
0
votes
1
answer
45
views
How can I emit the delete records to the output topic, and make that key tombstoned and removed from KTable internal state?
I'm working on a Change Data Capture (CDC) application using Kafka Streams, and I have the following setup:
A KStream is converted to a KTable (let's call it kt1).
I perform a left join between kt1 ...
0
votes
0
answers
108
views
Kafka Streams State store behaving differently if stream is converted beforehand
I am experiencing very weird behaviour in my Kafka Streams application.
Setup is the following:
I create the state store "user-store" manually and connect it to a processor "filter-...
0
votes
0
answers
23
views
Kafka Consumer unable to read messages because of partitions
I have written a sample Kafka Producer application, without specifying a partition, that sends messages to a Consumer application.
My Consumer application is written using Kafka Streams as it is using state ...
1
vote
2
answers
61
views
Kafka stream in memory heap memory [closed]
I have a topic holding 3 GB of raw data in 60 million records. But when I consume those records and keep them in memory (not using RocksDB), the heap memory needed is around 10 GB. I cannot ...
4
votes
0
answers
168
views
decrease response time in Kafka streams
I have a Kafka Streams project that creates one-minute price candles for stocks. My topology code is:
List<String> inputTopics = new ArrayList<>();
inputTopics.add(tradeTopic);
...
0
votes
1
answer
136
views
Data Loss in Kafka Stream Application
We have a KStream application (Kafka 3.6) which consumes from 3 topics and does some repartitioning, selectKey and reduce operations to create KTables, and we use these 3 KTables to do a LeftJoin and then ...
0
votes
1
answer
35
views
How does Kafka Streams' KTable.groupBy + aggregate guarantee correct order of the re-partitioned messages?
If I have a kafka input topic with multiple partitions and then in Kafka Streams I use kStream.map to change the key of each record and write that to an output topic, I will face the problem, that ...
3
votes
1
answer
36
views
KGroupedStream reduce use key in Reducer logic
I am using Kafka Streams to group and reduce a kafka topic.
I want to generate an output for a key, if the key and the value are equal for all values against a given key; otherwise don't output ...
0
votes
1
answer
29
views
How do I access the actual messages comprising a kstream sliding window aggregation?
I have a stream of <K,V> messages. When emitting any satisfied sliding windows, I want to know the list that the window matched.
I want to avoid taking on the accumulation job myself within my ....
0
votes
1
answer
30
views
instanceof not working in KafkaStreams filter
In the code below:
var processed = builder.stream(topics.getRateLimitProcessorTopic(), Consumed.with(SerdesFactory.bucketKeySerdes(), SerdesFactory.bucketOperationSerdes())) .process(() -> ...
1
vote
1
answer
121
views
Kafka stream message processing semantics
I am designing a Kafka Streams app and want to know a few details in order to design my failover strategy. I tried reading the Kafka Streams docs and existing Stack Overflow posts, but was unable to find the ...
0
votes
0
answers
190
views
Using Kafka Streams with MSK Express brokers - internal topic creation
I want to migrate my Kafka infrastructure to AWS MSK, and I've noticed there is a new broker type called Express Brokers.
I really want to use it since it is supposed to be faster, supports "hands-...
2
votes
1
answer
125
views
Kafka Streams default value for session.timeout.ms?
We are using Kafka Streams and Karpenter with normal Deployment in order to manage the pods for a service that we have.
After Karpenter decides to kill the pod, it brings a new Pod up, and we are ...
0
votes
1
answer
37
views
Does stream time in a windowed table advance per key or globally?
I have a tricky case where a lot of historical JSON data was written to Kafka with a custom partitioning strategy.
We are currently developing a system to make use of this historical data and perform ...
0
votes
1
answer
32
views
default max.compaction.lag.ms for changelog
I use the following:
windowedBy(TimeWindows.of(Duration.ofHours(6)))
.aggregate(aggregator, aggregator,
Materialized.as("my-agg"))
The changelog was created with the configs below:
cleanup....
0
votes
0
answers
71
views
Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. Aindrail
The error is
Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2025-02-21T16:09:37.761+05:30 ERROR 38852 --- [Ecommerce] [ ...
0
votes
1
answer
54
views
Kafka Streams KTable-KTable foreign key join emits null even if right side is empty
What is the semantics for a Kafka Streams (3.7.1) KTable-KTable foreign key join, where the extracted foreign key has never matched against the primary key in the right-side ktable?
In this example ...
1
vote
1
answer
253
views
Kafka Streams partition assignment problem with multiple topics and scaling
I'm working on a kafka streams application that consumes from a consumer group with three topics. One topic has 20 partitions, another 10, and the last one 5. So in total, this consumer group has 35 ...
0
votes
1
answer
29
views
In Helidon SE 4.1.6 , how to send data to a specific partition using kafka producer
I want to use Helidon SE 4.1.6 to produce data to a specific partition of Apache Kafka using a producer.
Detail :
I have gone through the https://helidon.io/docs/latest/se/reactive-messaging#...
0
votes
0
answers
93
views
Analyzing Kafka Streams For Anomaly Detection
I am new to Kafka streams and have come across a use case where there is a need to detect anomalies in an event stream.
Here is the high level use case:
Events
There is a stream of incoming events in ...
1
vote
1
answer
59
views
Is there any side effect of setting repartition.purge.interval.ms config with too high value in order to prevent purge?
We are using Kafka Streams in our application to process events.
To perform join operations we use both the repartition and selectKey methods of Kafka Streams, and this causes the creation of an internal ...
0
votes
0
answers
78
views
Kafka Streams Consumer Constantly Rebalance over 100k tps
We have a kafka streams service that performs a left join (KStreams) operation by their message key. The message size is 1 KB more or less.
The left topic has around two hundred thousand (200,000) ...
1
vote
0
answers
84
views
Error retrieving state store during graceful shutdown
Context and analysis
Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the ...
2
votes
1
answer
96
views
How do I extract messages from a specific producer in Kafka
I have two Kafka queues (queue1 & queue2), and multiple producers (clients) are posting messages to both queues. Each client sends different types of messages to the queues (e.g., client1 sends "...
2
votes
1
answer
44
views
Kafka streams partition assigner not using all available clients
We have a Java application which uses the Streams library to process data. The streams application does not join data from multiple topics and processes each message received independently. The ...
1
vote
1
answer
53
views
Kafka Stream KTable Foreign Key Left join outputs more tombstones than expected
After migrating to the latest streams version 3.9.0 from 3.5.0, I notice a behaviour in the left foreign key join, that I am not able to understand.
For a left foreign key join :
KTable<String, ...
0
votes
0
answers
22
views
SessionWindows in Kafka
I'm reading a Confluent blog about Windowing in Kafka Streams:
https://www.confluent.io/blog/windowing-in-kafka-streams/
and I found this under the Session Windows:
If your inactivity window is too ...
2
votes
2
answers
111
views
Inconsistent Results with KStream-KTable Join Under Load
I'm developing a Quarkus microservice that utilizes Kafka Streams to process messages from multiple topics.
Specifically, I'm attempting to join a KStream and a KTable derived from two of these topics....
1
vote
1
answer
75
views
Seeing duplicate records created in a streams app's output
I have a Kafka Streams app that takes input from a Kafka Topic, aggregates it on three fields in the original's value in 5 minute windows. On the output side of this, I need to translate the ...
4
votes
0
answers
172
views
Java virtual threads and kafka streams
We have built real time data processing pipelines on kafka topics in the past with kafka streams technology. But we were always limited by the number of partitions on the kafka topic for concurrency ...
0
votes
0
answers
65
views
kafka-streams does not add the right kafka-clients dependency
org.apache.kafka:kafka-streams:jar:3.9.0 is supposed to use org.apache.kafka:kafka-clients:jar:3.9.0, but when I run mvn dependency:tree, I get
[INFO] +- org.apache.kafka:kafka-streams:jar:3.9.0:...
0
votes
1
answer
77
views
Kafka Streams state store vs MongoDB for state management
I am working on a distributed system using Kafka Streams for communication between components.
One of the components (for simplicity, BRAIN) manages a sequence of messages to other components (A, B, ...
1
vote
1
answer
191
views
Multiple topologies in Spring Kafka streams
What's the easiest or best way to create multiple topologies in a Spring Boot Kafka streams application?
Is it possible to use the same default StreamBuilder bean? Or if I need to create a new ...
0
votes
1
answer
49
views
Kafka Streams Window Size (15 mins) is greater than the max poll interval (5 min)
I have a KStreams application where I am reading from an input topic, performing aggregation in a window of 15 min, suppressing and then performing some operation on each record, following is the ...
0
votes
0
answers
71
views
TraceId not printing in kafka-stream application
I have a Kafka Streams application running on springboot-2.x, kafka-streams-2.5.1 and spring-cloud-sleuth (log tracing), and I’m using KafkaStreamsTracing to print traceId and spanId. Which is ...
3
votes
1
answer
42
views
Kafka Streams: NPE in ProcessorRecordContext and Suppress issues with processValues()
I'm experiencing two issues with Kafka Streams' processValues() and suppress operations:
Getting NPE when using processValues():
@Bean
public Function<KStream<String, String>, KStream<...
0
votes
1
answer
31
views
App producing 2 messages for only 1 input
I'm trying to debug a problem in our production Kafka Streams app. The (simplified) topology looks something like this
builder.stream("input").groupByKey().reduce(
(agg, val) -> "...
0
votes
1
answer
65
views
Kafka Streams send custom headers with Transformer when multiple messages output
I want to send multiple messages downstream using Transformer (kafka streams dsl)
private ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this....