Canal组件学习使用
目录
- 一、Canal安装
- 1.1 环境准备
- 1.2 Canal.deployer 安装
- 1.3 补充配置说明
- 二、 CanalAdmin安装
- 2.1 CanalAdmin下载
- 2.2 CanalAdmin配置
- 2.3 CanalAdmin启动
- 2.4 admin 添加server
- 2.5 添加instance
- 2.6 admin使用的问题
- 三、canal使用
- 3.1 canal与 RabbitMQ 集成
- 3.2 canal与Kafka集成
- 3.3 canal与RDB集成
- 3.4 canal与es集成
- 3.5 canal与java客户端集成
一、Canal安装
Canal 是阿里巴巴开源的一个分布式 数据库增量订阅与消费 中间件。它的核心功能是监听数据库的 binlog(MySQL、MariaDB 或其他数据库的二进制日志),并将数据的变更实时地同步到其他系统中,比如消息队列、缓存系统、搜索引擎等。Canal 主要用于实现 数据同步、数据迁移、实时数据分析 等场景。
Canal 参考数据库的主从复制,将自己模拟成mysql的一个从库slave,从而获取到数据,主要流程可以概括如下:
- 连接 MySQL:Canal 通过配置的用户名和密码连接 MySQL 数据库。
- 订阅 Binlog:Canal 客户端订阅 MySQL 的 binlog 日志。
- 读取 Binlog:Canal 客户端读取 binlog 文件并解析。
- 解析 Binlog:Canal 解析 binlog 中的数据变更事件,将其转换为标准格式。
- 数据处理:Canal 可以对数据进行过滤、加工、转换等操作。
- 同步到目标系统:将处理后的数据同步到指定的目标系统。
1.1 环境准备
Canal 需要 Java 环境,确保已安装 JDK 8 或更高版本。
- mysql开启binlog
#my.cnf(liunx) my.ini(windows) 开启binlog
[mysqld]
#设置mysql服务的唯一标识,便于区分不同服务实例,用于数据库复制中。
server-id=1024
#开启binlog日志
log-bin=mysql-bin
#设置 MySQL binlog 日志格式为 ROW。
binlog-format=ROW
- binlog日志格式
格式 | 描述 | 用途 | 适用场景 |
---|---|---|---|
STATEMENT | 记录执行的 SQL 语句。每条 SQL 语句会被记录在二进制日志中。 | 适合于大多数常规的操作,尤其是简单的 DML 操作,如 INSERT 、UPDATE 、DELETE 。因为只记录 SQL 语句,可以减少日志量。 | 适用于不涉及复杂数据变更或需要节省存储空间的场景。对于常见的应用程序和简单数据库操作非常有效。 |
ROW | 记录每一行数据的具体变化(即记录行级别的变更)。 | 可以确保复制的精确性,因为它不依赖于 SQL 语句,而是直接记录数据行的变更。 | 当你需要精确的复制或者在数据库中进行复杂操作(如触发器、存储函数)时使用,以确保数据的一致性和正确性。 |
MIXED | 混合模式,根据不同的情况自动切换 STATEMENT 或 ROW 格式。例如,当使用不安全的 SQL 语句时,使用 ROW 格式。 | 结合了 STATEMENT 和 ROW 的优点,根据具体的操作选择合适的日志格式。 | 适用于大多数数据库应用,它结合了 STATEMENT 的低开销和 ROW 的精确性,适应性较强。 |
- 验证binlog
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
- 创建 Canal 用户
#需要创建 一个可以进行复制、查看复制状态的用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
-
创建一个名为
canal
的用户,密码为canal
。 -
授予该用户从任何主机连接到数据库的权限,并赋予以下特权:
- 读取所有数据库的权限(
SELECT
) - 作为复制从服务器的权限(
REPLICATION SLAVE
) - 查看复制状态的权限(
REPLICATION CLIENT
)
- 读取所有数据库的权限(
1.2 Canal.deployer 安装
Canal.deployer下载
- 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
- 选择最新的稳定版本,例如
canal.deployer-1.1.4
,并下载canal.deployer-1.1.4.tar.gz
文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz#解压
tar -zxvf canal.deployer-1.1.4.tar.gz #解压后目录结构
canal.deployer-1.1.4/
├── bin/ # 存放相关命令脚本
├── conf/ # 存放配置文件
├── lib/ # 存放 Canal 所需的所有依赖库
├── logs/ # 存放日志文件
Canal配置
- canal.properties: Canal 服务器的全局配置文件,包含了 Canal 服务端的一些基础设置。
- instance.properties Canal 的实例配置文件。它定义了每个 Canal 实例的数据同步设置,通常每个数据库实例都会有对应的配置文件。
#进入conf/目录 修改canal.properties 文件#Canal Server 的运行模式,可选值为 `tcp`(默认)或 `kafka`、`RocketMQ`
canal.serverMode = tcp
#Canal 监听的实例名称列表,多个实例用逗号分隔。
# 注意这里我将example 改成customer-canal 所以需要拷贝一份conf/example 改成conf/customer-canal
canal.destinations = customer-canal# 配置canal的缓存池大小和单位,变更的数据超过这个大小,canal不会接收新的变更数据
# 16384KB / 1024 = 16MB
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 #字节# 配置管理页面 在后面安装crm admin使用,如果单独启动canal实例,为避免影响先注释掉
#canal.admin.manager = 127.0.0.1:8089
#canal.admin.port = 11110
#canal.admin.user = admin
#canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# 进入 进入conf/目录 复制example 为customer-canal
# 修改其中的instance.properties#配置同步的mysql数据信息
# 数据库连接信息
canal.instance.master.address=127.0.0.1:3306 # MySQL 地址
canal.instance.dbUsername=canal # MySQL 用户名
canal.instance.dbPassword=canal # MySQL 密码# 监听的数据库和表(支持正则表达式)canal.instance.filter.regex=your_database\\.your_table # 如果只监听某张表,可以具体指定
# canal.instance.filter.regex=your_database\\..* # 监听指定数据库的所有表
# canal.instance.filter.regex=db1\\..*|db2\\..* # 监听db1、db2数据库的所有表 监听多个库# 配置
canal.instance.mysql.slaveId=1234
Canal启动
# 启动canal
sh bin/startup.sh#查看日志 customer-canal 是根据canal.destinations = 'customer-canal' 自动生成
tail -f logs/customer-canal/customer-canal.log
1.3 补充配置说明
canal.properties 配置说明
配置项 | 默认值 | 说明 |
---|---|---|
canal.id | 无 | Canal Server 的唯一标识符,每个 Canal Server 需要设置不同的 ID。 |
canal.ip | 无 | Canal Server 的 IP 地址,如果未设置,则 Canal 会自动获取本机 IP。 |
canal.port | 11111 | Canal Server 的监听端口,客户端通过该端口连接 Canal Server。 |
canal.zkServers | 无 | 如果使用 Zookeeper 管理 Canal 集群,则需要配置 Zookeeper 的地址列表(格式:host1:port1,host2:port2 )。 |
canal.register.auto | true | 是否自动注册到 Zookeeper,如果是单机模式,可以设置为 false 。 |
canal.instance.global.mode | spring | Canal 的全局模式,可选值为 spring (默认)或 standalone 。 |
canal.instance.global.spring.xml | classpath:spring/default.xml | 全局 Spring 配置文件路径,用于加载 Canal 的 Bean 定义。 |
canal.instance.tsdb.enable | true | 是否启用 TSDB(时间序列数据库),用于存储 Canal 的元数据信息。 |
canal.instance.tsdb.dir | ${canal.file.data.dir}/../tsdb | TSDB 的存储目录。 |
canal.instance.gtidon | false | 是否启用 GTID(全局事务标识符),如果 MySQL 开启了 GTID 模式,需要设置为 true 。 |
canal.metrics.pull.port | 11112 | Canal 的监控指标拉取端口,用于监控 Canal 的运行状态。 |
canal.destinations | 无 | Canal 监听的实例名称列表,多个实例用逗号分隔。 |
canal.file.data.dir | ${canal.conf.dir} | Canal 的数据存储目录,用于存储 binlog 日志的本地缓存。 |
canal.serverMode | tcp | Canal Server 的运行模式,可选值为 tcp (默认)或 kafka (如果需要将数据发送到 Kafka)。 |
canal.mq.servers | 无 | 如果 serverMode 设置为 kafka ,需要配置 Kafka 的地址列表(格式:host1:port1,host2:port2 )。 |
canal.mq.topic | 无 | 如果 serverMode 设置为 kafka ,需要配置 Kafka 的 Topic 名称。 |
canal.instance.memory.buffer.size | 16384 | Canal 实例的内存缓冲区大小(单位:KB),用于缓存 binlog 数据。 |
canal.instance.memory.buffer.memunit | 1024 | Canal 实例的内存缓冲区单位大小(单位:字节),用于控制内存分配的粒度。 |
canal.instance.transaction.size | 1024 | Canal 实例的事务大小(单位:条),用于控制每次提交的事务数量。 |
canal.instance.network.receiveTimeout | 15000 | Canal 实例的网络接收超时时间(单位:毫秒),用于控制 Canal 与 MySQL 的连接超时时间。 |
canal.instance.network.sendTimeout | 30000 | Canal 实例的网络发送超时时间(单位:毫秒),用于控制 Canal 向客户端发送数据的超时时间。 |
canal.instance.network.heartbeatInterval | 60000 |
instance.properties配置说明
配置项 | 默认值 | 说明 |
---|---|---|
canal.instance.master.address | 无 | MySQL 主库地址,格式为 hostname:port ,例如 127.0.0.1:3306 。 |
canal.instance.master.journal.name | 无 | MySQL 主库的 binlog 文件名,用于指定从哪个 binlog 文件开始同步。 |
canal.instance.master.position | 无 | MySQL 主库的 binlog 位点,用于指定从哪个位点开始同步。 |
canal.instance.master.timestamp | 无 | MySQL 主库的时间戳,用于从指定时间点开始同步,Canal 会自动找到对应的 binlog 文件和位点。 |
canal.instance.dbUsername | 无 | MySQL 数据库用户名,用于连接 MySQL 主库。 |
canal.instance.dbPassword | 无 | MySQL 数据库密码,用于连接 MySQL 主库。 |
canal.instance.connectionCharset | UTF-8 | 连接 MySQL 数据库的字符集,通常设置为 UTF-8 。 |
canal.instance.filter.regex | 无 | 正则表达式,用于过滤需要同步的表,例如 .*\\..* 表示同步所有数据库的所有表。 |
canal.instance.filter.black.regex | 无 | 正则表达式,用于过滤不需要同步的表,优先级高于 filter.regex 。 |
canal.instance.tsdb.enable | true | 是否启用 TSDB(时间序列数据库),用于存储 Canal 的位点信息。 |
canal.instance.gtidon | false | 是否启用 GTID 模式,如果 MySQL 开启了 GTID,则设置为 true 。 |
canal.instance.standby.address | 无 | MySQL 从库地址,用于在主库故障时切换到从库。 |
canal.instance.standby.journal.name | 无 | MySQL 从库的 binlog 文件名,用于在主库故障时切换到从库。 |
canal.instance.standby.position | 无 | MySQL 从库的 binlog 位点,用于在主库故障时切换到从库。 |
canal.instance.standby.timestamp | 无 | MySQL 从库的时间戳,用于在主库故障时切换到从库。 |
canal.instance.rds.accesskey | 无 | 阿里云 RDS 的 AccessKey,如果使用阿里云 RDS,需要配置。 |
canal.instance.rds.secretkey | 无 | 阿里云 RDS 的 SecretKey,如果使用阿里云 RDS,需要配置。 |
canal.instance.rds.instanceId | 无 | 阿里云 RDS 的实例 ID,如果使用阿里云 RDS,需要配置。 |
canal.instance.mq.servers | 无 | 消息队列服务器地址,如果需要将 Canal 同步的数据发送到消息队列(如 Kafka、RocketMQ),需要配置。 |
canal.instance.mq.topic | 无 | 消息队列的主题,如果需要将 Canal 同步的数据发送到消息队列,需要配置。 |
canal.instance.mq.partition | 0 | 消息队列的分区,如果需要将 Canal 同步的数据发送到消息队列,可以指定分区。 |
canal.instance.mq.dynamicTopic | 无 | 动态主题配置,支持基于表名或数据库名动态生成主题。 |
canal.instance.mq.partitionHash | 无 | 分区哈希配置,用于指定哈希字段,例如 schema.table:id^name 。 |
canal.instance.enableDruid | false | 是否启用 Druid 数据库连接池,如果启用,Canal 会使用 Druid 管理数据库连接。 |
canal.instance.spring.xml | spring/memory-instance.xml | Spring 配置文件,用于指定 Canal 的实例配置,可选值包括 spring/memory-instance.xml 、spring/file-instance.xml 、spring/default-insta |
二、 CanalAdmin安装
canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作
2.1 CanalAdmin下载
- 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
- 选择最新的稳定版本,例如
canal-admin-1.1.4
,并下载canal.admin-1.1.4.tar.gz
文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz#解压
tar -zxvf canal.deployer-1.1.4.tar.gz #解压后目录结构
canal.admin-1.1.4/
├── bin/ # 存放相关命令脚本
├── conf/ # 存放配置文件
├── lib/ # 存放 Canal 所需的所有依赖库
├── logs/ # 存放日志文件
2.2 CanalAdmin配置
#进入解压后的 conf 目录,编辑 application.yml 文件:#修改相关数据库配置
server:port: 8089 # 管理后台端口
spring:datasource:address: 127.0.0.1:3306 # MySQL 地址database: canal_manager # 元数据库名称username: root # MySQL 用户名password: root # MySQL 密码driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
初始化元数据
#conf中有对应的canal_manager.sql里面有相关sql数据
#创建canal_manager
CREATE DATABASE canal_manager CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;#导入canal_manager.sql
2.3 CanalAdmin启动
#进入crm.admin安装目录
#启动服务
sh bin/startup.sh
- 检查日志文件
logs/canal-admin/canal-admin.log
,确认服务启动成功。 - 访问 Web 管理界面:
http://127.0.0.1:8089 - 默认账号密码:
admin/123456
2.4 admin 添加server
修改canal.deployer中conf下的canal.instance,添加管理信息配置,将canal server(canal 服务)交由canal admin(管理页面管理)
#进入canal.deployer-1.1.4 先停止canal服务实例
sh bin/stop.sh#修改其canal.properties 添加CanalAdmin
# canal admin config
canal.admin.manager = 127.0.0.1:8089 #admin管理页面url
canal.admin.port = 11110 #默认端口
canal.admin.user = admin #登陆用户名
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 #密码123456 加密
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =#重新启动canal服务
sh bin/startup.sh
2.5 添加instance
之前我们创建canal后,还添加了一个customer-canal的instance,创建该customer是通过在conf目录下创建customer-canal目录在其目录下创建instance.properties。
但是如果使用canal admin 管理页面后,重启canal服务,就不可以使用创建目录的方式创建instance了(会创建失败)。
需要在admin 的instance管理页面中创建实例并重新启动server
点击保存后,列表页面出现该实例,通过操作 -> 启动 该instance便成功启动。
2.6 admin使用的问题
- 启动问题
# startup.sh 启动 后台运行,且日志信息(错误日志)都丢弃,所以无法排查问题
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication 1>>/dev/null 2>&1 &
#修改为打印到控制台
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication
CanalAdmin启动脚本中需要使用-XX:+UseG1GC
G1垃圾回收器,但是我的jdk虽然是1.8 但是是1.8的早期版本会提示
看提示信息需要添加-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC 使用,添加完毕,重新启动成功。
三、canal使用
3.1 canal与 RabbitMQ 集成
将 Canal 的配置文件(conf/canal.properties
)修改为 RabbitMQ /RocketMq模式:
# 设置Canal的服务模式为 RabbitMQ 模式。
canal.serverMode = rabbitMQ
# 指定 RabbitMQ 服务的主机地址
rabbitmq.host = 127.0.0.1
# 指定 RabbitMQ 的虚拟主机(Virtual Host)。
rabbitmq.virtual.host = customer_canal_mq
# 指定 RabbitMQ 的交换机名称。
rabbitmq.exchange = cancal.mq.customer
# 指定连接 RabbitMQ 的用户名。
rabbitmq.username = guest
# 指定连接 RabbitMQ 的秘密。
rabbitmq.password = guest
编辑 conf/customer_canal/instance.properties
,配置 MQ 的 Topic:
# 配置instance.properties
canal.mq.topic = canal.mq.customer.cust_info#如果需要根据数据库或表名动态生成Topic
canal.mq.dynamicTopic = ".*\\..*"# 默认情况下,Canal 发送的消息是 JSON 格式。
#如果需要二进制格式,可以修改 canal.properties:
canal.mq.flatMessage = false# 重新启动canal
sh bin/restart.sh
3.2 canal与Kafka集成
将 Canal 的配置文件(conf/canal.properties
)修改为 Kafka模式:
canal.serverMode = kafka # 设置为Kafka模式
# Kafka配置
canal.mq.servers = localhost:9092 # Kafka服务器地址和端口
canal.mq.topic = canal # Kafka主题名称
canal.mq.partition = 0 # 分区
canal.mq.batch.size = 1000 # 批量发送大小
canal.mq.retries = 3 # 失败后重试次数
canal.mq.linger.ms = 1 # 消息延迟时间
canal.mq.buffer.memory = 33554432 # 消息缓冲区内存大小
canal.mq.compression.type = none # 压缩类型, 可选值为 none, gzip, snappy, lz4, zstd
编辑 conf/customer_canal/instance.properties
,配置 Kafka的 Topic:
#MQ队列名称
canal.mq.topic=canal.topic
#单队列模式的分区下标
canal.mq.partition=0
3.3 canal与RDB集成
这里演示场景为:mysql数据(student)表同步到sqlserver的(Student)表字段 。
安装canal adapter
- 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
- 选择最新的稳定版本,例如
canal-admin-1.1.4
,并下载canal.adapter-1.1.4.tar.gz
文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz#解压
tar -zxvf canal.adapter-1.1.4.tar.gz #解压后目录结构
canal.adapter-1.1.4/
├── bin/ # 存放相关命令脚本
├── conf/ # 存放配置文件
├── lib/ # 存放 Canal 所需的所有依赖库
├── logs/ # 存放日志文件
├── plugins/ # plugins依赖包
配置 Canal Adapter
#修改 canal-adapter/conf/application.yml#spring配置
server:port: 8081 # Spring Boot 应用的服务端口号,应用将运行在 http://127.0.0.1:8081spring:jackson:date-format: yyyy-MM-dd HH:mm:ss # 配置 JSON 序列化时日期格式,例如:2023-10-01 12:00:00time-zone: GMT+8 # 配置时区为 GMT+8(中国标准时间)default-property-inclusion: non_null # 配置 JSON 序列化时仅包含非空字段# 数据同步配置
canal.conf:# Canal Server 的主机地址和端口canalServerHost: 127.0.0.1:11111 # Canal Server 的地址,用于监听数据库的 binlog 日志batchSize: 500 # 每次从 Canal Server 拉取的 binlog 事件数量syncBatchSize: 1000 # 每次同步到目标数据库的记录数量retries: 0 # 同步失败时的重试次数,0 表示不重试timeout: # 同步操作的超时时间(未配置具体值,可能使用默认值)mode: tcp # Canal 的运行模式,支持 tcp、kafka、rocketMQ 等# 原数据库(需要监听同步的数据库)srcDataSources:defaultDS: # 默认的数据源配置url: jdbc:mysql://127.0.0.1:3306/mysql_customer?useUnicode=true # MySQL 数据库的连接 URLusername: root # 数据库用户名password: root # 数据库密码canalAdapters: # 配置 Canal Adapter,用于将 Canal 捕获的数据同步到目标数据库或其他存储# canal 的服务实例- instance: customer-canal # Canal 的实例名称,与 Canal Server 配置的实例名称一致groups:- groupId: g1 # 同步任务组的 ID,用于区分不同的同步任务outerAdapters: # 配置外部适配器,用于将数据同步到目标存储- name: rdb # 指定适配器类型为 rdb(关系型数据库)key: mysql1 # 适配器的唯一标识符,与表映射配置文件中的 outerAdapterKey 对应properties:# 配置需要同步的目标数据库信息(SQL Server)jdbc.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver # SQL Server 的 JDBC 驱动类名jdbc.url: jdbc:sqlserver://127.0.0.1\test:1433;databasename=ms_cusomter;encrypt=true;trustServerCertificate=true # SQL Server 数据库的连接 URLjdbc.username: root # SQL Server 数据库用户名jdbc.password: root # SQL Server 数据库密码
- 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
- adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件
RDB表映射文件
#修改 conf/rdb/mytest_user.yml文件#配置表的映射文件
dataSourceKey: defaultDS
destination: customer-canal # cannal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
outerAdapterKey: mysql1 # adapter key, 对应上面配置outAdapters中的key
concurrent: true # 是否按主键hash并行同步, 并行同步的表必须保证主键不更改及主键不能为其他同步表的外键!!
dbMapping:database: mysql_cusomter # 源数据源的database/shcematable: student # 源数据源表名targetTable: ms_cusomter.Student # 目标数据源的库名.表名targetPk: # 主键映射id: id_s # 如果是复合主键可以换行映射多个
# mapAll: true # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准) targetColumns: # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填id: id_sname: name_sbirthday: birthdaycommitBatch: 3000 # 批量提交的大小
上述配置将mysql中mysql_cusomter 库中的student 同步到sqlserver的ms_cusomter库中的Student表中。
启动canal-adapter
- 将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar (如果是其他数据库则放入对应的驱动)
- 启动canal-adapter启动器
bin/startup.sh
3.4 canal与es集成
配置adapter
#修改 canal-adapter/conf/application.ymlserver:port: 8081 # Spring Boot 应用的服务端口号,应用将运行在 http://127.0.0.1:8081spring:jackson:date-format: yyyy-MM-dd HH:mm:ss # 配置 JSON 序列化时日期格式,例如:2023-10-01 12:00:00time-zone: GMT+8 # 配置时区为 GMT+8(中国标准时间)default-property-inclusion: non_null # 配置 JSON 序列化时仅包含非空字段# 配置 Canal
canal.conf:canalServerHost: 127.0.0.1:11111 # Canal Server 的主机地址和端口,用于监听数据库的 binlog 日志# username: canal # (注释掉)Canal Server 的用户名(如果 Canal Server 配置了认证)# password: 123456 # (注释掉)Canal Server 的密码(如果 Canal Server 配置了认证)batchSize: 500 # 每次从 Canal Server 拉取的 binlog 事件数量syncBatchSize: 1000 # 每次同步到目标存储的记录数量retries: 0 # 同步失败时的重试次数,0 表示不重试timeout: # 同步操作的超时时间(未配置具体值,可能使用默认值)mode: tcp # Canal 的运行模式,支持 tcp、kafka、rocketMQ 等# 源数据库(需要监听同步的数据库)srcDataSources:defaultDS: # 默认的数据源配置url: jdbc:mysql://127.0.0.1:3306/crm_customer?useUnicode=true # MySQL 数据库的连接 URLusername: root # 数据库用户名password: root # 数据库密码canalAdapters: # 配置 Canal Adapter,用于将 Canal 捕获的数据同步到目标存储- instance: customer-canal2 # Canal 的实例名称,与 Canal Server 配置的实例名称一致groups:- groupId: g1 # 同步任务组的 ID,用于区分不同的同步任务outerAdapters: # 配置外部适配器,用于将数据同步到目标存储- name: logger # 适配器类型为 logger,用于打印 binlog 日志,便于调试和查看同步的数据# 配置同步到 Elasticsearch- name: es # 适配器类型为 es(Elasticsearch)# transport模式 127.0.0.1:9300 rest模式 127.0.0.1:9200 hosts: 127.0.0.1:9200 # Elasticsearch 的主机地址和端口,使用 REST 模式连接 properties:mode: rest # Elasticsearch 的连接模式,rest 表示使用 HTTP REST API 连接security.auth: user:password # Elasticsearch 的认证信息,用户名:密码 格式cluster.name: elasticsearch # Elasticsearch 的集群名称
3.5 canal与java客户端集成
-
添加 Maven 依赖
在 Java 项目的
pom.xml
文件中添加 Canal 客户端的依赖:<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version> <!-- 请根据实际需要选择合适的版本 我的服务1.1.4 --> </dependency>
-
编写 Java 代码连接 Canal Server
以下是一个简单的 Java 示例程序,用于演示如何通过 Canal 获取 MySQL 的 binlog 数据并打印相关内容:
package com.xiu.order;import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.*;import java.net.InetSocketAddress; import java.util.List;public class CanalExample {public static void main(String[] args) throws Exception {// 创建 Canal 连接器实例CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "customer-canal2", "", "");try {// 建立连接connector.connect();// 订阅指定数据库表的数据变更(null 表示订阅所有)connector.subscribe(null);// 循环监听数据变化 批量拉取大小int batchSize = 100;while (true) {Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();// 如果没有新消息,则跳过本次循环if (batchId == -1 || message.getEntries().isEmpty()) {//System.out.println("没有接受到数据");continue;}// 遍历解析每条 Entry 数据for (Entry entry : message.getEntries()) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {System.out.println("跳过事务头尾信息");continue;}RowChange rowChange = null;try {// 尝试解析 Canal 捕获的 Binlog 数据,将其反序列化为 RowChange 对象rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {// 如果解析失败,抛出运行时异常,并包含错误信息和原始的 entry 数据throw new RuntimeException("无法解析日志数据:" + entry, e);}// 获取当前 Binlog 记录的事件类型(如 INSERT、UPDATE、DELETE 等)EventType eventType = rowChange.getEventType();System.out.println("事件类型: " + eventType.toString());// 遍历 RowChange 对象中包含的每一行数据(RowData)for (RowData rowData : rowChange.getRowDatasList()) {// 如果事件类型是 DELETEif (eventType == EventType.DELETE) {// 打印删除操作前的列数据printColumn(rowData.getBeforeColumnsList(), "删除前");}// 如果事件类型是 UPDATEelse if (eventType == EventType.UPDATE) {// 打印更新操作后的列数据printColumn(rowData.getAfterColumnsList(), "更新后");}// 如果事件类型是 INSERTelse if (eventType == EventType.INSERT) {// 打印新增操作后的列数据printColumn(rowData.getAfterColumnsList(), "新增后");}}}// 提交确认已处理的消息connector.ack(batchId);}} finally {// 断开连接connector.disconnect();}}private static void printColumn(List<Column> columns, String prefix) {StringBuilder sb = new StringBuilder(prefix).append(": ");for (Column column : columns) {sb.append(column.getName()).append("=").append(column.getValue()).append(",");}System.out.println(sb.substring(0, sb.length() - 1));} }
-
启动 Canal Server
确保 Canal Server 已经启动并正常运行。
-
运行 Java 客户端
使用 IDE 或命令行工具运行 Java 客户端程序,观察控制台输出,验证是否能够正确接收并解析 MySQL 的 binlog 数据。
参考文档:canal 官方文档