+3

Dựng Airflow trong 10 phút :v

Có thể là bạn đang làm AI nhưng bị bắt kiêm cả phần load dữ liệu. Có thể là sếp bạn thách thức khả năng của bạn làm bạn cay cú. Lý do là gì không quan trọng, quan trọng là bạn cần dựng Airflow ngay và luôn. Vậy thì mình bắt đầu thôi nào.

I. Airflow để làm cái gì?

Mình tìm thấy rất nhiều khái niệm trên mạng, nên mình sẽ giải thích theo trải nghiệm cá nhân. Airflow nó như một cái lên lịch task, có giao diện khá gọn gàng và hỗ trợ chạy song song các task. Cái này rất hữu ích nếu bạn cần chạy code hàng ngày, xem lại code, xem xem code chạy thành công hay lỗi, và xem được cả log của lỗi nữa.
image.png image.png

II. Cài Airflow thôi

Nếu được thì hãy dùng Ubuntu nha, Windows nó cứ hiện lên mấy cái warning phiền ghê á.

Bước 1: Cài Docker

Ai cài rồi thì tốt. Còn ai chưa cài thì làm ơn cài đi nhanh lắm hứa. Chỉ cần cài hai cái này thôi:

  1. Docker engine https://docs.docker.com/engine/install/
  2. (Nếu từ bước trên bạn cài docker desktop rồi thì skip bước này nha) Docker compose : https://docs.docker.com/compose/install/

Tại sao lại chọn Docker để cài airflow? Nó siêu nhanh, đỡ setup nhiều, và đỡ bị lẫn giữa các dự án với nhau. Kể cả không có nhiều dự án, nếu cài theo phương pháp bình thường thì sau này bạn cũng cần phải setup Celery với Redis để chạy nhiều worker, hơi phiền. Build docker lo hết cho bạn rồi.

Bước 2: Lấy file docker-compose

Mở 1 folder trên máy tính của bạn để cho Airflow vào đó trên terminal, rồi gõ:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.9.1/docker-compose.yaml'

Nếu như lỗi mạng thì bạn chỉ cần tự tạo file docker-compose.yaml trong folder của bạn rồi lấy nội dung trong đường link paste vào thôi.

Bước 3: Vài bước setup cơ bản

Bạn vẫn đang mở folder bên trên ở terminal đúng không? Tốt. Gõ cái này vào:

Linux:

mkdir -p ./dags ./logs ./plugins ./config # Cái này tạo vài folder rỗng
echo -e "AIRFLOW_UID=$(id -u)" > .env #Cái này để cài user id, hoặc bạn tạo file .env bằng tay rồi điền AIRFLOW_UID=50000 vào đấy, lấy bừa số xổ số miền bắc hay gì cũng được

Windows:

Bạn tạo 4 folder dags, logs, plugins, config, xong tạo file .env rồi điền AIRFLOW_UID=50000 vào đấy

Bước 4: Dựng nó lên

Chúng ta dựng Airflow, tích hợp database, tạo user đầu tiên bằng một câu lệnh cơ bản:

docker compose up (Hoặc là docker compose up -d nếu bạn muốn chạy dưới nền)

Bạn ngồi đợi code chạy 5 phút. Dùng ubuntu chỉ cần đợi nhưng dùng windows sẽ có lúc tường lửa nó hiện lên bảo đồng ý đi, thì bạn cứ đồng ý trước rồi tính sau 😃).

Sẽ có những lúc bạn tưởng code chạy xong rồi, nhưng không, nó nghỉ giải lao chút thôi. Khi nào thấy trên terminal hiện lên là nó đang chạy webserver trên cổng 8080 thì mới oke nè

image.png

Không liên quan nhưng giờ bạn có một tài khoản là airflow với pass là airflow sau khi chạy dòng code trên.

Bước 5: Cho tôi xin một chút giao diện

Tôi thấy tương tác với airflow tiện nhất là khi thiết kế code ở dưới file python rồi nhìn nó chạy trên web interface. Thế thì mình lên xem web interface trước nha. Các bạn vào localhost:8080 trên web để xem

image.png

Wow, đẹp dữ, airflow thật là tốt bụng khi cho chúng ta thật nhiều ví dụ. Bạn cứ bấm vào xem người ta thiết kế luồng chạy code để tham khảo nha. Còn tôi thì thấy nó đẹp nhất khi chỉ có code của tôi, nên tôi sẽ tắt nó đi. Mấy ông bà xem ở cuối bài tôi chỉ cho cách tắt.

Bước 6: Thiết kế DAG

DAG nó là cái đồ thị thiết kế luồng chạy code, kiểu này này image.png Thiết kế DAG thì dùng cách mới nhất là dùng Taskflow API nha. Cách cũ vẫn dùng được nhưng cách mới ngầu hơn. Đây là code minh họa cho cái DAG trên

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
from airflow.decorators import dag, task

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
}

@dag(
    dag_id='example_dag_decorator',
    default_args=default_args,
    description='An example DAG using @dag decorator',
    schedule_interval=None,
    start_date=days_ago(1),
    tags=['example'],
)
def example_dag():

    start = DummyOperator(
        task_id='start',
    )
    
    @task(task_id='op_1')
    def op_1():
        print("op_1 is running")

    @task(task_id='op_2')
    def op_2():
        print("op_2 is running")

    @task(task_id='some_other_task')
    def some_other_task():
        print("some_other_task is running")

    @task(task_id='op_3')
    def op_3():
        print("op_3 is running")

    @task(task_id='op_4')
    def op_4():
        print("op_4 is running")

    end = DummyOperator(
        task_id='end',
    )

    start >> [op_1(), op_2()] >> some_other_task() >> [op_3(), op_4()] >> end

dag_instance = example_dag()

Xong. Có gì khó hiểu thì anh em comment nương tay nhé ạ 🥺👉️👈️


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí