Best practices
0 votes
0 replies
22 views

Throw message on DLT/DLQ from GlobalKTable using Kafka Streams

I am reading data from a Kafka topic using Kafka Streams, and more specifically a GlobalKTable, to populate my store. In the case of corrupt data that cannot be parsed successfully I wish to throw a ...
Andreas10001's user avatar
Best practices
0 votes
6 replies
36 views

How to populate two stores from one topic using Kafka Streams GlobalKTable

I am using Kafka Streams, and trying to figure out the best practice for handling multiple stores that are populated using the same data without having to read it twice. My Kafka Streams service is ...
Andreas10001's user avatar
0 votes
0 answers
73 views

Kafka Streams main consumer fetch rate stays low after GlobalKTable (RocksDB) restore

I’m running a Kafka Streams application (tested with versions 3.9.1 and 4.1.0) that uses a GlobalKTable backed by RocksDB. There are multiple instances of the application, each with 4 stream threads. ...
Sona Torosyan's user avatar
2 votes
1 answer
64 views

Unknown magic byte with spring cloud stream aggregation

I'm developing a Spring Cloud Stream app that accepts messages in Avro format. After setting up everything, I start my app, then get an Unknown magic byte error. I'm thinking this is because I'm using ...
manaclan's user avatar
  • 1,044
0 votes
1 answer
32 views

Can the statestore be shared across two processors?

