侧边栏壁纸
博主头像
seems 博主等级

学习博客

  • 累计撰写 62 篇文章
  • 累计创建 41 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

springboot集成debezium实时监控数据库变化

seems
2023-09-01 / 0 评论 / 0 点赞 / 36 阅读 / 0 字

1.引入依赖

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.5.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.5.2.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.5.2.Final</version>
</dependency>

2.java配置

/**
 * debezium 配置
 * @return
 */
@Bean
public io.debezium.config.Configuration testConnector(){
    return io.debezium.config.Configuration.create()
        // 连接器的唯一名称
        .with("name", "test-mysql-connector")
        // 连接器的 Java 类的名称  Mysql连接器
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        // MySQL 数据库服务器的 IP 地址或主机名。
        .with("database.hostname", "127.0.0.1")
        // MySQL 数据库服务器的整数端口号
        .with("database.port", "3306")
        // 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称
        .with("database.user", "root")
        // 连接到 MySQL 数据库服务器时使用的密码
        .with("database.password","123456")
        // 标识并为Debezium捕获更改的特定MySQL数据库服务器/集群提供命名空间的逻辑名称。
        // 逻辑名称在所有其他连接器中应该是唯一的,因为它用作所有接收此连接器发出的事件的Kafka
        // 主题名称的前缀。此名称中只允许使用字母数字字符和下划线
        .with("database.server.name","test_001")
        // 此数据库客户端的数字 ID,在 MySQL 集群中所有当前运行的数据库进程中必须是唯一的。
        // 此连接器作为另一台服务器(具有此唯一 ID)加入 MySQL 数据库集群,
        // 以便它可以读取 binlog。默认情况下,会生成 5400 到 6400 之间的随机数,
        // 但建议明确设置一个值
        .with("database.server.id", "5400")
        .build();
}

3.实例化Debezium Engine

private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
    recordChangeEvents.forEach(r -> {
        SourceRecord sourceRecord = r.record();
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            // 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置
            Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            if (operation != Envelope.Operation.READ) {
                String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                // 获取增删改对应的结构体数据
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                // 将变更的行封装为Map
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                // 这里简单打印一下
                System.out.println("payload = " + payload);
            }
        }
    });
}
0

评论区