+7

[Debezium Series] Sử dụng Debezium Engine + PostgreSQL

Giới thiệu

Tiếp nối series Debezium cơ bản hôm nay cùng mình tìm hiểu về Debezium Engine và cách sử dụng nó với CSDL PostgreSQL nhé.

Debezium Engine

Debezium hỗ trợ cho cho ae dev 1 bộ module debezium-api được phát triển bằng java cho phép bạn kết nối tới DB để đọc logs từ chúng.

Add dependencies vào file POM nhé

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-connector-postgres</artifactId>
      <version>${debezium.version}</version>
</dependency>

nếu bạn sài các CSDL khác thì sẽ có những connector khác, ví dụ Mysql thì sẽ sẽ add như sau

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${version.debezium}</version>
</dependency>

Tiếp theo mình tiếp hành khởi tạo connector

		this.engine = DebeziumEngine
				.create(ChangeEventFormat.of(Connect.class))
				.using(setConfig())
				.notifying(new CdcSummaryBatchHandler2())
				.build();

để thực thi connector này mình sẽ tiến hành tạo ra luồng mới và tiến hành chạy.

    private final Executor executor = Executors.newSingleThreadExecutor();
    
	@PostConstruct
	private void start() {
		this.executor.execute(engine);
	}

	@PreDestroy
	private void stop() {
		if (this.engine != null) {
			try {
				this.engine.close();
			} catch (IOException e) {
				log.error("Can not close debezium engine");
			}
		}
	}

trong đó hàm setConfig và class CdcSummaryBatchHandler2 là 2 đối tượng được mình tạo ra để khai báo config và lắng nghe các sự kiện từ CSDL.

	private Properties setConfig() {
		Properties configProperties = new Properties();
		configProperties.put("plugin.name", "pgoutput");
		configProperties.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");

        ///KAFKA
        configProperties.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        configProperties.put("key.converter.schemas.enable", "false");
        configProperties.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        configProperties.put("value.converter.schemas.enable", "false");


		configProperties.put("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
		configProperties.put("offset.storage.file.filename", "/Users/vanhanhchu/Documents/file.DAT");
		configProperties.put("offset.flush.interval.ms", "60000");
		configProperties.put("name", "connectorName");
		configProperties.put("database.server.name", applicationProperties.getInventorySummaryBranchCdc().getDatabaseServerName());
		configProperties.put("database.hostname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseHostname());
		configProperties.put("database.port", applicationProperties.getInventorySummaryBranchCdc().getDatabasePort());
		configProperties.put("database.user", applicationProperties.getInventorySummaryBranchCdc().getDatabaseUser());
		configProperties.put("database.password", applicationProperties.getInventorySummaryBranchCdc().getDatabasePassword());
		configProperties.put("database.dbname", applicationProperties.getInventorySummaryBranchCdc().getDatabaseDbname());
		configProperties.put("database.tcpKeepAlive", 600);
		configProperties.put("table.include.list", "item-services.inventory_summary_branch,item-services.cdc_update_es,item-services.item_model,item-services.cdc_heart_beat");

		// advantage config
		configProperties.put("skipped.operations", "d,u");
		configProperties.put("provide.transaction.metadata", "true");
		configProperties.put("max.batch.size", "500");
		configProperties.put("slot.name", "etl_replication_1");
        configProperties.put("snapshot.mode", "never");
        
		configProperties.put("heartbeat.interval.ms", "30000");
		configProperties.put("heartbeat.action.query", "UPDATE \"item-services\".cdc_heart_beat SET ts_time = now() WHERE id = 1;");

		// return config
		return configProperties;
	}

Trong đó: skipped.operations là những event được bỏ qua(d=delete, u=update)

slot.name tên slot replication sẽ được khởi tạo trong CSDL của bạn

max.batch.size số lượng batch tối đa.

offset.storage.file.filename là đường dẫn file lưu trữ offset

table.include.list là danh sách table sẽ được debezium đọc logs

snapshot.mode là chế độ snapshot khi lần đầu khởi tạo, mặc định là initial thì debezium sẽ tiến hành snapshot toàn bộ dữ liệu từ logs và các giá trị bị thiếu. giá trị never thì debezium chỉ đọc các log từ thời điểm tạo replication.

heartbeat.interval.ms là cấu hình cứ sau X ms sẽ thực thi câu truy vấn heartbeat.action.query

heartbeat.action.query là câu truy vấn được thực thi, việc thêm tác vụ này giúp bạn duy trì được kết nối ổn định tránh tình trạng quá lâu DB của bạn không phát sinh thay đổi.

còn lại là các giá trị mặc định và bạn không cần thay đổi.

Tiếp theo ta tiến hành implements class CdcSummaryBatchHandler2

public class CdcSummaryBatchHandler2 implements DebeziumEngine.ChangeConsumer<RecordChangeEvent<SourceRecord>> {

    private final Logger log = LoggerFactory.getLogger(CdcSummaryBatchHandler2.class);

    @Override
    public void handleBatch(List<RecordChangeEvent<SourceRecord>> records,
                            DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> committer) throws InterruptedException {
        try {
            Map<String, List<RecordChangeEvent<SourceRecord>>> group = records.stream().collect(Collectors.groupingBy(x -> x.record().topic()));
            for (Map.Entry<String, List<RecordChangeEvent<SourceRecord>>> data : group.entrySet()) {
                log.debug("record value" + data.getValue());
                committer.markBatchFinished();
            }
        } catch (Throwable e) {
            log.error("ETL_ERROR " + e.getMessage());
            e.printStackTrace();
        }
    }
}

và đây là kết quả mình nhận được

cụ thể object RecordChangeEvent<SourceRecord> nó sẽ như này

record value[EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206200, lsn_commit=34194206200, lsn=34194206264, txId=13456207, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=BEGIN,id=13456207}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}], EmbeddedEngineChangeEvent [key=null, value=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}, sourceRecord=SourceRecord{sourcePartition={server=item-etl-service}, sourceOffset={transaction_id=13456207, lsn_proc=34194206400, lsn_commit=34194206400, lsn=34194206400, transaction_data_collection_order_item-services.cdc_heart_beat=1, ts_usec=1711180913672013}} ConnectRecord{topic='item-etl-service.transaction', kafkaPartition=null, key=Struct{id=13456207}, keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT}, value=Struct{status=END,id=13456207,event_count=1,data_collections=[Struct{data_collection=item-services.cdc_heart_beat,event_count=1}]}, valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]]

