[Debezium Series] Sử dụng Debezium Engine + PostgreSQL
Giới thiệu
Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium Engine và cách sử dụng nó với CSDL PostgreSQL nhé.
Debezium Engine
Debezium hỗ trợ cho cho ae dev 1 bộ module debezium-api được phát triển bằng java cho phép bạn kết nối tới DB để đọc logs từ chúng.
Add dependencies vào file POM nhé
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-postgres</artifactId>
<version>${debezium.version}</version>
</dependency>
nếu bạn sài các CSDL khác thì sẽ có những connector khác, ví dụ Mysql thì sẽ sẽ add như sau
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${version.debezium}</version>
</dependency>
Tiếp theo mình tiếp hành khởi tạo connector
this.engine = DebeziumEngine
.create(ChangeEventFormat.of(Connect.class))
.using(setConfig())
.notifying(new CdcSummaryBatchHandler2())
.build();
để thực thi connector này mình sẽ tiến hành tạo ra luồng mới và tiến hành chạy.
private final Executor executor = Executors.newSingleThreadExecutor();
@PostConstruct
private void start() {
this.executor.execute(engine);
}
@PreDestroy
private void stop() {
if (this.engine != null) {
try {
this.engine.close();
} catch (IOException e) {
log.error("Can not close debezium engine");
}
}
}
trong đó hàm setConfig
và class CdcSummaryBatchHandler2
là 2 đối tượng được mình tạo ra để khai báo config và lắng nghe các sự kiện từ CSDL.
private Properties setConfig() {
Properties configProperties = new Properties();
configProperties.put("plugin.name", "pgoutput");
configProperties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
///KAFKA
configProperties.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
configProperties.put("key.converter.schemas.enable", "false");
configProperties.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
configProperties.put("value.converter.schemas.enable", "false");
configProperties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
configProperties.put("offset.storage.file.filename", "/Users/vanhanhchu/Documents/file.DAT");
configProperties.put("offset.flush.interval.ms", "60000");
configProperties.put("name", "connectorName");
configProperties.put("database.server.name", applicationProperties.getInventorySummaryBranchCdc().getDatabaseServerName());
configProperties.put("database.hostname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseHostname());
configProperties.put("database.port", applicationProperties.getInventorySummaryBranchCdc().getDatabasePort());
configProperties.put("database.user", applicationProperties.getInventorySummaryBranchCdc().getDatabaseUser());
configProperties.put("database.password", applicationProperties.getInventorySummaryBranchCdc().getDatabasePassword());
configProperties.put("database.dbname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseDbname());
configProperties.put("database.tcpKeepAlive", 600);
configProperties.put("table.include.list", "item-services.inventory_summary_branch,item-services.cdc_update_es,item-services.item_model,item-services.cdc_heart_beat");
// advantage config
configProperties.put("skipped.operations", "d,u");
configProperties.put("provide.transaction.metadata", "true");
configProperties.put("max.batch.size", "500");
configProperties.put("slot.name", "etl_replication_1");
configProperties.put("snapshot.mode", "never");
configProperties.put("heartbeat.interval.ms", "30000");
configProperties.put("heartbeat.action.query", "UPDATE \"item-services\".cdc_heart_beat SET ts_time = now() WHERE id = 1;");
// return config
return configProperties;
}
Trong đó:
skipped.operations
là những event được bỏ qua(d=delete, u=update)
slot.name
tên slot replication sẽ được khởi tạo trong CSDL của bạn
max.batch.size
số lượng batch tối đa.
offset.storage.file.filename
là đường dẫn file lưu trữ offset
table.include.list
là danh sách table sẽ được debezium đọc logs
snapshot.mode
là chế độ snapshot khi lần đầu khởi tạo, mặc định là initial thì debezium sẽ tiến hành snapshot toàn bộ dữ liệu từ logs và các giá trị bị thiếu. giá trị never thì debezium chỉ đọc các log từ thời điểm tạo replication.
heartbeat.interval.ms
là cấu hình cứ sau X ms sẽ thực thi câu truy vấn heartbeat.action.query
heartbeat.action.query
là câu truy vấn được thực thi, việc thêm tác vụ này giúp bạn duy trì được kết nối ổn định tránh tình trạng quá lâu DB của bạn không phát sinh thay đổi.
còn lại là các giá trị mặc định và bạn không cần thay đổi.
Tiếp theo ta tiến hành implements class CdcSummaryBatchHandler2
public class CdcSummaryBatchHandler2 implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {
private final Logger log = LoggerFactory.getLogger(CdcSummaryBatchHandler2.class);
@Override
public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
try {
Map<String, List<RecordChangeEvent<SourceRecord>>> group = records.stream().collect(Collectors.groupingBy(x -> x.record().topic()));
for (Map.Entry<String, List<RecordChangeEvent<SourceRecord>>> data : group.entrySet()) {
log.debug("record value" + data.getValue());
committer.markBatchFinished();
}
} catch (Throwable e) {
log.error("ETL_ERROR " + e.getMessage());
e.printStackTrace();
}
}
}
và đây là kết quả mình nhận được
cụ thể object RecordChangeEvent<SourceRecord>
nó sẽ như này
record value[EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}], EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]]
dựa trên object trên ta có thể convert nó về đối object tương ứng bạn muốn, dưới đây là code example các bạn có thể tham khảo
/*
*
* * Copyright (c) 2024, T5K - Truyen5k.Com
* * author: Hanh.Chu
*
*/
static {
{
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
mapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, true);
mapper.registerModule(new ParameterNamesModule());
mapper.registerModule(new Jdk8Module());
mapper.registerModule(new JavaTimeModule());
}
}
public static <T> Optional<T> mapToModel(SourceRecord sourceRecord, Class<T> clasz, Operation... opeAction) {
Optional<Map<String, Object>> map = sourceRecordToMap(sourceRecord, opeAction);
if (map.isPresent()) {
return Optional.of(mapper.convertValue(map.get(), clasz));
}
return Optional.empty();
}
public static Optional<Map<String, Object>> sourceRecordToMap(SourceRecord sourceRecord, Operation... opeAction) {
try {
// map json to struct
Struct sourceRecordValue = (Struct) sourceRecord.value();
if (sourceRecordValue != null && opeAction != null) {
Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION));
// Only if this is a transaction operation. update and create event
if (Arrays.asList(opeAction).contains(operation)) { // Operation.CREATE || operation == Operation.UPDATE
Struct struct = null;
if (operation == Operation.DELETE) {
struct = (Struct) sourceRecordValue.get(BEFORE);
} else {
struct = (Struct) sourceRecordValue.get(AFTER);
}
Struct finalStruct = struct;
if (finalStruct == null) {
return Optional.empty();
}
Map<String, Object> message = struct.schema().fields().stream().map(Field::name)
.filter(fieldName -> finalStruct.get(fieldName) != null)
.map(fieldName -> Pair.of(fieldName, finalStruct.get(fieldName)))
.collect(toMap(Pair::getKey, Pair::getValue));
return Optional.of(message);
}
}
} catch (Exception e) {
e.printStackTrace();
}
return Optional.empty();
}
Reference
T5K - Truyện chữ online (provider source)
debezium.io - using Debezium Engine
Series
Phần 1: Debezium là gì? ứng dụng thực tế.
Phần 2: Cấu hình sử dụng Debezium Engine + PostgreSQL
Phần 3: Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect
All rights reserved