
I am trying Kafka for the first time and have set up a Kafka cluster using AWS MSK. The objective is to stream data from a MySQL server to PostgreSQL. I used the Debezium MySQL connector for the source and the Confluent JDBC connector for the sink.

MySQL source connector config:

  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.server.id": "1",
  "tasks.max": "3",
  "internal.key.converter.schemas.enable": "false",
  "transforms.unwrap.add.source.fields": "ts_ms",
  "key.converter.schemas.enable": "false",
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.value.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"

After registering the MySQL connector, its status is "running", it captures the changes made to the MySQL table, and the console consumer shows the result in the following format:

{"id":5,"created_at":1594910329000,"userid":"asldnl3r234mvnkk","amount":"B6Eg","wallet_type":"CDW"}

My first issue: in the table, the "amount" column is of type decimal and contains numeric values, so why does it show up as an alphanumeric value in the consumer console?

For PostgreSQL as the target DB, I used the JDBC sink connector with the following config:

"name": "postgres-connector-db08",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "topics": "mysql-cash.kafka_test.test",
  "connection.url": "jdbc:postgresql://xxxxxx:5432/test?currentSchema=public",
  "connection.user": "xxxxxx",
  "connection.password": "xxxxxx",
  "insert.mode": "upsert",
  "auto.create": "true",
  "auto.evolve": "true"

After registering the JDBC sink connector, checking its status gives an error:

{"name":"postgres-connector-db08","connector":{"state":"RUNNING","worker_id":"x.x.x.x:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"x.x.x.x:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
 org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
 org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
 org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
 org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
 org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
 org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 java.util.concurrent.FutureTask.run(FutureTask.java:266)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-connector-db08' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mysql-cash.kafka_test.test',partition=0,offset=0,timestamp=1594909233389) with a HashMap value and null value schema.
 io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:83)
 io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
 io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
 io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
 org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
"}],"type":"sink"}

Why is this error occurring? Did I miss something in the sink config?

2 Answers


https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html#data-mapping

The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.
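A minimal sketch of the second option (keeping the JSON converter but enabling schemas; the property names are standard Kafka Connect settings, and the same setting would need to go into both the Debezium source and the JDBC sink config):

  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true"

With schemas enabled on the source, every message is wrapped in a {"schema": ..., "payload": ...} envelope, which the sink's JSON converter (also with schemas enabled) can turn back into a typed Struct.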

Since the JSON currently being produced is plain (it carries no schema) and the connector is configured with "value.converter.schemas.enable": "false" (JSON converter with schemas disabled), the Avro converter can instead be set up with Schema Registry: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-schema
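A minimal sketch of the Avro option (the Schema Registry URL below is a placeholder, not taken from the question; the same converter settings would go into both the source and the sink config):

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"

The Avro converter registers the record schema in Schema Registry and sends only a schema id with each message, so the sink always receives a value with an attached schema and the "HashMap value and null value schema" error goes away.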


Answer to the first issue: "Why is the decimal shown in an alphanumeric format?"

The conversion of DECIMAL values depends on the decimal.handling.mode configuration property:

Specifies how the connector should handle values for DECIMAL and NUMERIC columns: precise (the default) represents them precisely using java.math.BigDecimal values, which appear in change events in binary form; double represents them as double values, which may lose precision but is far easier to use; and string encodes them as formatted strings, which are easy to consume but lose the semantic information about the real type.

https://debezium.io/documentation/reference/0.10/connectors/mysql.html#decimal-values
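For example, a minimal sketch against the source config from the question (choose double or string depending on whether a possible loss of precision is acceptable):

  "decimal.handling.mode": "double"

With double, the "amount" column arrives as a plain numeric value instead of the Base64-encoded binary form shown in the consumer console; with string it arrives as a formatted string such as "12.34".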

If none of these conversion modes fits your needs, you can also create a custom converter.

https://debezium.io/documentation/reference/stable/development/converters.html

If you are lucky, you may find an existing open-source converter that solves this issue.
