1.引入依赖
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>1.5.2.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.5.2.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.5.2.Final</version>
</dependency>
2.java配置
/**
* debezium 配置
* @return
*/
@Bean
public io.debezium.config.Configuration testConnector(){
return io.debezium.config.Configuration.create()
// 连接器的唯一名称
.with("name", "test-mysql-connector")
// 连接器的 Java 类的名称 Mysql连接器
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
// MySQL 数据库服务器的 IP 地址或主机名。
.with("database.hostname", "127.0.0.1")
// MySQL 数据库服务器的整数端口号
.with("database.port", "3306")
// 连接到 MySQL 数据库服务器时要使用的 MySQL 用户的名称
.with("database.user", "root")
// 连接到 MySQL 数据库服务器时使用的密码
.with("database.password","123456")
// 标识并为Debezium捕获更改的特定MySQL数据库服务器/集群提供命名空间的逻辑名称。
// 逻辑名称在所有其他连接器中应该是唯一的,因为它用作所有接收此连接器发出的事件的Kafka
// 主题名称的前缀。此名称中只允许使用字母数字字符和下划线
.with("database.server.name","test_001")
// 此数据库客户端的数字 ID,在 MySQL 集群中所有当前运行的数据库进程中必须是唯一的。
// 此连接器作为另一台服务器(具有此唯一 ID)加入 MySQL 数据库集群,
// 以便它可以读取 binlog。默认情况下,会生成 5400 到 6400 之间的随机数,
// 但建议明确设置一个值
.with("database.server.id", "5400")
.build();
}
3.实例化Debezium Engine
private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
recordChangeEvents.forEach(r -> {
SourceRecord sourceRecord = r.record();
Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
if (sourceRecordChangeValue != null) {
// 判断操作的类型 过滤掉读 只处理增删改 这个其实可以在配置中设置
Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
if (operation != Envelope.Operation.READ) {
String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
// 获取增删改对应的结构体数据
Struct struct = (Struct) sourceRecordChangeValue.get(record);
// 将变更的行封装为Map
Map<String, Object> payload = struct.schema().fields().stream()
.map(Field::name)
.filter(fieldName -> struct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
// 这里简单打印一下
System.out.println("payload = " + payload);
}
}
});
}
评论区