I'm trying to sync data between several MySQL databases with Confluent, which is based on Kafka Connect. I set mode to "bulk" in the source connector config because the primary key type is varchar, so I couldn't use incrementing mode. It works, but I have two problems:

  1. Deletes don't seem to be synced. When rows are deleted in the source databases, nothing happens in the sink databases: the rows are still present there.
  2. Syncing takes quite a while. In my case, it takes about 2~4 minutes to sync a table with 3~4k rows. I understand that bulk mode may make the sync slower, but isn't that too long?

Here is my source connector config:

name=test-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
mode=bulk
table.whitelist=a_table

And this is my sink connector config:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=a_table
connection.url=jdbc:mysql://xxx.xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
insert.mode=upsert
pk.mode=record_value
pk.fields=mypk
auto.evolve=true

Any suggestions would be appreciated. Thank you.

1 Answer

  1. If you want to sync deletes, you'll need to use CDC (change data capture), for example with Debezium. The JDBC connector can only detect rows that exist, not rows that have been removed.

  2. CDC is also more efficient than a bulk fetch, since it monitors the MySQL transaction log for any transactions on the tables required.

  3. Your primary key is VARCHAR? Wow. If you don't want to use CDC, I'd suggest using an INT-based key and then loading incrementally with the JDBC connector (mode=incrementing). Alternatively, add a timestamp column to the table and use that for incremental loads (mode=timestamp).
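
As a rough sketch of the timestamp option, the source connector config from the question could be adapted along these lines. The column name updated_at is an assumption for illustration; it is not in the original table as far as the question shows, and it would need to be set on every insert and update for this mode to pick up changes. Everything else is copied from the question's config.

```properties
name=test-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://xxx.xxx.xxx:3306/xxx?useUnicode=true&characterEncoding=utf8
connection.user=user
connection.password=password
# Fetch only rows whose timestamp column has advanced since the last poll,
# instead of re-reading the whole table as mode=bulk does.
mode=timestamp
# Hypothetical column name; replace with the real timestamp column.
timestamp.column.name=updated_at
table.whitelist=a_table
# How often to poll each table for new data, in milliseconds.
poll.interval.ms=5000
```

This should reduce the amount of data copied on each poll, but like any JDBC source mode it still cannot see deleted rows, so deletes would remain unsynced without CDC.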

2 Comments

Thank you, that helps a lot. I'll read up on Debezium. I'd like to use INT-based keys and timestamps too, but the table schemas are not under my control, so... Again, thank you very much.
BTW, any suggestions about the sync delay? The Confluent docs say "poll.interval.ms" defaults to 5000, which means it will sync data every 5 seconds, right? But I found that syncing takes longer and longer. Is that because I use bulk mode?
