当前位置: 首页 > news >正文

kafka jdbc connector适配kadb数据实时同步

  • 测试结论

源端增量获取方式包括:bulk、incrementing、timestamp、incrementing+timestamp(混合),各种方式说明如下:

bulk: 一次同步整个表的数据

incrementing: 使用严格的自增列标识增量数据。不支持对旧数据的更新和删除

timestamp: 使用时间戳标识增量数据,每次更新数据都要修改时间戳,时间戳严格递增

timestamp+incrementing: 使用两个列,一个为自增列,一个为时间戳列。综合incrementing和timestamp的功能

  • 环境说明

本文在kafka的standalone模式下,适配kafka jdbc connector从源端mysql数据库实时同步数据到kadb中。验证1. 增量数据获取及增量数据获取方式

  1. kadb版本:V8R3
  2. mysql版本:5.7
  3. 操作系统:centos 7.6
  4. jdbc connector版本:10.8.3。下载地址:JDBC Connector (Source and Sink) | Confluent Hub: Apache Kafka Connectors for Streaming Data.
  5. mysql驱动:mysql-connector-java-5.1.39-bin.jar
  6. kadb驱动:postgresql-42.7.4.jar
  7. java版本:17.0.12 (kafka要求必须为17或者18版本,否则kafka安装报错)
  8. kafka版本:kafka_2.13-4.0.0
  9. kafka jdbc connector参考资料:

JDBC Source and Sink Connector for Confluent Platform | Confluent Documentation

  1. kafka connector参考资料

https://kafka.apache.org/documentation/

  • 环境部署
  1. kafka部署

解压

tar -xzf kafka_2.13-4.0.0.tgz

cd kafka_2.13-4.0.0

产生集群UUID

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

  1. jdbc connector部署

下载jdbc connector,将解压的内容保存到kafka解压目录的plugins下(plugins目录需自己创建内容如下:

[root@nanri plugins]# ls -l

total 8

drwxr-xr-x. 2 root root   43 Apr 17 21:50 assets

drwxr-xr-x. 3 root root  108 Apr 17 21:50 doc

drwxr-xr-x. 2 root root   90 Apr 17 21:50 etc

drwxr-xr-x. 2 root root 4096 Apr 17 21:50 lib

-rw-r--r--. 1 root root 2687 Apr 17 21:50 manifest.json

[root@nanri plugins]# pwd

/root/kafka_2.13-4.0.0/plugins

  1. 源端/目标端jdbc驱动

将源端mysql的jdbc驱动文件和目标端kadb驱动文件拷贝至kafka的解压目录的libs目录下:

[root@nanri libs]# ls -l mysql* postgres*

-rw-r--r--. 1 root root  989497 Apr 17 23:15 mysql-connector-java-5.1.39-bin.jar

-rw-r--r--. 1 root root 1086687 Apr 17 23:14 postgresql-42.7.4.jar

[root@nanri libs]# pwd

/root/kafka_2.13-4.0.0/libs

  1. 配置文件修改
  1. 连接器配置文件:connect-standalone.properties

添加插件路径参数:(绝对路径)

plugin.path=/root/kafka_2.13-4.0.0/plugins,/root/kafka_2.13-4.0.0/libs/connect-file-4.0.0.jar

  1. 源端配置文件:connect-mysql-source.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html

#productor名字

name=connect-mysql-source                

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector    //固定值,使用jdbc connector的类

# topic名称列表,源端和目标端的topic必须一致

topics=test

# 配置jdbc连接

connection.url=jdbc:mysql://192.168.85.145:3306/test_source?useUnicode=true&characterEncoding=utf8&user=root&password=Kingbase@1234&serverTimezone=Asia/Shanghai&useSSL=false

#增量获取方式,支持bulk,incrementing,timestamp等等

mode=incrementing

  1. 目标端配置文件:connect-kadb-sink.properties文件内容,参数意义参考:

https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html

#consumer名字

name=connect-kadb-sink

# 为当前connector创建的最大线程数

tasks.max=1

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector //固定值,必须设置

# topic名称列表

topics=test

# 配置jdbc连接

connection.url=jdbc:postgresql://192.168.85.145:5432/test_sink

connection.user=mppadmin

# 自动创建表

auto.create=true

# 写入模式

insert.mode=insert

  1. 启动connect

bin/connect-standalone.sh

config/connect-standalone.properties                //connect配置参数

config/connect-mysql-source.properties    //源端配置参数

config/connect-kadb-sink.properties            //目标端参数

  1. 测试
  1. mysql源端创建表,目标端会自动创建对应的表

mysql> desc test

    -> ;

+-------+-------------+------+-----+---------+----------------+

| Field | Type        | Null | Key | Default | Extra          |

+-------+-------------+------+-----+---------+----------------+

| a     | int(11)     | NO   | PRI | NULL    | auto_increment |    //使用increment ing方式,必须是自增列

| b     | varchar(10) | YES  |     | NULL    |                |

+-------+-------------+------+-----+---------+----------------+

2 rows in set (0.00 sec)

  1. 源端插入数据

mysql> insert into test(b) values('dddd');

Query OK, 1 row affected (0.00 sec)

  1. connect日志:

[2025-04-18 22:39:27,665] INFO [connect-kadb-sink|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Completed write operation for 1 records to the database (io.confluent.connect.jdbc.sink.JdbcDbWriter:100)

[2025-04-18 22:39:27,680] INFO [connect-kadb-sink|task-0] Successfully wrote 1 records. (io.confluent.connect.jdbc.sink.JdbcSinkTask:91)

[2025-04-18 22:39:32,637] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:32,641] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:34,208] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 0 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:37,642] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:37,644] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:42,645] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:42,648] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:44,210] INFO [connect-mysql-source|task-0|offsets] WorkerSourceTask{id=connect-mysql-source-0} Committing offsets for 1 acknowledged messages (org.apache.kafka.connect.runtime.WorkerSourceTask:236)