Here is a simple topology: stream1 .merge(stream2) .merge(stream3) .to("topic1") stream2 = builder.stream("topic1") stream2.process(() -> new Processor<>() { process() { Read ...
user3092576's user avatar
2 votes
1 answer
57 views

Kafka Streams application does not purge repartition topic records when manual commit

I'm currently developing a Kafka Streams application. Due to internal requirements, I've disabled auto-commits by setting commit.interval.ms to a very long duration (e.g. Long.MAX). Instead, I'm using ...
Junhyun Kim's user avatar
0 votes
1 answer
47 views

Wrong partitions are assigned to KStream threads after topic selectKey / repartition

I need to consume two topics A with 40 partitions and B with 10 partitions and keep some info in a shared persistent state store with social security number SSN of type string as its key and a custom ...
losingsleeep's user avatar
  • 1,899
1 vote
0 answers
28 views

When migrating Kafka Streams to Spring Cloud Stream how to resolve InconsistentGroupProtocolException

The prior application is on kafka-streams (KStream) and has been rewritten to use Spring Cloud Stream. Both apps are on kafka-clients-3.1.2. When starting the upgraded application it gives: java.lang....
awgtek's user avatar
  • 1,899
-2 votes
1 answer
35 views

Linear processing a kafka topic based on a constraint

I have a kafka topic producing product for multiple companies {"company": "c1", "productIds": ["p1", "p2", "p3"]} {"company": &...
user2890683's user avatar
0 votes
1 answer
38 views

Kafka Streams KTable Race Condition: Multiple Concurrent Updates See Same Stale State

I'm building a conference system using Kafka Streams where users can join/leave rooms. I'm experiencing a race condition where multiple concurrent leave requests see the same stale room state, causing ...
lastpeony4's user avatar
0 votes
1 answer
51 views

Does Kafka Streams StreamTask process records from multiple co-partitioned topics sequentially or in parallel?

I have read the explanation written here that one StreamTask is handling all messages from co-partitioned topics: How do co-partitioning ensure that partition from 2 different topics end up assigned ...
Fatemah Soliman's user avatar
3 votes
0 answers
77 views

Kafka Stream .checkpoint not created

I’m comparing how GlobalKTable Kafka Streams handles checkpointing in version 2.5.0 versus 3.7.0, and I’ve identified a regression: in 3.7.0 the .checkpoint file isn’t created the first time you drain ...
paul's user avatar
  • 13.6k
0 votes
1 answer
45 views

How can I emit the delete records to the output topic, and make that key tombstoned and removed from KTable internal state?

I'm working on a Change Data Capture (CDC) application using Kafka Streams, and I have the following setup: A KStream is converted to a KTable (let's call it kt1). I perform a left join between kt1 ...
Hari Krishnan U's user avatar
0 votes
0 answers
108 views

Kafka Streams State store behaving differently if stream is converted beforehand

I am experiencing very weird behaviour in my Kafka Streams application. The setup is the following: I create the state store "user-store" manually and connect it to a processor "filter-...
Tino's user avatar
  • 3
0 votes
0 answers
23 views

Kafka Consumer unable to read messages because of partitions

I have written a sample Kafka Producer application that sends messages to a Consumer application without specifying a partition. My Consumer application is written using Kafka Streams as it is using state ...
Bandita Pradhan's user avatar
1 vote
2 answers
61 views

Kafka stream in memory heap memory [closed]

I have a topic holding 3 GB of raw data in 60 million records. But when I consume those records and keep them in memory (not using RocksDB), the heap memory needed is around 10 GB. I cannot ...
PABLO GARCIA's user avatar
4 votes
0 answers
168 views

decrease response time in Kafka streams

I have a project that uses Kafka Streams to create one-minute price candles for stocks. My topology code is: List<String> inputTopics = new ArrayList<>(); inputTopics.add(tradeTopic); ...
mohammadjavadkh's user avatar
0 votes
1 answer
136 views

Data Loss in Kafka Stream Application

We have a KStream application (Kafka 3.6) which consumes from 3 topics and does some repartitioning, selectKey and reduce operations to create KTables, and we use these 3 KTables to do a LeftJoin and then ...
Justin's user avatar
  • 745
0 votes
1 answer
35 views

How does Kafka Streams' KTable.groupBy + aggregate guarantee correct order of the re-partitioned messages?

If I have a kafka input topic with multiple partitions and then in Kafka Streams I use kStream.map to change the key of each record and write that to an output topic, I will face the problem, that ...
selbstereg's user avatar
3 votes
1 answer
36 views

KGroupedStream reduce use key in Reducer logic

I am using Kafka Streams to group and reduce a kafka topic. I want to generate an output for a key, if the key and the value are equal for all values against a given key; otherwise don't output ...
simonalexander2005's user avatar
0 votes
1 answer
29 views

How do I access the actual messages comprising a kstream sliding window aggregation?

I have a stream of <K,V> messages. When emitting any satisfied sliding windows, I want to know the list that the window matched. I want to avoid taking on the accumulation job myself within my ....
redgiant's user avatar
  • 606
0 votes
1 answer
30 views

instanceof not working in KafkaStreams filter

In the code below: var processed = builder.stream(topics.getRateLimitProcessorTopic(), Consumed.with(SerdesFactory.bucketKeySerdes(), SerdesFactory.bucketOperationSerdes())) .process(() -> ...
Max's user avatar
  • 1
1 vote
1 answer
121 views

Kafka stream message processing semantics

I am designing a Kafka Streams app and want to know a few details in order to design my failover strategy. I tried reading the Kafka Streams docs and existing Stack Overflow posts, but was unable to find the ...
Aakash Gupta's user avatar
0 votes
0 answers
190 views

Using Kafka Streams with MSK Express brokers - internal topic creation

I want to migrate my Kafka infrastructure to AWS MSK, and I've noticed there is a new broker type called Express Brokers. I really want to use it since it is supposed to be faster, supports "hands-...
Amitb's user avatar
  • 612
2 votes
1 answer
125 views

Kafka Streams default value for session.timeout.ms?

We are using Kafka Streams and Karpenter with normal Deployment in order to manage the pods for a service that we have. After Karpenter decides to kill the pod, it brings a new Pod up, and we are ...
alext's user avatar
  • 842
0 votes
1 answer
37 views

Does stream time in a windowed table advance per key or globally?

I have a tricky case where a lot of historical JSON data was written to Kafka with a custom partitioning strategy. We are currently developing a system to make use of this historical data and perform ...
filpa's user avatar
  • 3,744
0 votes
1 answer
32 views

default max.compaction.lag.ms for changelog

I use the following: windowedBy(TimeWindows.of(Duration.ofHours(6))) .aggregate(aggregator, aggregator, Materialized.as("my-agg")) The changelog was created with the configs below: cleanup....
Abe's user avatar
  • 716
0 votes
0 answers
71 views

Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table. Aindrail

The error is ** Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled. 2025-02-21T16:09:37.761+05:30 ERROR 38852 --- [Ecommerce] [ ...
Aindrail Santra's user avatar
0 votes
1 answer
54 views

Kafka Streams KTable-KTable foreign key join emits null even if right side is empty

What are the semantics of a Kafka Streams (3.7.1) KTable-KTable foreign key join, where the extracted foreign key has never matched against the primary key in the right-side KTable? In this example ...
KarlP's user avatar
  • 5,221
1 vote
1 answer
253 views

Kafka Streams partition assignment problem with multiple topics and scaling

I'm working on a kafka streams application that consumes from a consumer group with three topics. One topic has 20 partitions, another 10, and the last one 5. So in total, this consumer group has 35 ...
rwachter's user avatar
0 votes
1 answer
29 views

In Helidon SE 4.1.6, how to send data to a specific partition using a Kafka producer

I want to use Helidon SE 4.1.6 and produce data to a specific partition of Apache Kafka using a producer. Detail: I have gone through the https://helidon.io/docs/latest/se/reactive-messaging#...
MOHAMMAD SHADAB's user avatar
0 votes
0 answers
93 views

Analyzing Kafka Streams For Anomaly Detection

I am new to Kafka streams and have come across a use case where there is a need to detect anomalies in an event stream. Here is the high level use case: Events There is a stream of incoming events in ...
Saikat's user avatar
  • 568
1 vote
1 answer
59 views

Is there any side effect of setting repartition.purge.interval.ms config with too high value in order to prevent purge?

We are using Kafka Streams in our application to process events. For a join operation we use both the repartition and selectKey methods of Kafka Streams, and this causes the creation of an internal ...
ceb's user avatar
  • 39
0 votes
0 answers
78 views

Kafka Streams Consumer Constantly Rebalance over 100k tps

We have a kafka streams service that performs a left join (KStreams) operation by their message key. The message size is 1 KB more or less. The left topic has around two hundred thousand (200,000) ...
Martinus Elvin's user avatar
1 vote
0 answers
84 views

Error retrieving state store during graceful shutdown

Context and analysis Our Spring Boot application relies on information stored in Kafka to answer REST requests. It does so by retrieving information from a global state store via the ...
Cédric Schaller's user avatar
2 votes
1 answer
96 views

How do I extract messages from a specific producer in Kafka

I have two Kafka queues(queue1 & queue2), and multiple producers(clients) are posting messages to both queues. Each client sends different types of messages to the queues (e.g., client1 send "...
deen's user avatar
  • 21
2 votes
1 answer
44 views

Kafka streams partition assigner not using all available clients

We have a Java application which uses Streams library to process data. The streams application does not join data from multiple topics and processes each message received independently. The ...
Andrey's user avatar
  • 478
1 vote
1 answer
53 views

Kafka Stream KTable Foreign Key Left join outputs more tombstones than expected

After migrating to the latest streams version 3.9.0 from 3.5.0, I notice a behaviour in the left foreign key join, that I am not able to understand. For a left foreign key join : KTable<String, ...
Sumit Baurai's user avatar
0 votes
0 answers
22 views

SessionWindows in Kafka

I'm reading a Confluent blog about Windowing in Kafka Streams: https://www.confluent.io/blog/windowing-in-kafka-streams/ and I found this under the Session Windows: If your inactivity window is too ...
gregof's user avatar
  • 23
2 votes
2 answers
111 views

Inconsistent Results with KStream-KTable Join Under Load

I'm developing a Quarkus microservice that utilizes Kafka Streams to process messages from multiple topics. Specifically, I'm attempting to join a KStream and a KTable derived from two of these topics....
Alex H's user avatar
  • 31
1 vote
1 answer
75 views

Seeing duplicate records created in a streams app's output

I have a Kafka Streams app that takes input from a Kafka Topic, aggregates it on three fields in the original's value in 5 minute windows. On the output side of this, I need to translate the ...
John Ament's user avatar
  • 11.8k
4 votes
0 answers
172 views

Java virtual threads and kafka streams

We have built real time data processing pipelines on kafka topics in the past with kafka streams technology. But we were always limited by the number of partitions on the kafka topic for concurrency ...
birinder tiwana's user avatar
0 votes
0 answers
65 views

kafka-streams does not add the right kafka-clients dependency

org.apache.kafka:kafka-streams:jar:3.9.0 is supposed to use org.apache.kafka:kafka-clients:jar:3.9.0, but when I run mvn dependency:tree, I get [INFO] +- org.apache.kafka:kafka-streams:jar:3.9.0:...
M. Bouzaien's user avatar
0 votes
1 answer
77 views

Kafka Streams state store vs MongoDB for state management

I am working on a distributed system using Kafka Streams for communication between components. One of the components, (for simplicity BRAIN), manages a sequence of messages to other components (A, B, ...
Paul Marcelin Bejan's user avatar
1 vote
1 answer
191 views

Multiple topologies in Spring Kafka streams

What's the easiest or best way to create multiple topologies in a Spring Boot Kafka streams application? Is it possible to use the same default StreamBuilder bean? Or if I need to create a new ...
losingsleeep's user avatar
  • 1,899
0 votes
1 answer
49 views

Kafka Streams Window Size (15 mins) is greater than the max poll interval (5 min)

I have a Kafka Streams application where I am reading from an input topic, performing aggregation in a window of 15 min, suppressing and then performing some operation on each record. Following is the ...
Ashutosh Singh's user avatar
0 votes
0 answers
71 views

TraceId not printing in kafka-stream application

I have a Kafka Streams application which is running on springboot-2.x, kafka-streams-2.5.1 and spring-cloud-sleuth (log tracing), and I’m using KafkaStreamsTracing to print traceId and spanId. Which is ...
sundararajan s's user avatar
3 votes
1 answer
42 views

Kafka Streams: NPE in ProcessorRecordContext and Suppress issues with processValues()

I'm experiencing two issues with Kafka Streams' processValues() and suppress operations: Getting NPE when using processValues(): @Bean public Function<KStream<String, String>, KStream<...
suno3's user avatar
  • 201
0 votes
1 answer
31 views

App producing 2 messages for only 1 input

I'm trying to debug a problem in our production Kafka Streams app. The (simplified) topology looks something like this builder.stream("input").groupByKey().reduce( (agg, val) -> "...
Egor's user avatar
  • 1,660
0 votes
1 answer
65 views

Kafka Streams send custom headers with Transformer when multiple messages output

I want to send multiple messages downstream using Transformer (kafka streams dsl) private ProcessorContext context; @Override public void init(ProcessorContext context) { this....
xmm_581's user avatar
  • 41