Cứ thực hành Airflow dễ hiểu và đơn giản đã, chưa làm gì phức tạp cả
Mở đầu
Tiếp nối bài viết chỉ toàn lý thuyết Bài viết về Airflow cho người mới như mình thì chúng ta đi ngay tới bài thực hành này thôi
Chú ý là mình sẽ thực hành cùng với Python nhé ạ, bài viết sẽ gồm 2 phần thực hành chính
- Thực hành với các tác vụ đơn giản
- Thực hành với bài toán đào tạo mô hình Deep Learning
Cài đặt môi trường
- Trong bài viết của anh Hoàng, có đề cập tới việc setup nhanh chóng với docker-compose, các bạn có thể tham khảo nhé
- Mình hướng tới một cái gì đó chân chất, dễ hiểu, dễ tiếp cận cho người mới, ít động vào nhiều cái liên quan nên mình sẽ setup tay =)))
Chuẩn bị trước
- Python: 3.7, 3.8, 3.9, 3.10
- Minimum memory: 4 gb
Cài đặt Airflow bằng pip
- Cài đặt các dependencies của Linux:
sudo apt-get install libmysqlclient-dev
sudo apt-get install libssl-dev
sudo apt-get install libkrb5-dev
- Setup đường dẫn tới Airflow
export AIRFLOW_HOME=~/airflow
- Assign 3 biến environment: AIRFLOW_VERSION, PYTHON_VERSION và CONSTRAINT_URL
export AIRFLOW_VERSION=2.3.3
export PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
export CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
- Khi bạn assign một biến environment bằng lệnh export, biến này chỉ tồn tại trong phiên làm việc hiện tại của terminal. Để biến environment này tồn tại trong các phiên làm việc khác, bạn cần thêm lệnh export vào tệp ~/.bashrc
- Nhớ lưu lại các thay đổi
source .bashrc
- Cài đặt Airflow bằng pip
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
- Khởi chạy các components của airflow
airflow webserver -p 8080
airflow scheduler
Các bạn mở trình duyệt lên, vào localhost:8080
để xem kết quả nhé
Thử với một tác vụ cơ bản nào
Cứ đi từ dễ nhất trước nhé, để chúng ta hiểu hơn về cấu trúc một file DAG. Thử một chương trình Hello world xem sao. Chúng ta sẽ cùng tạo 1 file my_dag.py
có nội dung như sau
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
from pytz import timezone
import os
local_tz = timezone('Asia/Ho_Chi_Minh')
# Định nghĩa các hàm xử lý dữ liệu
def process_data():
print('process data')
def save_data():
print('save data')
print()
# Định nghĩa DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 6, 7, tzinfo=local_tz),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='my_dag',
default_args=default_args,
description='A simple DAG',
schedule_interval='* * * * *'
)
# Định nghĩa các Task
task1 = BashOperator(
task_id='task1',
bash_command='echo "Task 1"',
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=process_data,
dag=dag,
)
task3 = PythonOperator(
task_id='task3',
python_callable=save_data,
dag=dag,
)
# Thiết lập phụ thuộc giữa các Task
task1 >> task2 >> task3
if __name__ == "__main__":
dag.cli()
Trước khi giải thích các phần trong mã nguồn trên, sẽ có một số lưu ý như sau:
- Tên file mã nguồn cần trung với
dag_id
- File mã nguồn cần được lưu trong thư mục
dags
của airflow, mặc định sẽ làAIRFLOW_HOME/dags
- Mỗi khi tạo 1 file DAG và đưa vào thư mục
dags
cần khởi động lại airflow webserver
Chúng ta sẽ cùng phân tích từng thành phần có trong file mã nguồn trên
- ĐỊnh nghĩa DAG: đây là phần chúng ta sẽ định danh 1 số thông tin như dag_id để phân biệt các DAGs, người sở hữu, có phụ thuộc vào DAG nào đó hay không, ngày bắt đầu chạy, số lần thử lại nếu lỗi, thời gian thử lại nếu lỗi, mô tả DAG, thời gian tự lặp lại chạy DAG, ...
# Định nghĩa DAG
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 6, 7, tzinfo=local_tz),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='my_dag',
default_args=default_args,
description='A simple DAG',
schedule_interval='* * * * *'
)
- Định nghĩa các Tasks có trong DAG: Tùy vào các loại Task, chúng ta sẽ lựa chọn Operator hoặc Sensor sao cho phù hợp. MÌnh có giới thiệu các Operator và Sensor ở bài 1, các bạn có thể xem lại. Ví dụ mình dùng BashOperator, và lệnh thực thi của mình là
echo "Task 1"
. Thông tin tối thiểu cần định nghĩa cho Task đó là task_id, dag và thực thi cái gì
# Định nghĩa các Task
task1 = BashOperator(
task_id='task1',
bash_command='echo "Task 1"',
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=process_data,
dag=dag,
)
task3 = PythonOperator(
task_id='task3',
python_callable=save_data,
dag=dag,
)
-
Thiết lập phụ thuộc giữa các Tasks: Dưới đây là những kiểu phụ thuộc thông dụng và ký hiệu tương ứng:
- >>: Tác vụ bên trái phải được thực thi trước khi tác vụ bên phải được thực thi. Đây là kiểu phụ thuộc mặc định giữa các tác vụ trong Airflow.
- <<: Tác vụ bên phải phải được thực thi trước khi tác vụ bên trái được thực thi.
- >> và <<: Cả hai tác vụ phải được thực thi trước khi tác vụ được yêu cầu được thực thi.
- >> với trigger_rule='all_success': Tất cả các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
- >> với trigger_rule='one_success': Một trong các tác vụ phải hoàn thành thành công trước khi tác vụ được yêu cầu được thực thi.
- >> với trigger_rule='all_failed': Tất cả các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
- >> với trigger_rule='one_failed': Một trong các tác vụ phải hoàn thành với lỗi trước khi tác vụ được yêu cầu được thực thi.
- Ví dụ về trigger_rule:
t1 >> t2 >> t3 t3.trigger_rule = 'all_failed'
-
Chạy DAG: Bạn có thể sử dụng
dag.run()
để chạy DAG của bạn trong một ứng dụng Python hoặc sử dụngdag.cli()
để chạy các lệnh dòng lệnh Airflow từ DAG của bạn.
Vậy chúng ta cùng xem kết quả nào. Trước hêt nhớ những cái mình vừa note bên trên nhé, sau đó chúng ta sẽ thấy DAG của mình đã xuất hiện trên Webserver
Chúng ta sẽ chạy thử DAG của mình bằng nút gạt bên trái. Hiện tại code mình đang cho nó 1 phút chạy 1 lần và mỗi task sẽ in ra một dòng chữ. Trạng thái và số lượng những task theo từng trạng thái được hiển thị trong các ô tròn, các bạn trỏ chuột vào đó sẽ hiện ra những cái hint notes
Những thông tin in ra sẽ được lưu trữ trong log, mặc định ở thư mục AIRFLOW_HOME/logs
Xuất phát từ dân AI thì thử với Deep learning xem sao
Về nội dung phần này có lẽ mình sẽ nói chung chung một chút. Mình đặt ra một quy trình đơn giản trong việc thiết kế và xây dựng mô hình học máy như hình vẽ trên, trong đó chúng ta giả sử lựa chọn ra 3 kiến trúc mô hình phù hợp cho bài toán của mình, muốn tự động hóa việc training, lựa chọn mô hình có độ chính xác tốt nhất, sau đó đánh giá lại xem nó có hiệu quả hay không
Mình sẽ không trình bày các code liên quan đến training, evaluate quá sâu trong bài viết này
Khởi tạo DAG
Một công đoạn không thể thiếu khi chúng ta triển khai AirFlow
dag = DAG("my_dag", # Dag id
start_date=datetime(2023, 1 ,1), # start date, the 1st of January 2021
schedule_interval='@daily', # Cron expression, here it is a preset of Airflow, @daily means once every day.
description='A simple ML flow with DAG',
Training mô hình
Ở đây chúng ta sẽ có 3 task đại diện cho việc training 3 môn hình
def _training_model():
return random.randint(0, 10)
# Tasks are implemented under the dag object
training_model_A = PythonOperator(
task_id="training_model_A",
python_callable=_training_model,
dag=dag
)
training_model_B = PythonOperator(
task_id="training_model_B",
python_callable=_training_model,
dag=dag
)
training_model_C = PythonOperator(
task_id="training_model_C",
python_callable=_training_model,
dag=dag
)
Kiểm tra xem có mô hình đạt yêu cầu hay không
def _choosing_best_model(ti):
accuracies = ti.xcom_pull(task_ids=[
'training_model_A',
'training_model_B',
'training_model_C'
])
if max(accuracies) > 8:
return 'accurate'
return 'inaccurate'
choosing_best_model = BranchPythonOperator(
task_id="choosing_best_model",
python_callable=_choosing_best_model,
dag=dag
)
Ở đây các bạn sẽ bắt gặp một từ khóa mới, đó là xcom_pull
. Các bạn có thể hiểu đơn giản như sau:
- XCOM (Cross-Communication Messages) là một cơ chế cho phép dữ liệu đổi giữa các tác vụ DAG
- Hàm
_choosing_best_model
được sử dụng để lấy thông tin về độ chính xác của 3 task training mô hình A, B, C, nếu 1 trong 3 độ chính xác này đạt một ngưỡng nào đó (ở đây việc training mình cho trả về random 1 giá trị từ 0 đến 10) thì trả về "đạt"
Hậu xử lý khi tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng
Sau khi training 3 mô hình và xảy ra 2 trường hợp là tối thiểu 1 trong 3 mô hình đã đạt kỳ vọng hoặc không có mô hình nào đạt, thì cần có những hành động tiếp theo. Dưới đây mình có ví dụ dễ hiểu 1 chút
accurate = BashOperator(
task_id="accurate",
bash_command="echo 'Prediction'"
)
inaccurate = BashOperator(
task_id="inaccurate",
bash_command=" echo 'Retraining'"
)
Xác định phụ thuộc cho các task
training_model_tasks = [
PythonOperator(
task_id=f"training_model_{model_id}",
python_callable=_training_model,
op_kwargs={
"model": model_id
}
) for model_id in ['A', 'B', 'C']
]
choosing_best_model = BranchPythonOperator(
task_id="choosing_best_model",
python_callable=_choosing_best_model
)
training_model_tasks >> choosing_best_model >> [accurate, inaccurate]
Tổng kết
Trên đây mình có giới thiệu qua một chút về việc thực hành với Airflow một cách đơn giản để hiểu quy tình cũng như các bước cơ bản khi làm việc với nó. Thời gian tới khi có dịp làm việc với Airflow nhiều hơn, mình sẽ chia sẻ thêm với các bạn. Cảm ơn mọi người đã đọc đến những dòng cuối này ^^
Tài liệu tham khảo
All rights reserved