3

I'm using Spring Boot v2.2.4 and Apache Kafka in my project.

Below is my pom.xml file:

<dependencies>
        <!-- Dependency list for the Spring Boot + Kafka project described in this post.
             Entries without a version element rely on a version supplied elsewhere
             (presumably the Spring Boot parent/BOM; the parent section is not shown here). -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <version>1.4.200</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.github.docker-java</groupId>
            <artifactId>docker-java</artifactId>
            <version>3.1.5</version>
        </dependency>
        <!-- No explicit version: the stack trace in this post shows that
             spring-kafka 2.3.5.RELEASE is what ends up on the classpath. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!-- NOTE(review): explicitly pinned to 0.11.0.0, i.e. not aligned with the
             spring-kafka version resolved above. Confirm this pin is intentional. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <!-- NOTE(review): this explicit 0.11.0.0 pin is the kafka-clients jar that appears
             in the stack trace. spring-kafka 2.3.5.RELEASE calls Producer.close(Duration),
             and the reported NoSuchMethodError indicates this client jar lacks that method. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <!-- NOTE(review): the kafka_2.11 artifact name suggests a Scala 2.11 build, while
             scala-library is pinned to 2.12.3 here. Verify that the two are meant to match. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.3</version>
        </dependency>

        <!-- Logging: slf4j-api and log4j are on the default scope,
             the slf4j-log4j12 binding is test scope only. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- <dependency>
            <groupId>org.hibernate</groupId>
            <artifactId>hibernate-annotations</artifactId>
            <version>3.5.6-Final</version>
        </dependency>
 -->
    </dependencies>

Below is the code I have for the Kafka configuration:

/**
 * Spring configuration that declares the Kafka producer factories and the
 * {@link KafkaTemplate} beans built on top of them.
 *
 * <p>Both producer factories share the same settings: a broker at
 * {@code 127.0.0.1:9092}, {@link StringSerializer} for keys and
 * {@link JsonSerializer} for values.
 */
@Configuration
public class KafkaConfig {

    /** Broker address used by every producer factory declared in this class. */
    private static final String BOOTSTRAP_SERVERS = "127.0.0.1:9092";

    /**
     * Builds the producer properties shared by all producer factories.
     *
     * <p>A fresh map is returned on every call so that each factory owns its
     * own configuration instance.
     *
     * @return a new mutable map of producer configuration properties
     */
    private static Map<String, Object> producerProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return config;
    }

    /**
     * Producer factory for messages whose value is a {@code ServingDetailsEntity}.
     *
     * @return a producer factory configured with the shared producer properties
     */
    @Bean
    public ProducerFactory<String, ServingDetailsEntity> producerFactoryServingDetail() {
        // Diamond operator instead of the raw type avoids an unchecked conversion.
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Producer factory for messages whose value is a {@code String}.
     *
     * <p>NOTE(review): values are still written with {@link JsonSerializer};
     * confirm that is intended for plain {@code String} payloads.
     *
     * @return a producer factory configured with the shared producer properties
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProperties());
    }

    /**
     * Template for sending {@code ServingDetailsEntity} messages.
     *
     * @return a template backed by {@link #producerFactoryServingDetail()}
     */
    @Bean
    public KafkaTemplate<String, ServingDetailsEntity> kafkaTemplateItem() {
        return new KafkaTemplate<>(producerFactoryServingDetail());
    }

    /**
     * Template for sending {@code String} messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

But when a JSON message is sent to the Kafka topic, I get the error below:

java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.Producer.close(Ljava/time/Duration;)V
    at org.springframework.kafka.core.KafkaTemplate.closeProducer(KafkaTemplate.java:382) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:433) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:198) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:570) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:550) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) ~[kafka-clients-0.11.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) ~[kafka-clients-0.11.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_201]

However, the JSON message does reach the topic, so I want to understand why I'm getting the above error.

Looking forward to any help.

2 Answers 2

3

Yes, as Gary has mentioned, if you are using Spring Boot, let it manage the compatible versions.
You can also find the compatibility matrix here https://spring.io/projects/spring-kafka

Sign up to request clarification or add additional context in comments.

Comments

1

You are using an old version of kafka-clients. Try using a more recent one:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<!-- Note: per the comments on this answer, Spring Boot 2.2.4 already manages
     spring-kafka 2.3.5 and kafka-clients 2.3.1. If kafka-clients is pinned to 2.4.0
     as shown here, spring-kafka should also be overridden to 2.4.4. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>

3 Comments

It's best to remove the version from the POM and let Spring Boot manage it - it will bring in the right (compatible) versions of spring-kafka and kafka-clients. You don't show which version of Boot you are using.
@GaryRussell The Spring Boot version is 2.2.4.RELEASE.
Boot 2.2.4 will pull in spring-kafka 2.3.5 and kafka-clients 2.3.1 automatically; if you decide to use the 2.4.0 (or 2.4.1) kafka-clients, you should also override the spring-kafka version to 2.4.4.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.