[2025-04-18 22:39:47,649] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:47,650] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:52,653] INFO [connect-mysql-source|task-0] Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

[2025-04-18 22:39:52,657] INFO [connect-mysql-source|task-0] Current Result is null. Executing query. (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:174)

[2025-04-18 22:39:54,192] INFO Database connection established. (io.confluent.connect.jdbc.util.CachedConnectionProvider:64)

  1. 使用kafka-console-consumer.sh查看topic中的事件

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"a"},{"type":"string","optional":true,"field":"b"}],"optional":false,"name":"test"},"payload":{"a":5,"b":"dddd"}}

  1. 目标端数据

1 | aaa

 2 | bbb

 3 | ccc

 4 | ddd

 5 | dddd

(844 rows)

test_sink=#

  1. 源端数据

mysql> select * from test;

+---+------+

| a | b    |

+---+------+

| 5 | dddd |

+---+------+

1 row in set (0.00 sec)

  1. 命令参考

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic sys_config

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic sys_config

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic sys_config --from-beginning

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 –list

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --all-groups

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-local-file-sink –state

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic __consumer_offsets

bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092

相关文章:

  • Uniapp调用native.js使用经典蓝牙串口通讯方法及问题解决
  • Web 前端包管理工具深度解析:npm、yarn、pnpm 全面对比与实战建议
  • 第五章 SQLite数据库:4、SQLite 进阶用法:常见的约束、PRAGMA 配置、数据操作
  • 微信小程序怎么分包步骤(包括怎么主包跳转到分包)
  • UE5 渲染视频
  • RAG 实战|用 StarRocks + DeepSeek 构建智能问答与企业知识库
  • 力扣刷题-热题100题-第35题(c++、python)
  • 捕鱼船检测数据集VOC+YOLO格式2105张1类别
  • 【工具-Krillin AI】视频翻译、配音、语音克隆于一体的一站式视频多语言转换工具~
  • BFS DFS ----习题
  • C语言教程(十):C 语言函数详解
  • 数据结构之队列及其应用
  • Openfein实现远程调用的方法(实操)
  • 聊一聊接口测试是如何进行的?
  • Vue3如何选择传参方式
  • 虚幻基础:ue引擎的碰撞
  • HTTP/1.1 队头堵塞问题
  • 函数对象-C++
  • Linux 系统的启动流程
  • 树莓派超全系列教程文档--(30)autoboot.txt介绍
  • 见证上海援藏30年成果,萨迦非遗珍品展来沪
  • 香港警务处高级助理处长叶云龙升任警务处副处长(行动)
  • 为何未来的福利国家必须绿色且公平
  • 强政神鸟——故宫里的乌鸦
  • 政治局会议深度|提出“设立新型政策性金融工具”有何深意?
  • 我国首次实现地月距离尺度的卫星激光测距