dựa trên object trên ta có thể convert nó về đối object tương ứng bạn muốn, dưới đây là code example các bạn có thể tham khảo

/*
 *
 *  * Copyright (c) 2024, T5K - Truyen5k.Com
 *  * author: Hanh.Chu
 *
 */
static {
        {
            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
            mapper.configure(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS, false);
            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
            mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
            mapper.configure(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS, true);
            mapper.registerModule(new ParameterNamesModule());
            mapper.registerModule(new Jdk8Module());
            mapper.registerModule(new JavaTimeModule());
        }
    }
    public static <T> Optional<T> mapToModel(SourceRecord sourceRecord, Class<T> clasz, Operation... opeAction) {
        Optional<Map<String, Object>> map = sourceRecordToMap(sourceRecord, opeAction);
        if (map.isPresent()) {
            return Optional.of(mapper.convertValue(map.get(), clasz));
        }
        return Optional.empty();
    }
    
   public static Optional<Map<String, Object>> sourceRecordToMap(SourceRecord sourceRecord, Operation... opeAction) {
        try {
            // map json to struct
            Struct sourceRecordValue = (Struct) sourceRecord.value();

            if (sourceRecordValue != null && opeAction != null) {
                Operation operation = Operation.forCode((String) sourceRecordValue.get(OPERATION));

                // Only if this is a transaction operation. update and create event
                if (Arrays.asList(opeAction).contains(operation)) { // Operation.CREATE || operation == Operation.UPDATE

                    Struct struct = null;
                    if (operation == Operation.DELETE) {
                        struct = (Struct) sourceRecordValue.get(BEFORE);
                    } else {
                        struct = (Struct) sourceRecordValue.get(AFTER);
                    }
                    Struct finalStruct = struct;
                    if (finalStruct == null) {
                        return Optional.empty();
                    }
                    Map<String, Object> message = struct.schema().fields().stream().map(Field::name)
                        .filter(fieldName -> finalStruct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, finalStruct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                    return Optional.of(message);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return Optional.empty();
    }

Reference

T5K - Truyện chữ online (provider source)

debezium.io - using Debezium Engine

Series

Phần 1: Debezium là gì? ứng dụng thực tế.

Phần 2: Cấu hình sử dụng Debezium Engine + PostgreSQL

Phần 3: Cấu hình sử dụng Debezium + PostgreSQL + Kafka Connect


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí