当前位置: 首页 > news >正文

Springboot集成Debezium监听postgresql变更

在这里插入图片描述

1.创建springboot项目引入pom

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>1.4.2.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>1.4.2.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>1.4.2.Final</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>


    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.43</version>
    </dependency>

</dependencies>

2.application.properties配置

# Debezium Configuration
debezium.name=my-postgres-connector
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
debezium.offset.flush.interval.ms=60000
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
debezium.database.dbname=db_test
debezium.database.server.id=12345
debezium.database.server.name=customer-postgres-db-server
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat
debezium.table.include.list=public.user
debezium.column.include.list=public.user.id,public.user.name
debezium.publication.autocreate.mode=filtered
debezium.plugin.name=pgoutput
debezium.slot.name=dbz_customerdb_listener

3.配置类:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.IOException;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        return io.debezium.config.Configuration.create()
            .with("name", env.getProperty("debezium.name"))
            .with("connector.class", env.getProperty("debezium.connector.class"))
            .with("offset.storage", env.getProperty("debezium.offset.storage"))
            .with("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"))
            .with("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"))
            .with("database.hostname", env.getProperty("debezium.database.hostname"))
            .with("database.port", env.getProperty("debezium.database.port"))
            .with("database.user", env.getProperty("debezium.database.user"))
            .with("database.password", env.getProperty("debezium.database.password"))
            .with("database.dbname", env.getProperty("debezium.database.dbname"))
            .with("database.server.id", env.getProperty("debezium.database.server.id"))
            .with("database.server.name", env.getProperty("debezium.database.server.name"))
            //.with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
            .with("database.history", env.getProperty("debezium.database.history"))
            .with("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"))
            .with("table.include.list", env.getProperty("debezium.table.include.list")) //表名
            .with("column.include.list", env.getProperty("debezium.column.include.list")) // 表中得哪些字段
            .with("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"))
            .with("plugin.name", env.getProperty("debezium.plugin.name"))
            .with("slot.name", env.getProperty("debezium.slot.name"))
            .build();
    }
}

4.注册监听

import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;

@Slf4j
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(customerConnectorConfiguration.asProperties())
            .notifying(this::handleChangeEvent)
            .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();

        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
        //log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);
         if (sourceRecordChangeValue != null) {
             Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

             // 处理非读操作
             if(operation != Envelope.Operation.READ) {
                 String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;

                 Struct struct = (Struct) sourceRecordChangeValue.get(record);
                 Map<String, Object> payload = struct.schema().fields().stream()
                     .map(Field::name)
                     .filter(fieldName -> struct.get(fieldName) != null)
                     .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                     .collect(toMap(Pair::getKey, Pair::getValue));

                 // this.customerService.replicateData(payload, operation);
                 log.info("Updated Data: {} with Operation: {}", payload, operation.name());
             }
         }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
    }

}

相关文章:

  • CQL学习
  • 游戏引擎学习第177天
  • 996引擎-接口测试:背包
  • pnpm 报错 Error: Cannot find matching keyid 解决
  • Mybatis的基础操作——03
  • 西交建筑学本科秋天毕业想转码,自学了Python+408,华为OD社招还是考研更香?
  • 第十四章:模板实例化_《C++ Templates》notes
  • 如何编写SLURM系统的GRES资源插件
  • Lustre 语言的 Rust 生成相关的工作
  • Autosar OS配置-Timing Protection配置及实现--基于ETAS工具
  • 题单:精挑细选
  • 生物化学笔记:医学免疫学原理02 抗原概念+免疫应答+抗原的分类
  • SQL语言——MySQL
  • MuJoCo 仿真 Panda 机械臂!末端位置实时追踪 + 可视化(含缩放交互)
  • 系统架构书单推荐(一)领域驱动设计与面向对象
  • pycharm快捷键汇总(持续更新)
  • 神聖的綫性代數速成例題12. 齊次方程組零解充要條件、其齊次方程組非零解、 齊次方程組基礎解系
  • SHELL练习01
  • Resume全栈项目(.NET)
  • Servlet、HttpServletRequest、HttpServletResponse、静态与动态网页、jsp、重定向与转发
  • 商务部新闻发言人就波音公司飞回拟交付飞机答记者问
  • 国家统计局:一季度全国规模以上文化及相关产业企业营业收入增长6.2%
  • 王毅会见俄罗斯外长拉夫罗夫
  • 马上评丨机械停车库成“僵尸库”,设计不能闭门造车
  • 美媒:受关税政策影响,美国电商平台近千种商品平均涨价29%
  • 上海市政府常务会议研究抓好稳就业稳企业稳市场稳预期工作,让企业感受温度