当前位置: 首页 > news >正文

速通FlinkCDC3.0

1.FlinkCDC概述

1.1FlinkCDC是什么?

        FlinkCDC(Flink Change Data Capture)是一个用于实时捕获数据库变更日志的工具,它可以将数据库的变更实时同步到ApacheFlink系统中。

1.2 FlinkCDC的三个版本?

        1.x 这个版本的FlinkCDC的提供了DataStream以及FlinkSQL的方式实现数据的动态获取

        存在的问题:

        就是在生产环境中,我们在同步环境的时候,万一前脚刚同步了数据,后脚就被修改了怎么办呢?这样我们不就是读到了,不正确的数据了。因此,在FlinkCDC1.x的版本中的解决方案是,在读表的过程中,锁住整张表,这时候不会有新的数据写入了。但是由此又带来了一个新的问题,生产环境中时时刻刻就是会有新的数据写入的,如果锁住整张表,就会对线上产生很多问题。所以迎来了2.x版本。

        2.x 这个版本提供了丰富的数据库对接以及增加全量的同步锁表的解决问题的解决方案。

        提供API或者FlinkSql去进行操作,打包代码上传到集群去进行操作,但是我们的本身任务并不复杂,就是一个导数据的任务,所以需要更简单的方法去实现数据导入的作用。

        3.x 这个版本提供了StreamingETL方式导入数据方案。 

        FlinkCDC在这个版本形成了自己的框架,可以像平时的那些框架文件如果spark,hadoop一样又bin,conf等文件夹,所以我们在使用FlinkCDC的时候就是可以直接在Conf中配置Resource(要导入的数据库)sink(目标文件),可以通过命令启动来进行同步。

顺带提一下两种CDC的同步方式:

        CDC一种是通过查询的方式和通过Binlog两种方式,简单说一下两种的不同

cdc对比
基于查询的CDC基于binlog的cdc
产品

Sqoop、DataX

Canal
执行模式BatchStreaming
是否可以检测到所有变化否(同步最终态)
延迟性延迟高(按天进行同步)低延迟
增加数据库压力

2.flinkCDC 同步mysql数据库数据到doris

        2.1 环境准备

           1)安装FlinkCDC

               flinkCDC下载地址 https://pan.baidu.com/s/1_BKPxommK5dsY3hD7rYVUA 提取码: pisv 

tar -zxvf flink-cdc-3.0.0-bin.tar.gz -C /opt/module/

        2)向FlinkCDC的目录下的lib目录下传入Mysql以及Doris 的依赖包

        doris的jar包
        https://pan.baidu.com/s/1pgtsYT9VyXD1U4RbjYA6rg 提取码: kx2q 
        mysql的jar包
        https://pan.baidu.com/s/1pxCy0-iSutqN9YjdzGAfZw 提取码: p65d 
        还需要一个mysql-connector的jar
        https://pan.baidu.com/s/1lzJuQRPL3KtDqDXaoGBSMQ 提取码: dwnw 

        为啥还需要已经有了mysql的jar包了还需要一个mysql-connector?

        是因为mysql的jar包依赖于mysql-connector,

        为啥不封装在一起?

        首先来说就是MySQL的jar相当于对数据库一个能力的封装底层可以用别的connector,也是为了解耦,还有一个原因就是两个包所用的协议不一样,上面的这个msyql的jar包是用的apache的协议。  

        为啥用了mysql的驱动包,不用doris的驱动包?

        因为doris兼容mysql的协议。

        2.2 同步变更

        编写 MySQL同步到doris的配置文件

        可以选择在FlinkCDC中创建一个单独的文件夹写配置文件,也可以写在conf的目录下。

vim mysql-to-doris.yaml

source:

 #数据源的数据库类型

  type: mysql

 #地址/主机名称

  hostname: hadoop103

 #端口号

  port: 3306

 #数据库用户名

  username: root

  #数据库密码

  password: "000000"

  #要同步的表名

  tables: test.\.*

  #ServerID 下面详细解释

  server-id: 5400-5404

 #时区

  server-time-zone: UTC+8

sink:

  #目标数据库类型

  type: doris

  #目标数据库物理存储主机名加端口号

  fenodes: hadoop102:7030

  #数据库用户名

  username: root

  #数据库密码

  password: "000000"

 #是否同步表的初始变化,就是类似新增字段之类的

  table.create.properties.light_schema_change: true

 #副本数

  table.create.properties.replication_num: 1

pipeline:

  #任务名称

  name: Sync MySQL Database to Doris

  #并行任务数量

  parallelism: 1

server-id 的作用:
MySQL复制标识:在MySQL主从复制中,每个从库必须有一个唯一的server-id来标识自己。同样,当你的CDC工具连接MySQL时,它实际上扮演了一个MySQL从库的角色,通过binlog来获取数据变更。

避免冲突:如果你有多个CDC工具或从库连接同一个MySQL主库,每个实例必须有不同的server-id,否则会导致冲突和数据不一致。

        这种配置方式通常在分布式或并行环境中使用,允许多个任务实例使用不同的server-id(在你的配置中parallelism: 4表示并行度为4,所以需要4个不同的server-id)。

        启动环境

        1)开启Flink集群

        首先要添加如下配置

vim conf/flink-conf.yaml

添加如下配置信息

execution.checkpointing.interval: 5000

#启动集群
bin/start-cluster.sh

        2)开启doris的FE

bin/start_fe.sh

        3)  开启Doris的BE

bin/start_be.sh

        4)启动FlinkCDC同步变更任务

        尚硅谷给的是这个命令,但是我用这个命令不行

flink-cdc-3.0.0]$ bin/flink-cdc.sh job/mysql-to-doris.yaml

        我用的这个可以

bin/flink-cdc.sh config/你的配置文件 --jar lib/mysql....

        然后刷新数据库观察结果

        以上情况适用于就是我们的主库mysql的数据库名.表名,在doris中的数据库名.表名是一样。如果doris中的表名不一样就用到下面的路由变更。

2.3路由变更

ource:

  type: mysql

  hostname: hadoop103

  port: 3306

  username: root

  password: "000000"

  tables: test_route.\.*

  server-id: 5400-5404

  server-time-zone: UTC+8

sink:

  type: doris

  fenodes: hadoop102:7030

  benodes: hadoop102:7040

  username: root

  password: "000000"

  table.create.properties.light_schema_change: true

  table.create.properties.replication_num: 1

#增加了路由规则

route:

  - source-table: test_route.t1

    sink-table: doris_test_route.doris_t1

  - source-table: test_route.t2

    sink-table: doris_test_route.doris_t1

  - source-table: test_route.t3

    sink-table: doris_test_route.doris_t3

pipeline:

  name: Sync MySQL Database to Doris

  parallelism: 1

相关文章:

  • MongoDB数据库的安装到入门使用详细讲解
  • HTTP 和 HTTPS 有什么区别?
  • 负载均衡与实时调度—LSF
  • 解决Mac 安装 PyICU 依赖失败
  • Centos9 安装 nginx 及配置
  • 【React】搜索时高亮被搜索选中的文案
  • 算法工程师面试题与参考答案资料(2025年版)
  • C++算法(10):二叉树的高度与深度,(C++代码实战)
  • Java 泛型使用教程
  • Netty前置基础知识之BIO、NIO以及AIO理论详细解析和实战案例
  • 使用PyTorch实现图像增广与模型训练实战
  • RESTful学习笔记(二)---简单网页前后端springboot项目搭建
  • uni-app 状态管理深度解析:Vuex 与全局方案实战指南
  • 【C++软件实战问题排查经验分享】UI界面卡顿 | CPU占用高 | GDI对象泄漏 | 线程堵塞 系列问题排查总结
  • 如何维护技术文档的持续更新?
  • 【Unity笔记】Unity音视频播放监听器封装笔记:VideoPlayer + AudioSource事件触发与编辑器扩展
  • 微软Entra新安全功能引发大规模账户锁定事件
  • GeoAI技术内涵与城市计算
  • 目标检测:视觉系统中的CNN-Transformer融合网络
  • 从代码学习深度学习 - 学习率调度器 PyTorch 版
  • 教育部增设29种本科新专业,首建战略急需专业超常设置机制
  • 国家税务总局镇江市税务局原纪检组组长朱永凯接受审查调查
  • 全国登记在册民营企业超过5700万户,占企业总量92.3%
  • 北京理工大学:教师宫某涉嫌师德失范,暂停其一切职务活动
  • 徐州沛县一村委会因无资质处理固废,被环保部门罚款19万元
  • 寺庙餐饮,被年轻人追捧成新顶流