I want to send multiple messages downstream from a single input record using a Transformer (Kafka Streams DSL):

    private ProcessorContext context;


    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, Map<String, String>> transform(String s, String msg) {
        Headers headers = context.headers();
        for (int i = 0; i < 2; i++) {
            headers.add("test", "test".getBytes());
            headers.add("meta", "test".getBytes());

            context.forward("new key", msg);
        }
        return null;
    }

Downstream there is a TopicNameExtractor:

    .transform(TestTransformer::new)
    .to(orderTopicNameExtractor, Produced.with(Serdes.serdeFrom(String.class),
            Serdes.serdeFrom(String.class)));

The extractor needs to read data from a header (it holds the topic name), and the other headers must be written to Kafka too.
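To make the downstream step concrete, here is a minimal sketch of an extractor that takes the topic name from a header. It assumes the topic name is carried in the "meta" header as UTF-8 text; the class name, the `topicFrom` helper and the fallback topic are hypothetical and not taken from the question.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class HeaderTopicNameExtractor implements TopicNameExtractor<String, String> {

    // Hypothetical fallback topic, used when the header is absent.
    static final String FALLBACK_TOPIC = "orders-unrouted";

    // Assumption: the "meta" header carries the target topic name.
    static final String TOPIC_HEADER = "meta";

    @Override
    public String extract(String key, String value, RecordContext recordContext) {
        return topicFrom(recordContext.headers());
    }

    // Kept separate from extract() so the lookup can be checked without a running topology.
    static String topicFrom(Headers headers) {
        // lastHeader returns the most recently added header with this key, or null.
        Header header = headers.lastHeader(TOPIC_HEADER);
        if (header == null || header.value() == null) {
            return FALLBACK_TOPIC;
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }
}
```

Because `lastHeader` picks the most recently added value, duplicate keys would not break the topic lookup itself, but every duplicate would still be written to the output topic.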

The problem is that, by the second message, the next step receives duplicate headers (there are two "test" keys). The only fix I have found is to remove the headers before adding them again on each loop iteration. It works locally, but what about thread safety and high throughput? The context seems to be shared, so can remove() drop a header before the downstream extractor has processed the message?

    headers.remove("meta");
    headers.add("meta", "test".getBytes());
    context.forward("new key", msg);

Is there a better way to solve this problem? The only alternatives I can think of are to use the Processor interface and send each message with withHeaders(new RecordHeaders()), or to put the values directly into the message.

1 Answer

Header support in transform() is somewhat limited. I would recommend moving to the newer KStream.process(...), which takes an api.Processor, instead of the old (and already deprecated) transform(). It should handle headers better.
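As a rough sketch of what that could look like, the processor below builds a fresh header set for every forwarded record, so nothing is added to or removed from a shared Headers instance. It assumes a Kafka Streams version in which KStream.process(...) returns a KStream and the sink passes the forwarded record's headers to the TopicNameExtractor. The class name, the `withFreshHeaders` helper and the header values are illustrative only.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class FanOutProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Forward two records per input, each carrying its own header set.
        for (int i = 0; i < 2; i++) {
            context.forward(withFreshHeaders(record, "new key", "test"));
        }
    }

    // Hypothetical helper: copies the incoming headers, then adds "test" and "meta".
    static Record<String, String> withFreshHeaders(Record<String, String> in, String key, String meta) {
        // The copy keeps headers that arrived with the input record
        // without mutating the instance owned by that record.
        Headers headers = new RecordHeaders(in.headers())
                .add("test", "test".getBytes(StandardCharsets.UTF_8))
                .add("meta", meta.getBytes(StandardCharsets.UTF_8));
        return in.withKey(key).withHeaders(headers);
    }
}
```

In the topology this would replace the transform() step, for example `.process(FanOutProcessor::new).to(orderTopicNameExtractor, ...)`. Since each iteration starts from a copy of the input headers, there is no remove() call whose timing relative to the downstream extractor needs to be reasoned about.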
