+1

Delta Lake Cơ Bản Với PySpark

Source (en): https://karlchris.github.io/data-engineering/projects/delta-spark/

image.png

Lời mở đầu

Sau khoảng thời gian dài trong việc quản lý dữ liệu, Data Warehouse vào khả năng lưu trữ data có cấu trúc (structured data) cũng như hỗ trợ truy vấn (query), từ đó data có thể được sử dụng với nhiều mục đích khác như BI, Machine Learning, Data Mining, ...

Cùng với sự gia tăng không ngừng của dữ liệu và nhu cầu sử dụng đống dữ liệu đó, data warehouse dần trở nên "struggled" khi gặp phải dữ liệu bán cấu trúc (semi-structured data) và phi cấu trúc (structured data), dần bộc lộ ra nhiều hạn chế. Từ đó, Data Lake ra đời, cung cấp giải pháp để lưu trữ gần như mọi loại dữ liệu từ có cấu trúc đến phi cấu trúc. Tuy nhiên Data Lake lại thiếu đi khả năng xử lý và truy các dữ liệu có cấu trúc như Data Warehouse.

Gần đây, với sự kết hợp điểm mạnh của cả Data Warehouse và cả Data Lake, cấu trúc Data LakeHouse được ra đời, cung cấp giải pháp lưu trữ linh hoạt như của data lake và hỗ trợ lưu trữ và truy vấn dữ liệu có cấu trúc như của data warehouse.

image.png

Giới thiệu về Delta Lake

Delta Lake là một open-source software, hỗ trợ một định dạng table tối ưu cho các data storage. Delta lake tối ưu hiều mặt của các data storage với nhiều tính năng mới và vượt trội như ACID, Time travel, Unified Batch & Streaming, Schema Enforcement & Evolution, ...

Delta lake sử dụng định dạng .parquet để lưu trữ các file dữ liệu của delta table. Một delta table bao gồm 2 thành phần chính:

  • Parquet files chứa các file dữ liệu dưới định dạng .parquet
  • Transaction log lưu trữ metadata về các transaction với dữ liệu trong

image.png

Delta lake đồng hỗ trợ cả ETL và ETL workloads, tuy nhiên ETL sẽ thích hợp hơn đối với delta lake bởi hiệu năng (Performance) và độ tin cậy (Reliability).

  • Performance: các truy vấn được tối ưu nhờ vào
    • Lưu trữ file paths và metadata trong Transaction log
    • Thực hiện partial read thông qua file-skipping
    • Co-locatinng các dữ liệu tương đồng để thực hiện skipping nếu có thể.

Tips: Co-locating similar data: Delta lake lưu các dữ liệu tương đồng ở gần nhau nhằm cải thiện performance thông qua các phương pháp như Z-Ordering hoặc Liquid Clustering.

  • Reliability: Delta lake tối ưu quy trình ETL thông qua việc áp dụng ACID transactions.

Liquid Clustering, Z-ordering and Hive-style partitioning

Cơ bản là đưa các dữ liệu giống nhau lại gần nhau hơn. Liquid Clustering là mới nhất, cũng như tối ưu nhất. ETL workload sẽ được tối ưu từ việc clustering nếu:

  • Thường xuyên filter các cột
  • Dữ liệu bị skew nặng
  • Dữ liệu gia tăng quá nhanh, cần maintenance và tuning
  • Access patternns thay đổi qua thời gian

Query Engine Supported

Delta lake chủ yếu hỗ trợ Apache Spark đặc biệt là trên Databricks, tuy nhiên cũng hỗ trợ một số các loại query engine khác như polars.

Schema evolution & enforcement

Delta hỗ trợ Schema evolution nhằm tránh data bị corruption. Tức là bạn không thể thực hiện write data mới vào một delta table nếu data mới và delta table cũ khác schema. Khi thực hiện write sẽ xảy ra lỗi AnalysisException.

Ví dụ

df = spark.createDataFrame(
    [("bob", 47), ("li", 23), ("leonard", 51)])
    .toDF("first_name", "age"
)

df.write.format("delta").save("data/toy_data")

Nếu thực hiện write data mới vào với schema khác schema ban đầu:

df = spark.createDataFrame([("frank", 68, "usa"), ("jordana", 26, "brasil")]).toDF(
    "first_name", "age", "country"
)

df.write.format("delta").mode("append").save("data/toy_data")

Thì lỗi =))

AnalysisException: [_LEGACY_ERROR_TEMP_DELTA_0007] A schema mismatch detected when writing to the Delta table

Table schema:
root
-- first_name: string (nullable = true)
-- age: long (nullable = true)

Data schema:
root
-- first_name: string (nullable = true)
-- age: long (nullable = true)
-- country: string (nullable = true)

Schema evolution

Tuy nhiên, ETL workload luôn thay đổi theo thời gian, schema của data đương nhiên cũng thay đổi (không có j tồn tại vĩnh viễn), do đó sinh ra Schema evolution. Các định dạng lakehouse hiện đại đều hỗ trợ tính năng này (đặc biệt thằng Iceberg hỗ trợ rẩt mạnh).

Để update schema của delta table cũ theo schema của data mới, khi thực hiện write hãy thêm option mergeSchema thành true

df.write \
    .option("mergeSchema", "true") \
    .mode("append") \
    .format("delta") \
    .save("data/toy_data")

