4

How can I convert this code into Java config?

<int-kafka:outbound-channel-adapter
        id="mainOutboundChannelAdapter"
        kafka-producer-context-ref="kafkaProducerContext"
        channel="mainOutboundTopicChanel">
</int-kafka:outbound-channel-adapter>

1 Answer 1

3

Yes, you can. Please, find the latest Spring Integration Java DSL:

Your case may looks like:

@Bean
public IntegrationFlow sendToKafkaFlow(String serverAddress) {
    return f -> f.<String>split(p -> FastList.newWithNValues(100, () -> p), null)
            .handle(kafkaMessageHandler(serverAddress));
}

private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
    return Kafka.outboundChannelAdapter(props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(TEST_TOPIC, serverAddress, this::producer);
}

private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) {
    metadata.async(true)
            .batchNumMessages(10)
            .valueClassType(String.class)
            .<String>valueEncoder(String::getBytes)
            .keyEncoder(new IntEncoder(null));
}

UPDATE Without Lambdas, but still Spring Integration:

@Bean
@ServiceActivator(inputChannel = "mainOutboundTopicChanel")
public MessageHandler kafkaProducer() {
    return new KafkaProducerMessageHandler<String, String>(kafkaProducerContext());
}

@Bean
public KafkaProducerContext<String, String> kafkaProducerContext() {
    KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
    ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>(TOPIC);
    producerMetadata.setValueClassType(String.class);
    producerMetadata.setKeyClassType(String.class);
    Encoder<String> encoder = new StringEncoder<String>();
    producerMetadata.setValueEncoder(encoder);
    producerMetadata.setKeyEncoder(encoder);
    producerMetadata.setAsync(true);
    Properties props = new Properties();
    props.put("queue.buffering.max.ms", "15000");
    ProducerFactoryBean<String, String> producer =
            new ProducerFactoryBean<String, String>(producerMetadata, kafkaRule.getBrokersAsString(), props);
    ProducerConfiguration<String, String> config =
            new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap(TOPIC, config));
    return kafkaProducerContext;
}

And don't forget to add @EnableIntegration alongside with your @Configuration.

For the future: any XML tag in Spring is parsed by some NamespaceHandler, e.g. in this case it is KafkaNamespaceHandler. Reading its source code we can find these lines:

registerBeanDefinitionParser("outbound-channel-adapter", new KafkaOutboundChannelAdapterParser());
        registerBeanDefinitionParser("producer-context", new KafkaProducerContextParser());

When we go to the KafkaOutboundChannelAdapterParser and see that it populates a BeanDefinition:

final BeanDefinitionBuilder kafkaProducerMessageHandlerBuilder =
                                BeanDefinitionBuilder.genericBeanDefinition(KafkaProducerMessageHandler.class);

and so on by source code.

UPDATE 2

The Consumer part:

@Bean
@InboundChannelAdapter(value = "fromKafkaChannel",
    poller = @Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
public MessageSource<Map<String, Map<Integer, List<Object>>>> kafkaMessageSource() {
    return new KafkaHighLevelConsumerMessageSource<String, String>();
}

@Bean
public KafkaConsumerContext<String, String> kafkaConsumerContext() {
    KafkaConsumerContext<String, String> kafkaConsumerContext = new KafkaConsumerContext<String, String>();
    .....
    kafkaConsumerContext.setConsumerConfigurations(map);
    return kafkaConsumerContext;
}
Sign up to request clarification or add additional context in comments.

5 Comments

Hi Artem, thank you for the response and I've seen these sort of answers before. I feel like people look away from these because of the java 8 lambda features. Regular java would probably be more helpful to the many that still don't know java 8.
Yeah... I understand your (and others) concern... Please, find an update in my answer.
Thank you, as a junior developer, I'm still learning overall how to dive into different scenarios and whatnot. Your code is very very helpful. If it isn't too much to ask, would you happen to post the corresponding ConsumerContext with java config as well? I can't seem to find a stable tutorial on google for that.
Please, find an update for @InboundChannelAdapter in my answer. And feel free to raise a GitHub issue (github.com/spring-projects/spring-integration-kafka/issues) for the Java Config documentation.
Man, thank you so much. You have no idea how long i've been stuck on this. Thank you!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.