I tried to run a Kafka consumer from code, but it always throws an exception. When I run kafka-console-consumer.sh to check the producer, it works fine and shows all messages received by the broker. Below are my code, pom.xml, and the exception log. Please tell me where I am going wrong.

public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:2181");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id_coonfig" );
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

Here is my Test class code.

@Test
public void testSpringKafkaConsumer() throws InterruptedException {

    try{
    String topics[] = { "programTopic3" };
    ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs);
    factory.createConsumer();
    AbstractMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(factory,
            topics);
    container.setBeanName("container");

    final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
    container.setMessageListener(new MessageListener<Integer, String>() {

        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            // logger.info("received: " + message);
            System.out.println("received: --------+++++++++++++++------------" + message);
            records.add(message);
        }
    });
    KafkaMessageDrivenChannelAdapter<Integer, String> adaptor = new KafkaMessageDrivenChannelAdapter<>(container);

    adaptor.start();
    ConsumerRecord<Integer, String> poll = null;
    while((poll =records.take()) != null){
        System.out.println(poll.topic() + "  topic");
        System.out.println(poll.key() + "   key");
        System.out.println(poll.value()+ "  value");
    }

    }catch(Exception exception)
    {
        exception.printStackTrace();
        Assert.fail();
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.learn.kafka.integrate.spring</groupId>
<artifactId>SpringIntegrationKafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>SpringIntegrationKafka</name>
<description>Demo project for Spring Integration kafka</description>

<properties>
    <springVersion>4.2.5.RELEASE</springVersion>
    <springIntegrationVersion>4.2.5.RELEASE</springIntegrationVersion>
    <mockitoVersion>1.10.19</mockitoVersion>
</properties>
<repositories>
    <repository>
        <id>repository.spring.milestone</id>
        <name>Spring Milestone Repository</name>
        <url>http://repo.spring.io/milestone</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.21</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>${springIntegrationVersion}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>2.0.0.M1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${springVersion}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-test</artifactId>
        <version>${springVersion}</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
</project>

Exception log:

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:148)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:49)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:56)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:336)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:494)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:46)
at com.learn.kafka.integrate.spring.TestConsumer.testSpringKafkaConsumer(TestConsumer.java:83)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:254)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:193)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Comment (Apr 26, 2016 at 12:58): You are not showing the whole story. In ConsumerFactory<Integer, String> factory = new DefaultKafkaConsumerFactory<>(configs); it appears that the configs variable does not reference the properties created by consumerConfigs().

1 Answer

org.apache.kafka.common.config.ConfigException: Missing required configuration "value.deserializer" which has no default value.

It looks like your new DefaultKafkaConsumerFactory<>(configs) doesn't use the map returned by consumerConfigs().
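To make that concrete, here is a minimal plain-Java sketch (no Kafka classes on the classpath) of a pre-flight check on the map that gets handed to the factory. The class ConsumerConfigCheck and the method missingKeys are hypothetical names made up for illustration; this is not Kafka's own validation code. The property names are the standard consumer keys, and port 9092 is the broker port the asker reports working in the comments, whereas the question's code used 2181.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Kafka or Spring.
class ConsumerConfigCheck {

    // Consumer settings that have no default value and must be supplied.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "bootstrap.servers", "key.deserializer", "value.deserializer");

    // Same settings as consumerConfigs() in the question, written with plain property names.
    static Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // Assumption: 9092 is the broker port (the question used the ZooKeeper port 2181).
        props.put("bootstrap.servers", "192.168.56.101:9092");
        props.put("group.id", "group_id_coonfig");
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", "100");
        props.put("session.timeout.ms", "15000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Returns the required keys that are absent from the given map.
    static List<String> missingKeys(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (!configs.containsKey(key)) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

If missingKeys(configs) is non-empty in the test, the map passed to the factory is not the one built by consumerConfigs(). Passing consumerConfigs() itself to the factory constructor is the likely fix.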

On the other hand, the KafkaMessageDrivenChannelAdapter does exactly this in its constructor:

this.messageListenerContainer = messageListenerContainer;
this.messageListenerContainer.setAutoStartup(false);
this.messageListenerContainer.setMessageListener(this.listener);

So the listener you set with container.setMessageListener(new MessageListener<Integer, String>() { ... }) is replaced by the adapter's own listener and is never called. Therefore nothing will ever appear in records.
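The overwrite can be illustrated with a small toy model. ToyContainer and ToyAdapter below are hypothetical stand-ins written for this sketch, not the Spring classes; they only mimic the one behavior that matters here, namely that a second setMessageListener call replaces the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-ins for the container and adapter; hypothetical, not Spring's classes.
class ListenerOverwriteSketch {

    static class ToyContainer {
        private Consumer<String> listener;

        // Holds a single listener: each call replaces the previous one.
        void setMessageListener(Consumer<String> listener) {
            this.listener = listener;
        }

        void deliver(String record) {
            listener.accept(record);
        }
    }

    static class ToyAdapter {
        final List<String> received = new ArrayList<>();

        ToyAdapter(ToyContainer container) {
            // Mirrors the constructor lines quoted above: the adapter installs its own listener.
            container.setMessageListener(received::add);
        }
    }

    // Returns true only if the listener registered by the test code saw the record.
    static boolean userListenerCalled(String record) {
        List<String> userRecords = new ArrayList<>();
        ToyContainer container = new ToyContainer();
        container.setMessageListener(userRecords::add); // the test's own listener
        new ToyAdapter(container);                      // replaces it
        container.deliver(record);
        return !userRecords.isEmpty();
    }
}
```

In this model userListenerCalled("hello") returns false, which corresponds to the records queue in the question staying empty.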

I'd recommend avoiding Spring Integration for this particular test if you don't understand it yet.

For the KafkaMessageDrivenChannelAdapter variant, you have to specify the outputChannel as a QueueChannel so that you can retrieve messages by polling.

You also have to do more BeanFactory setup around the KafkaMessageDrivenChannelAdapter.
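As a rough illustration of retrieving messages by polling, the sketch below uses a plain java.util.concurrent.BlockingQueue as a stand-in for the channel. That substitution is an assumption made to keep the example free of Spring dependencies, and BoundedPollSketch and drain are hypothetical names. Unlike the while (records.take() != null) loop in the question, which blocks forever once the queue is empty, polling with a timeout lets the test finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; a BlockingQueue stands in for the polled channel.
class BoundedPollSketch {

    // Collects elements until none arrives within timeoutMs milliseconds.
    static <T> List<T> drain(BlockingQueue<T> queue, long timeoutMs) {
        List<T> out = new ArrayList<>();
        try {
            T next;
            // poll(...) returns null on timeout, which ends the loop.
            while ((next = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)) != null) {
                out.add(next);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was collected so far.
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

In a test, the timeout should be long enough for the consumer to join its group and fetch the first records; the right value depends on the environment.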

See our test-case for more info: https://github.com/spring-projects/spring-integration-kafka/blob/master/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Also take a look at the sample application based on Kafka 0.9: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/kafka

Comments

Thanks for the valuable feedback. I tried the same code and it works fine if I give port 9092, the Kafka server port, with the address. But when I run the Kafka console consumer, I provide the ZooKeeper port 2181, so I am confused by this.
I get the error: Missing required configuration "group.id" which has no default value. How do I solve this error?
