I'm experiencing two issues with Kafka Streams' processValues() and suppress operations:
- I get a NullPointerException when using processValues():
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> process() {
        return inputStream -> inputStream
            .processValues(() -> new HeartbeatProcessor()) // sends heartbeat events every second
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(
                Duration.ofSeconds(5), // inactivityGap
                Duration.ofSeconds(1)  // gracePeriod
            ))
            .aggregate(...)
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.unbounded()
            ));
    }
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.topic" is null
    at org.apache.kafka.streams.processor.internals.ProcessorRecordContext.serialize(ProcessorRecordContext.java:97)
Note: When I use process() instead of processValues(), the session windows close properly, but I want to avoid the repartitioning that comes with it.
- Even after working around the NPE by setting a dummy topic name on the ProcessorRecordContext (see generateHeartbeat() in the processor), the suppressed session windows don't close properly:
    public class HeartbeatProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(FixedKeyProcessorContext<String, String> context) {
            this.context = context;
            context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::generateHeartbeat);
        }

        @Override
        public void process(FixedKeyRecord<String, String> record) {
            context.forward(record);
        }

        private void generateHeartbeat(long timestamp) {
            if (context instanceof InternalProcessorContext internalContext) {
                internalContext.setRecordContext(
                    new ProcessorRecordContext(
                        timestamp,
                        0L,
                        context.taskId().partition(),
                        "dummy-topic",
                        new RecordHeaders()
                    )
                );
            }
            Record<String, String> record = new Record<>(
                "heartbeat-" + timestamp, // different key every second
                "heartbeat",
                timestamp
            );
            context.forward(InternalFixedKeyRecordFactory.create(record));
        }
    }
To summarize: I'm using processValues() instead of process() to avoid repartitioning, and I forward a heartbeat event with a different key every second to trigger session closure, but the windows aren't closing consistently.
Version: org.apache.kafka:kafka-streams:3.7.0
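For reference, the timing expected from the configuration above can be sketched as plain arithmetic. This is a minimal sketch under an assumption that is not confirmed anywhere in this post: that suppress(untilWindowCloses) releases a session once the observed stream time reaches the timestamp of the last event in the session plus the inactivity gap plus the grace period. The class and method names (SessionCloseEstimate, earliestEmitStreamTime, wouldEmit) are hypothetical helpers, not part of Kafka Streams.

```java
import java.time.Duration;

public class SessionCloseEstimate {

    // ASSUMPTION (not verified against Kafka Streams 3.7.0): a suppressed session
    // is released once stream time >= lastEventTimestamp + inactivityGap + grace.
    public static long earliestEmitStreamTime(long lastEventTimestampMs,
                                              Duration inactivityGap,
                                              Duration grace) {
        return lastEventTimestampMs + inactivityGap.toMillis() + grace.toMillis();
    }

    // True if a record (for example a heartbeat) carrying this timestamp would
    // advance stream time far enough to release the session, under the assumption above.
    public static boolean wouldEmit(long observedStreamTimeMs,
                                    long lastEventTimestampMs,
                                    Duration inactivityGap,
                                    Duration grace) {
        return observedStreamTimeMs
            >= earliestEmitStreamTime(lastEventTimestampMs, inactivityGap, grace);
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofSeconds(5);   // inactivityGap from the topology above
        Duration grace = Duration.ofSeconds(1); // gracePeriod from the topology above
        long lastEvent = 10_000L;               // hypothetical last real event, in ms

        System.out.println(earliestEmitStreamTime(lastEvent, gap, grace)); // prints 16000
        System.out.println(wouldEmit(15_000L, lastEvent, gap, grace));     // prints false
        System.out.println(wouldEmit(16_000L, lastEvent, gap, grace));     // prints true
    }
}
```

With one heartbeat per second, a session whose last real event is at 10 s would be expected to be emitted on the heartbeat around 16 s, provided that heartbeat actually advances stream time for the task holding the session.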