I am new to Kafka and am building a project for my resume. It is a multi-service setup (microservices running on Docker) made up of several Spring Boot applications, two of which use Kafka to exchange messages. Although I configured the serializers and deserializers in each application.properties file, I get deserialization errors when consuming messages. The relevant configuration and code are below:

Producer Service (Clynic-Service) Configuration:

# application.properties

spring.application.name=Clynic-Service
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
server.port=4000
logging.level.root=info

Producer Code:

package com.beingadish.projects.clynicservice.kafka;

import com.beingadish.projects.clynicservice.Model.Patient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import patient.events.PatientEvent;

@Service
public class KafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public KafkaProducer(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendEvent(Patient patient) {
        PatientEvent patientEvent = PatientEvent.newBuilder()
                .setPatientId(patient.getId().toString())
                .setEmail(patient.getEmail())
                .setName(patient.getName())
                .setEventType("PATIENT_CREATED")
                .build();

        try {
            kafkaTemplate.send("patient", patientEvent.toByteArray());
        } catch (Exception e) {
            // Pass the exception as the last argument so SLF4J logs its stack trace too.
            log.error("Error sending patient event: {}", patientEvent, e);
        }
    }
}

Consumer Service (Analytics-Service) Configuration:

# application.properties

spring.application.name=Analytics-Service
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

Consumer Code:

package com.beingadish.projects.analyticsservice.kafka;

import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import patient.events.PatientEvent;

@Service
public class KafkaConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = "patient", groupId = "analytics-service")
    public void consumeEvent(byte[] event) {
        try {
            PatientEvent patientEvent = PatientEvent.parseFrom(event);
            log.info("Received Patient Event: [Patient Id={}, Patient Name={}, PatientEmail={}]",
                    patientEvent.getPatientId(),
                    patientEvent.getName(),
                    patientEvent.getEmail());
        } catch (InvalidProtocolBufferException e) {
            log.error("Error Deserializing Patient Event {}", e.getMessage());
        }
    }
}

Proto File (patient_event.proto):

syntax = "proto3";

package patient.events;
option java_multiple_files = true;

message PatientEvent {
  string patientId = 1;
  string name = 2;
  string email = 3;
  string event_type = 4;
}

Issue:

Despite the above configuration, the consumer fails when it processes a message. The error indicates that the payload reaches the listener as a String rather than a byte[] ([B): Spring then fails to convert that String into the byte[] parameter of consumeEvent, so PatientEvent.parseFrom is never reached. The exact error is:

Edit (full error message):

2025-03-30T02:27:01.989Z ERROR 1 --- [Analytics-Service] [ntainer#0-0-C-1] o.s.kafka.listener.DefaultErrorHandler : Backoff FixedBackOff{interval=0, currentAttempts=1, maxAttempts=0} exhausted for patient-0@2

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.beingadish.projects.analyticsservice.kafka.KafkaConsumer.consumeEvent(byte[])]
Bean [com.beingadish.projects.analyticsservice.kafka.KafkaConsumer@7dccceb5]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2982) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2889) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2853) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2766) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2604) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2493) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2144) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1520) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1458) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1327) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:499) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:478) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:421) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.3.0.jar!/:3.3.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2875) ~[spring-kafka-3.3.0.jar!/:3.3.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:478) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:421) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2875) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    ... 10 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Failed to convert message payload '
$07da774f-44d7-4636-878e-5cd97afdea7bAadarsh Pandey Kakfa [email protected]"PATIENT_CREATED' to '[B'
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:70) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:147) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:71) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:474) ~[spring-kafka-3.3.0.jar!/:3.3.0]
    ... 14 common frames omitted
Caused by: org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [byte] for value [$07da774f-44d7-4636-878e-5cd97afdea7bAadarsh Pandey Kakfa [email protected]"PATIENT_CR (truncated)...]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:47) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:182) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToArrayConverter.convert(StringToArrayConverter.java:72) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:182) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService.convert(GenericConversionService.java:165) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.messaging.converter.GenericMessageConverter.fromMessage(GenericMessageConverter.java:66) ~[spring-messaging-6.2.5.jar!/:6.2.5]
    ... 21 common frames omitted
Caused by: java.lang.NumberFormatException: For input string: "$07da774f-44d7-4636-878e-5cd97afdea7bAadarshPandeyKakfaTestkakfaTest@producer.com"PATIENT_CREATED"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:67) ~[na:na]
    at java.base/java.lang.Integer.parseInt(Integer.java:647) ~[na:na]
    at java.base/java.lang.Byte.parseByte(Byte.java:193) ~[na:na]
    at java.base/java.lang.Byte.valueOf(Byte.java:249) ~[na:na]
    at java.base/java.lang.Byte.valueOf(Byte.java:275) ~[na:na]
    at org.springframework.util.NumberUtils.parseNumber(NumberUtils.java:195) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:64) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.StringToNumberConverterFactory$StringToNumber.convert(StringToNumberConverterFactory.java:50) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.GenericConversionService$ConverterFactoryAdapter.convert(GenericConversionService.java:415) ~[spring-core-6.2.5.jar!/:6.2.5]
    at org.springframework.core.convert.support.ConversionUtils.invokeConverter(ConversionUtils.java:41) ~[spring-core-6.2.5.jar!/:6.2.5]
    ... 27 common frames omitted
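
To make the odd-looking payload in the trace easier to read, here is a minimal plain-Java sketch (no Kafka, Spring, or protobuf dependency) of what the raw bytes look like when something decodes them as text. It hand-encodes only field 1 (patientId) the way protobuf lays out a length-delimited string: one tag byte, one length byte, then the UTF-8 bytes. The class WireFormatDemo and its methods are hypothetical names made up for this illustration, and the encoder is a simplification that only handles field numbers 1 to 15 and values shorter than 128 bytes. Under those assumptions, a 36-character id yields the bytes 0x0A 0x24, which print as a line break followed by "$". That matches the start of the payload in the log, and it suggests the value was already a String by the time it reached the listener. The second helper mirrors the last frames of the trace, where Byte.valueOf is handed that text.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class WireFormatDemo {

    // Hand-encodes one protobuf length-delimited (wire type 2) string field.
    // Simplified sketch: only field numbers 1..15 and values under 128 bytes,
    // so the tag and the length each fit in a single byte.
    public static byte[] encodeStringField(int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        if (fieldNumber < 1 || fieldNumber > 15 || utf8.length > 127) {
            throw new IllegalArgumentException("sketch handles single-byte tag and length only");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((fieldNumber << 3) | 2); // tag byte: field number plus wire type 2
        out.write(utf8.length);            // length of the string in bytes
        out.write(utf8, 0, utf8.length);   // the string itself
        return out.toByteArray();
    }

    // Returns true when the text cannot be parsed as a single byte value,
    // which is the call that fails at the bottom of the stack trace.
    public static boolean failsAsByte(String text) {
        try {
            Byte.valueOf(text);
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Patient id copied from the log output above.
        byte[] bytes = encodeStringField(1, "07da774f-44d7-4636-878e-5cd97afdea7b");

        // Decoding the binary payload as text is what a String-based
        // deserializer would effectively do with it.
        String asText = new String(bytes, StandardCharsets.UTF_8);

        System.out.printf("first two bytes: 0x%02X 0x%02X%n", bytes[0], bytes[1]);
        System.out.println("starts with newline + '$': " + asText.startsWith("\n$"));
        System.out.println("Byte.valueOf fails on it: " + failsAsByte(asText));
    }
}
```

If this reading is right, the listener in the container is receiving a String value, so the thing to compare between the local run and the Docker run would be which value deserializer is actually in effect there, not the protobuf classes themselves.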

  • Update: all of this had been running in separate Docker containers, but as soon as I ran the consumer (Analytics-Service) on my local machine for debugging, everything worked. I also checked the contents of the app.jar inside the Docker container, and all the proto-generated files are there. I still cannot figure out why the consumer fails to deserialize the [B only when it runs in a Docker container. Commented Mar 30 at 6:02
  • Please consider editing the stack trace in your question into a code snippet that preserves all the line breaks. Right now the stack trace is unreadable. Commented Mar 31 at 15:18
