当前位置: 首页 > news >正文

Canal组件学习使用

目录

      • 一、Canal安装
        • 1.1 环境准备
        • 1.2 Canal.deployer 安装
        • 1.3 补充配置说明
      • 二、 CanalAdmin安装
        • 2.1 CanalAdmin下载
        • 2.2 CanalAdmin配置
        • 2.3 CanalAdmin启动
        • 2.4 admin 添加server
        • 2.5 添加instance
        • 2.6 admin使用的问题
      • 三、canal使用
        • 3.1 canal与 RabbitMQ 集成
        • 3.2 canal与Kafka集成
        • 3.3 canal与RDB集成
        • 3.4 canal与es集成
        • 3.5 canal与java客户端集成

一、Canal安装

Canal 是阿里巴巴开源的一个分布式 数据库增量订阅与消费 中间件。它的核心功能是监听数据库的 binlog(MySQL、MariaDB 或其他数据库的二进制日志),并将数据的变更实时地同步到其他系统中,比如消息队列、缓存系统、搜索引擎等。Canal 主要用于实现 数据同步数据迁移实时数据分析 等场景。

Canal 参考数据库的主从复制,将自己模拟成mysql的一个从库slave,从而获取到数据,主要流程可以概括如下:

  1. 连接 MySQL:Canal 通过配置的用户名和密码连接 MySQL 数据库。
  2. 订阅 Binlog:Canal 客户端订阅 MySQL 的 binlog 日志。
  3. 读取 Binlog:Canal 客户端读取 binlog 文件并解析。
  4. 解析 Binlog:Canal 解析 binlog 中的数据变更事件,将其转换为标准格式。
  5. 数据处理:Canal 可以对数据进行过滤、加工、转换等操作。
  6. 同步到目标系统:将处理后的数据同步到指定的目标系统。
1.1 环境准备

Canal 需要 Java 环境,确保已安装 JDK 8 或更高版本。

  • mysql开启binlog
#my.cnf(liunx) my.ini(windows) 开启binlog
[mysqld]
#设置mysql服务的唯一标识,便于区分不同服务实例,用于数据库复制中。
server-id=1024
#开启binlog日志
log-bin=mysql-bin
#设置 MySQL binlog 日志格式为 ROW。
binlog-format=ROW
  • binlog日志格式
格式描述用途适用场景
STATEMENT记录执行的 SQL 语句。每条 SQL 语句会被记录在二进制日志中。适合于大多数常规的操作,尤其是简单的 DML 操作,如 INSERTUPDATEDELETE。因为只记录 SQL 语句,可以减少日志量。适用于不涉及复杂数据变更或需要节省存储空间的场景。对于常见的应用程序和简单数据库操作非常有效。
ROW记录每一行数据的具体变化(即记录行级别的变更)。可以确保复制的精确性,因为它不依赖于 SQL 语句,而是直接记录数据行的变更。当你需要精确的复制或者在数据库中进行复杂操作(如触发器、存储函数)时使用,以确保数据的一致性和正确性。
MIXED混合模式,根据不同的情况自动切换 STATEMENTROW 格式。例如,当使用不安全的 SQL 语句时,使用 ROW 格式。结合了 STATEMENTROW 的优点,根据具体的操作选择合适的日志格式。适用于大多数数据库应用,它结合了 STATEMENT 的低开销和 ROW 的精确性,适应性较强。
  • 验证binlog
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
  • 创建 Canal 用户