Nếu trong ví dụ trên, kết quả thu được kiểu:

spark.read.format("delta").load("data/toy_data").show()

+----------+---+-------+
|first_name|age|country|
+----------+---+-------+
|   jordana| 26| brasil| # new
|     frank| 68|    usa| # new
|   leonard| 51|   null|
|       bob| 47|   null|
|        li| 23|   null|
+----------+---+-------+

Time travel

Du hành thời gian =)), kiểu khi bạn làm cái gì đó lỗi, bạn có thể Ctrl + Z hay rollback lại lúc chưa lỗi. Delta lake có hỗ trợ các delta table tính năng này. Khi lưu data của delta table, trong folder chứa các data file sẽ có một folder tên _delta_log. Folder này lưu lại các transaction log từ các hành động của bạn lên delta table. Từ đó có thể travel lại các trạng thái của delta table.

Tối ưu?

Các file nhỏ khiến việc read trở năng chậm ("the small file problem").

Ví dụ: query từ một delta table với 2m dòng, data được partitioned theo cột education. Ví dụ đầu thì có 1440 files mỗi partition và ví dụ sau chi có 1 per partition.

%%time
df = spark.read.format("delta").load("test/delta_table_1440")
res = df.where(df.education == "10th").collect()

CPU times: user 175 ms, sys: 20.1 ms, total: 195 ms
Wall time: 16.1 s
%%time
df = spark.read.format("delta").load("test/delta_table_1")
res = df.where(df.education == "10th").collect()

CPU times: user 156 ms, sys: 16 ms, total: 172 ms
Wall time: 4.62 s

Khác bọt vcl!

Offline optimize

Giả sử có một luồng ETL stream data vào một delta table đã partitioned và update mỗi phút => 1440 files mỗi partiton at the end of every day.

Có thể manually chạy OPTIMIZE command để optimize số lượng file. Lệnh này sẽ compact các file nhỏ thành file lớn hơn. Default file size trong mỗi partitionn sau khi chạy lệnh này là 1GB.

from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "test/delta_table")
deltaTable.optimize().executeCompaction()

Downstream queries sẽ được tối ưu hơn =))

Optimized Write

Không cần chạy manually, có thể config để chạy automatically.

Optimized Write sẽ combine các file nhỏ trong cùng 1 partition thành cùng 1 file.

Có thể viết option khi lưu như này:

df.write.format("delta").option("optimizeWrite", "True").save("path/to/delta")

Hoặc setting.

  • Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.optimizeWrite
  • Cho cả Spark session: spark.databricks.delta.optimizeWrite.enabled

Warning: cái này lâu do bị shuffle => không được enables by default.

Auto Compaction

Giải quyết điểm yếu của Optimized Write. Cái này sẽ tối ưu bằng cách tự động chạy một small command optimize sau mỗi lệnh write

Setting:

  • Cho toàn bộ delta table sử dụng table property: delta.autoOptimize.autoCompact
  • Cho cả Spark session: spark.databricks.delta.autoCompact.enabled

Vacuum

Auto Compaction tối ưu sau mỗi write operation, các file nhỏ có thể vẫn tồn tại

> # get n files per partition
> !ls delta/census_table_compact/education\=10th/*.parquet | wc -l

1441

1440 files nhỏ (cũ) và 1 file to (mới - sau Auto Compatio) trong 1 partition. Tuy không ảnh hưởng đến hiệu năng của việc read, tuy nhiên nếu muốn loại bỏ thì dùng VACUUM với parameter là thời gian (giờ) preserve.

deltaTable.vacuum(0)

Lệnh này sẽ remove các files mà không còn được actively reference trong Transaction log. Lệnh VACUUM mặc định sẽ chỉ ảnh hưởng đến các file đã tồn tại lâu hơn retention duration (mặc định là 7 ngày). Setting về retention duration có thể overide bằng cácc

spark.sql("SET spark.databricks.delta.retentionDurationCheck.enabled=false")

Change Data Feed - Change Data Capture (CDC)

Cơ bản là Change Data Capture, on top of Delta lake, create by Databricks

CDF cho phép Databricks track row-level channge giữa các versionn của Delta table. Khi được enable trong một Delta table, runtime sẽ record sự thay đổi của mọi data được write vào table. Nó bao gồm record mới được insert / update / delete cùng với metadata biểu thị rằng cái record đó được insert, update hay delete.

CREATE TABLE <table_name> ()
USING DELTA
PARTITION BY (col)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
LOCATION 'path/to/table'

Databricks khuyến khích nên sử dụng CDF với Structured Streaming để incrementally process thay đổi từ các Delta table.

def read_cdc_by_table_name(starting_version):
    return spark.read.format("delta") \
            .option("readChangeFeed", "true") \
        .option("startingVersion", str(starting_version)) \
        .table("<table_name>") \
        .orderBy("_change_type", "id")


def stream_cdc_by_table_name(starting_version):
    return spark.readStream.format("delta") \
            .option("readChangeFeed", "true") \
            .option("startingVersion", str(starting_version)) \
            .table("<table_name>") \
        .writeStream \
            .format("console") \
            .option("numRows", 1000) \
            .start()

Lời kết

Ngoài thằng này ra còn các định dạng Lakehouse khác như Hudi, Iceberg, XTable (thằng này đang phát triển)...

Reference


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí