记录seatunnel排查重复数据的案例分析
文章目录
- 背景
- 分析
- 检查现象
- 检查B集群是否有异常,导致重复消费的
- 分析同步任务
- 修复问题
- 发现flink job 一直报异常
- 修复问题
背景
使用seatunnel 同步数据从A 集群kafka 同步到B集群kafka,现象是发现两边数据不一致,每天10w级别会多几十条数据
分析
检查现象
因为两侧kafka的数据同时也会写es,先检查两侧es的数据, 通过二分发现,B集群es数据确实比A集群多,多的数据检查发现是重复的数据,有记录被写了多次
检查B集群是否有异常,导致重复消费的
检查日志发现有没有写入失败,导致一批数据被重复消费的,从日志来看是没有的
分析同步任务
检查seatunnel 任务,发现没有配置semantics ,然后发又检查了客户现场的flink job ,确定有经常任务会有环境导致的重启现象,应该问题就是这了
sink {kafka {topic = "test_topic"bootstrap.servers = "localhost:9092"format = jsonkafka.request.timeout.ms = 60000semantics = EXACTLY_ONCEkafka.config = {acks = "all"request.timeout.ms = 60000buffer.memory = 33554432}}
修复问题
加上 semantics = EXACTLY_ONCE
sink {kafka {topic = "test_topic"bootstrap.servers = "localhost:9092"format = jsonkafka.request.timeout.ms = 60000kafka.config = {acks = "all"request.timeout.ms = 60000buffer.memory = 33554432}}
发现flink job 一直报异常
最后问题定位到这,官方bug导致,升级最新版本修复
sink 在一致性语义情况下报异常
修复问题
只修改sink一致性语义是不够的,还要消费b 集群kafka 的客户端的事务配置是,read_commited的