#需要创建 一个可以进行复制、查看复制状态的用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
  1. 创建一个名为 canal 的用户,密码为 canal

  2. 授予该用户从任何主机连接到数据库的权限,并赋予以下特权:

    • 读取所有数据库的权限(SELECT
    • 作为复制从服务器的权限(REPLICATION SLAVE
    • 查看复制状态的权限(REPLICATION CLIENT
1.2 Canal.deployer 安装

Canal.deployer下载

  1. 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
  2. 选择最新的稳定版本,例如 canal.deployer-1.1.4,并下载 canal.deployer-1.1.4.tar.gz 文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz#解压
tar -zxvf canal.deployer-1.1.4.tar.gz  #解压后目录结构
canal.deployer-1.1.4/
├── bin/      # 存放相关命令脚本
├── conf/     # 存放配置文件
├── lib/      # 存放 Canal 所需的所有依赖库
├── logs/     # 存放日志文件

Canal配置

  • canal.properties: Canal 服务器的全局配置文件,包含了 Canal 服务端的一些基础设置。
  • instance.properties Canal 的实例配置文件。它定义了每个 Canal 实例的数据同步设置,通常每个数据库实例都会有对应的配置文件。
#进入conf/目录 修改canal.properties 文件#Canal Server 的运行模式,可选值为 `tcp`(默认)或 `kafka`、`RocketMQ`
canal.serverMode = tcp
#Canal 监听的实例名称列表,多个实例用逗号分隔。
# 注意这里我将example 改成customer-canal 所以需要拷贝一份conf/example 改成conf/customer-canal
canal.destinations = customer-canal# 配置canal的缓存池大小和单位,变更的数据超过这个大小,canal不会接收新的变更数据
# 16384KB / 1024 = 16MB
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 #字节# 配置管理页面 在后面安装crm admin使用,如果单独启动canal实例,为避免影响先注释掉
#canal.admin.manager = 127.0.0.1:8089
#canal.admin.port = 11110
#canal.admin.user = admin
#canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# 进入 进入conf/目录 复制example 为customer-canal
# 修改其中的instance.properties#配置同步的mysql数据信息
# 数据库连接信息
canal.instance.master.address=127.0.0.1:3306        # MySQL 地址
canal.instance.dbUsername=canal                     # MySQL 用户名
canal.instance.dbPassword=canal                     # MySQL 密码# 监听的数据库和表(支持正则表达式)canal.instance.filter.regex=your_database\\.your_table      # 如果只监听某张表,可以具体指定
# canal.instance.filter.regex=your_database\\..*            # 监听指定数据库的所有表
# canal.instance.filter.regex=db1\\..*|db2\\..*             # 监听db1、db2数据库的所有表 监听多个库# 配置
canal.instance.mysql.slaveId=1234

Canal启动

# 启动canal
sh bin/startup.sh#查看日志 customer-canal 是根据canal.destinations = 'customer-canal' 自动生成
tail -f logs/customer-canal/customer-canal.log
1.3 补充配置说明

canal.properties 配置说明

配置项默认值说明
canal.idCanal Server 的唯一标识符,每个 Canal Server 需要设置不同的 ID。
canal.ipCanal Server 的 IP 地址,如果未设置,则 Canal 会自动获取本机 IP。
canal.port11111Canal Server 的监听端口,客户端通过该端口连接 Canal Server。
canal.zkServers如果使用 Zookeeper 管理 Canal 集群,则需要配置 Zookeeper 的地址列表(格式:host1:port1,host2:port2)。
canal.register.autotrue是否自动注册到 Zookeeper,如果是单机模式,可以设置为 false
canal.instance.global.modespringCanal 的全局模式,可选值为 spring(默认)或 standalone
canal.instance.global.spring.xmlclasspath:spring/default.xml全局 Spring 配置文件路径,用于加载 Canal 的 Bean 定义。
canal.instance.tsdb.enabletrue是否启用 TSDB(时间序列数据库),用于存储 Canal 的元数据信息。
canal.instance.tsdb.dir${canal.file.data.dir}/../tsdbTSDB 的存储目录。
canal.instance.gtidonfalse是否启用 GTID(全局事务标识符),如果 MySQL 开启了 GTID 模式,需要设置为 true
canal.metrics.pull.port11112Canal 的监控指标拉取端口,用于监控 Canal 的运行状态。
canal.destinationsCanal 监听的实例名称列表,多个实例用逗号分隔。
canal.file.data.dir${canal.conf.dir}Canal 的数据存储目录,用于存储 binlog 日志的本地缓存。
canal.serverModetcpCanal Server 的运行模式,可选值为 tcp(默认)或 kafka(如果需要将数据发送到 Kafka)。
canal.mq.servers如果 serverMode 设置为 kafka,需要配置 Kafka 的地址列表(格式:host1:port1,host2:port2)。
canal.mq.topic如果 serverMode 设置为 kafka,需要配置 Kafka 的 Topic 名称。
canal.instance.memory.buffer.size16384Canal 实例的内存缓冲区大小(单位:KB),用于缓存 binlog 数据。
canal.instance.memory.buffer.memunit1024Canal 实例的内存缓冲区单位大小(单位:字节),用于控制内存分配的粒度。
canal.instance.transaction.size1024Canal 实例的事务大小(单位:条),用于控制每次提交的事务数量。
canal.instance.network.receiveTimeout15000Canal 实例的网络接收超时时间(单位:毫秒),用于控制 Canal 与 MySQL 的连接超时时间。
canal.instance.network.sendTimeout30000Canal 实例的网络发送超时时间(单位:毫秒),用于控制 Canal 向客户端发送数据的超时时间。
canal.instance.network.heartbeatInterval60000

instance.properties配置说明

配置项默认值说明
canal.instance.master.addressMySQL 主库地址,格式为 hostname:port,例如 127.0.0.1:3306
canal.instance.master.journal.nameMySQL 主库的 binlog 文件名,用于指定从哪个 binlog 文件开始同步。
canal.instance.master.positionMySQL 主库的 binlog 位点,用于指定从哪个位点开始同步。
canal.instance.master.timestampMySQL 主库的时间戳,用于从指定时间点开始同步,Canal 会自动找到对应的 binlog 文件和位点。
canal.instance.dbUsernameMySQL 数据库用户名,用于连接 MySQL 主库。
canal.instance.dbPasswordMySQL 数据库密码,用于连接 MySQL 主库。
canal.instance.connectionCharsetUTF-8连接 MySQL 数据库的字符集,通常设置为 UTF-8
canal.instance.filter.regex正则表达式,用于过滤需要同步的表,例如 .*\\..* 表示同步所有数据库的所有表。
canal.instance.filter.black.regex正则表达式,用于过滤不需要同步的表,优先级高于 filter.regex
canal.instance.tsdb.enabletrue是否启用 TSDB(时间序列数据库),用于存储 Canal 的位点信息。
canal.instance.gtidonfalse是否启用 GTID 模式,如果 MySQL 开启了 GTID,则设置为 true
canal.instance.standby.addressMySQL 从库地址,用于在主库故障时切换到从库。
canal.instance.standby.journal.nameMySQL 从库的 binlog 文件名,用于在主库故障时切换到从库。
canal.instance.standby.positionMySQL 从库的 binlog 位点,用于在主库故障时切换到从库。
canal.instance.standby.timestampMySQL 从库的时间戳,用于在主库故障时切换到从库。
canal.instance.rds.accesskey阿里云 RDS 的 AccessKey,如果使用阿里云 RDS,需要配置。
canal.instance.rds.secretkey阿里云 RDS 的 SecretKey,如果使用阿里云 RDS,需要配置。
canal.instance.rds.instanceId阿里云 RDS 的实例 ID,如果使用阿里云 RDS,需要配置。
canal.instance.mq.servers消息队列服务器地址,如果需要将 Canal 同步的数据发送到消息队列(如 Kafka、RocketMQ),需要配置。
canal.instance.mq.topic消息队列的主题,如果需要将 Canal 同步的数据发送到消息队列,需要配置。
canal.instance.mq.partition0消息队列的分区,如果需要将 Canal 同步的数据发送到消息队列,可以指定分区。
canal.instance.mq.dynamicTopic动态主题配置,支持基于表名或数据库名动态生成主题。
canal.instance.mq.partitionHash分区哈希配置,用于指定哈希字段,例如 schema.table:id^name
canal.instance.enableDruidfalse是否启用 Druid 数据库连接池,如果启用,Canal 会使用 Druid 管理数据库连接。
canal.instance.spring.xmlspring/memory-instance.xmlSpring 配置文件,用于指定 Canal 的实例配置,可选值包括 spring/memory-instance.xmlspring/file-instance.xmlspring/default-insta

二、 CanalAdmin安装

canal-admin设计上是为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作

2.1 CanalAdmin下载
  1. 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
  2. 选择最新的稳定版本,例如 canal-admin-1.1.4,并下载 canal.admin-1.1.4.tar.gz 文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz#解压
tar -zxvf canal.deployer-1.1.4.tar.gz  #解压后目录结构
canal.admin-1.1.4/
├── bin/      # 存放相关命令脚本
├── conf/     # 存放配置文件
├── lib/      # 存放 Canal 所需的所有依赖库
├── logs/     # 存放日志文件
2.2 CanalAdmin配置
#进入解压后的 conf 目录,编辑 application.yml 文件:#修改相关数据库配置
server:port: 8089  # 管理后台端口
spring:datasource:address: 127.0.0.1:3306  # MySQL 地址database: canal_manager  # 元数据库名称username: root           # MySQL 用户名password: root           # MySQL 密码driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

初始化元数据

#conf中有对应的canal_manager.sql里面有相关sql数据
#创建canal_manager
CREATE DATABASE canal_manager CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;#导入canal_manager.sql 
2.3 CanalAdmin启动
#进入crm.admin安装目录 
#启动服务
sh bin/startup.sh
  • 检查日志文件 logs/canal-admin/canal-admin.log,确认服务启动成功。
  • 访问 Web 管理界面:
    http://127.0.0.1:8089
  • 默认账号密码:admin/123456
    在这里插入图片描述
2.4 admin 添加server

修改canal.deployer中conf下的canal.instance,添加管理信息配置,将canal server(canal 服务)交由canal admin(管理页面管理)

#进入canal.deployer-1.1.4 先停止canal服务实例
sh bin/stop.sh#修改其canal.properties 添加CanalAdmin
# canal admin config
canal.admin.manager = 127.0.0.1:8089  #admin管理页面url
canal.admin.port = 11110   #默认端口
canal.admin.user = admin   #登陆用户名
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 #密码123456 加密
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =#重新启动canal服务
sh bin/startup.sh

在这里插入图片描述

2.5 添加instance

之前我们创建canal后,还添加了一个customer-canal的instance,创建该customer是通过在conf目录下创建customer-canal目录在其目录下创建instance.properties。
在这里插入图片描述

但是如果使用canal admin 管理页面后,重启canal服务,就不可以使用创建目录的方式创建instance了(会创建失败)。
在这里插入图片描述
需要在admin 的instance管理页面中创建实例并重新启动server

在这里插入图片描述

点击保存后,列表页面出现该实例,通过操作 -> 启动 该instance便成功启动。
在这里插入图片描述

2.6 admin使用的问题
  1. 启动问题
# startup.sh 启动 后台运行,且日志信息(错误日志)都丢弃,所以无法排查问题
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication 1>>/dev/null 2>&1 &
#修改为打印到控制台
$JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.admin.CanalAdminApplication

CanalAdmin启动脚本中需要使用-XX:+UseG1GCG1垃圾回收器,但是我的jdk虽然是1.8 但是是1.8的早期版本会提示
在这里插入图片描述
看提示信息需要添加-XX:+UnlockExperimentalVMOptions -XX:+UseG1GC 使用,添加完毕,重新启动成功。

三、canal使用

3.1 canal与 RabbitMQ 集成

将 Canal 的配置文件(conf/canal.properties)修改为 RabbitMQ /RocketMq模式:

# 设置Canal的服务模式为 RabbitMQ 模式。
canal.serverMode = rabbitMQ
# 指定 RabbitMQ 服务的主机地址
rabbitmq.host = 127.0.0.1
# 指定 RabbitMQ 的虚拟主机(Virtual Host)。
rabbitmq.virtual.host = customer_canal_mq
# 指定 RabbitMQ 的交换机名称。
rabbitmq.exchange = cancal.mq.customer
# 指定连接 RabbitMQ 的用户名。
rabbitmq.username = guest
# 指定连接 RabbitMQ 的秘密。
rabbitmq.password = guest

编辑 conf/customer_canal/instance.properties,配置 MQ 的 Topic:

# 配置instance.properties
canal.mq.topic = canal.mq.customer.cust_info#如果需要根据数据库或表名动态生成Topic
canal.mq.dynamicTopic = ".*\\..*"# 默认情况下,Canal 发送的消息是 JSON 格式。
#如果需要二进制格式,可以修改 canal.properties:
canal.mq.flatMessage = false# 重新启动canal
sh bin/restart.sh
3.2 canal与Kafka集成

将 Canal 的配置文件(conf/canal.properties)修改为 Kafka模式:

canal.serverMode = kafka                          # 设置为Kafka模式
# Kafka配置
canal.mq.servers = localhost:9092                  # Kafka服务器地址和端口
canal.mq.topic = canal                              # Kafka主题名称
canal.mq.partition = 0                              # 分区
canal.mq.batch.size = 1000                         # 批量发送大小
canal.mq.retries = 3                               # 失败后重试次数
canal.mq.linger.ms = 1                             # 消息延迟时间
canal.mq.buffer.memory = 33554432                  # 消息缓冲区内存大小
canal.mq.compression.type = none                    # 压缩类型, 可选值为 none, gzip, snappy, lz4, zstd

编辑 conf/customer_canal/instance.properties,配置 Kafka的 Topic:

#MQ队列名称
canal.mq.topic=canal.topic
#单队列模式的分区下标
canal.mq.partition=0
3.3 canal与RDB集成

这里演示场景为:mysql数据(student)表同步到sqlserver的(Student)表字段 。

安装canal adapter

  1. 打开 GitHub releases 页面:https://github.com/alibaba/canal/releases
  2. 选择最新的稳定版本,例如 canal-admin-1.1.4,并下载 canal.adapter-1.1.4.tar.gz 文件。
# 下载安装
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz#解压
tar -zxvf canal.adapter-1.1.4.tar.gz  #解压后目录结构
canal.adapter-1.1.4/
├── bin/      # 存放相关命令脚本
├── conf/     # 存放配置文件
├── lib/      # 存放 Canal 所需的所有依赖库
├── logs/     # 存放日志文件
├── plugins/  # plugins依赖包

配置 Canal Adapter

#修改 canal-adapter/conf/application.yml#spring配置
server:port: 8081  # Spring Boot 应用的服务端口号,应用将运行在 http://127.0.0.1:8081spring:jackson:date-format: yyyy-MM-dd HH:mm:ss  # 配置 JSON 序列化时日期格式,例如:2023-10-01 12:00:00time-zone: GMT+8  # 配置时区为 GMT+8(中国标准时间)default-property-inclusion: non_null  # 配置 JSON 序列化时仅包含非空字段# 数据同步配置
canal.conf:# Canal Server 的主机地址和端口canalServerHost: 127.0.0.1:11111  # Canal Server 的地址,用于监听数据库的 binlog 日志batchSize: 500  # 每次从 Canal Server 拉取的 binlog 事件数量syncBatchSize: 1000  # 每次同步到目标数据库的记录数量retries: 0  # 同步失败时的重试次数,0 表示不重试timeout:  # 同步操作的超时时间(未配置具体值,可能使用默认值)mode: tcp  # Canal 的运行模式,支持 tcp、kafka、rocketMQ 等# 原数据库(需要监听同步的数据库)srcDataSources:defaultDS:  # 默认的数据源配置url: jdbc:mysql://127.0.0.1:3306/mysql_customer?useUnicode=true  # MySQL 数据库的连接 URLusername: root  # 数据库用户名password: root  # 数据库密码canalAdapters:  # 配置 Canal Adapter,用于将 Canal 捕获的数据同步到目标数据库或其他存储# canal 的服务实例- instance: customer-canal  # Canal 的实例名称,与 Canal Server 配置的实例名称一致groups:- groupId: g1  # 同步任务组的 ID,用于区分不同的同步任务outerAdapters:  # 配置外部适配器,用于将数据同步到目标存储- name: rdb  # 指定适配器类型为 rdb(关系型数据库)key: mysql1  # 适配器的唯一标识符,与表映射配置文件中的 outerAdapterKey 对应properties:# 配置需要同步的目标数据库信息(SQL Server)jdbc.driverClassName: com.microsoft.sqlserver.jdbc.SQLServerDriver  # SQL Server 的 JDBC 驱动类名jdbc.url: jdbc:sqlserver://127.0.0.1\test:1433;databasename=ms_cusomter;encrypt=true;trustServerCertificate=true  # SQL Server 数据库的连接 URLjdbc.username: root  # SQL Server 数据库用户名jdbc.password: root  # SQL Server 数据库密码                 
  1. 其中 outAdapter 的配置: name统一为rdb, key为对应的数据源的唯一标识需和下面的表映射文件中的outerAdapterKey对应, properties为目标库jdb的相关参数
  2. adapter将会自动加载 conf/rdb 下的所有.yml结尾的表映射配置文件

RDB表映射文件

#修改 conf/rdb/mytest_user.yml文件#配置表的映射文件
dataSourceKey: defaultDS                   
destination: customer-canal             # cannal的instance或者MQ的topic
groupId: g1                             # 对应MQ模式下的groupId, 只会同步对应groupId的数据
outerAdapterKey: mysql1 								# adapter key, 对应上面配置outAdapters中的key
concurrent: true												# 是否按主键hash并行同步, 并行同步的表必须保证主键不更改及主键不能为其他同步表的外键!!
dbMapping:database: mysql_cusomter                 # 源数据源的database/shcematable: student                           # 源数据源表名targetTable: ms_cusomter.Student         # 目标数据源的库名.表名targetPk:       											   # 主键映射id: id_s															 # 如果是复合主键可以换行映射多个
#  mapAll: true													 	 # 是否整表映射, 要求源表和目标表字段名一模一样 (如果targetColumns也配置了映射,则以targetColumns配置为准)				targetColumns:                           # 字段映射, 格式: 目标表字段: 源表字段, 如果字段名一样源表字段名可不填id: id_sname: name_sbirthday: birthdaycommitBatch: 3000 # 批量提交的大小

上述配置将mysql中mysql_cusomter 库中的student 同步到sqlserver的ms_cusomter库中的Student表中。

启动canal-adapter

  • 将目标库的jdbc jar包放入lib文件夹, 这里放入ojdbc6.jar (如果是其他数据库则放入对应的驱动)
  • 启动canal-adapter启动器
bin/startup.sh
3.4 canal与es集成

配置adapter

#修改 canal-adapter/conf/application.ymlserver:port: 8081  # Spring Boot 应用的服务端口号,应用将运行在 http://127.0.0.1:8081spring:jackson:date-format: yyyy-MM-dd HH:mm:ss  # 配置 JSON 序列化时日期格式,例如:2023-10-01 12:00:00time-zone: GMT+8  # 配置时区为 GMT+8(中国标准时间)default-property-inclusion: non_null  # 配置 JSON 序列化时仅包含非空字段# 配置 Canal
canal.conf:canalServerHost: 127.0.0.1:11111  # Canal Server 的主机地址和端口,用于监听数据库的 binlog 日志# username: canal  # (注释掉)Canal Server 的用户名(如果 Canal Server 配置了认证)# password: 123456  # (注释掉)Canal Server 的密码(如果 Canal Server 配置了认证)batchSize: 500  # 每次从 Canal Server 拉取的 binlog 事件数量syncBatchSize: 1000  # 每次同步到目标存储的记录数量retries: 0  # 同步失败时的重试次数,0 表示不重试timeout:  # 同步操作的超时时间(未配置具体值,可能使用默认值)mode: tcp  # Canal 的运行模式,支持 tcp、kafka、rocketMQ 等# 源数据库(需要监听同步的数据库)srcDataSources:defaultDS:  # 默认的数据源配置url: jdbc:mysql://127.0.0.1:3306/crm_customer?useUnicode=true  # MySQL 数据库的连接 URLusername: root  # 数据库用户名password: root  # 数据库密码canalAdapters:  # 配置 Canal Adapter,用于将 Canal 捕获的数据同步到目标存储- instance: customer-canal2  # Canal 的实例名称,与 Canal Server 配置的实例名称一致groups:- groupId: g1  # 同步任务组的 ID,用于区分不同的同步任务outerAdapters:  # 配置外部适配器,用于将数据同步到目标存储- name: logger  # 适配器类型为 logger,用于打印 binlog 日志,便于调试和查看同步的数据# 配置同步到 Elasticsearch- name: es  # 适配器类型为 es(Elasticsearch)# transport模式 127.0.0.1:9300  rest模式  127.0.0.1:9200 hosts: 127.0.0.1:9200  # Elasticsearch 的主机地址和端口,使用 REST 模式连接 properties:mode: rest  # Elasticsearch 的连接模式,rest 表示使用 HTTP  REST API 连接security.auth: user:password  # Elasticsearch 的认证信息,用户名:密码 格式cluster.name: elasticsearch  # Elasticsearch 的集群名称
3.5 canal与java客户端集成
  1. 添加 Maven 依赖

    在 Java 项目的 pom.xml 文件中添加 Canal 客户端的依赖:

    <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version> <!-- 请根据实际需要选择合适的版本 我的服务1.1.4 -->
    </dependency>
    
  2. 编写 Java 代码连接 Canal Server

    以下是一个简单的 Java 示例程序,用于演示如何通过 Canal 获取 MySQL 的 binlog 数据并打印相关内容:

    package com.xiu.order;import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;import java.net.InetSocketAddress;
    import java.util.List;public class CanalExample {public static void main(String[] args) throws Exception {// 创建 Canal 连接器实例CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "customer-canal2", "", "");try {// 建立连接connector.connect();// 订阅指定数据库表的数据变更(null 表示订阅所有)connector.subscribe(null);// 循环监听数据变化 批量拉取大小int batchSize = 100;while (true) {Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();// 如果没有新消息,则跳过本次循环if (batchId == -1 || message.getEntries().isEmpty()) {//System.out.println("没有接受到数据");continue;}// 遍历解析每条 Entry 数据for (Entry entry : message.getEntries()) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {System.out.println("跳过事务头尾信息");continue;}RowChange rowChange = null;try {// 尝试解析 Canal 捕获的 Binlog 数据,将其反序列化为 RowChange 对象rowChange = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {// 如果解析失败,抛出运行时异常,并包含错误信息和原始的 entry 数据throw new RuntimeException("无法解析日志数据:" + entry, e);}// 获取当前 Binlog 记录的事件类型(如 INSERT、UPDATE、DELETE 等)EventType eventType = rowChange.getEventType();System.out.println("事件类型: " + eventType.toString());// 遍历 RowChange 对象中包含的每一行数据(RowData)for (RowData rowData : rowChange.getRowDatasList()) {// 如果事件类型是 DELETEif (eventType == EventType.DELETE) {// 打印删除操作前的列数据printColumn(rowData.getBeforeColumnsList(), "删除前");}// 如果事件类型是 UPDATEelse if (eventType == EventType.UPDATE) {// 打印更新操作后的列数据printColumn(rowData.getAfterColumnsList(), "更新后");}// 如果事件类型是 INSERTelse if (eventType == EventType.INSERT) {// 打印新增操作后的列数据printColumn(rowData.getAfterColumnsList(), "新增后");}}}// 提交确认已处理的消息connector.ack(batchId);}} finally {// 断开连接connector.disconnect();}}private static void printColumn(List<Column> columns, String prefix) {StringBuilder sb = new StringBuilder(prefix).append(": ");for (Column column : columns) {sb.append(column.getName()).append("=").append(column.getValue()).append(",");}System.out.println(sb.substring(0, sb.length() - 1));}
    }
    
  3. 启动 Canal Server

    确保 Canal Server 已经启动并正常运行。

  4. 运行 Java 客户端

    使用 IDE 或命令行工具运行 Java 客户端程序,观察控制台输出,验证是否能够正确接收并解析 MySQL 的 binlog 数据。

参考文档:canal 官方文档

相关文章:

  • FreeSWITCH中SIP网关(Gateway)操作
  • 方德桌面操作系统V5.0-G23安装Anaconda
  • Multi Agents Collaboration OS:文档合规性及质量检测助手设计及实践
  • Vue3 计算属性与侦听器深度解析:优雅处理响应式数据引言
  • 使用 Vue 开发登录页面的完整指南
  • 经济指标学习(二)
  • 方案解读:虚拟电厂标杆项目整体建设方案【附全文阅读】
  • HarmonyOS:1.4 - HarmonyOS应用程序框架基础
  • QCPRange Class参考
  • 故障诊断常用算法
  • 深入理解 Transformer:从原理解析到文本生成实践
  • 【失败】Gnome将默认终端设置为 Kitty
  • string函数具体事例
  • FastGPT安装前,系统环境准备工作?
  • 浅析MySQL事务锁
  • win11系统截图的几种方式
  • 我的gittee仓库
  • Ubuntu安装MySQL步骤及注意事项
  • `peft` 和 `transformers` 库 实现 LoRA的 内部计算流程
  • 如何选择适合您的过程控制器?
  • 中国政府援缅第七批抗震救灾物资运抵交付
  • 错失两局领先浪费赛点,王楚钦不敌雨果无缘世界杯男单决赛
  • 央视网评论员:婚约不是性许可——山西订婚强奸案背后的性教育盲区
  • 网络社群的早期历史及其启示
  • 人民网评:官方轻踩刹车,智能驾驶不能“蒙眼狂奔”
  • 平安银行一季度净赚超140亿元降5.6%,营收降13.1%