+8

Tất tật về Threading trong Python

Xin chào, mình thích viết ngắn và đi thẳng vấn đề 😴. Ở đây có tất cả những thứ bạn nên biết về Python threading.

1. Threading để làm gì?

Để xử lý đồng thời!
Một cách mặc định, Python sẽ xử lý từ trên xuống. Do đó, tại một thời điểm chỉ có một đoạn mã được thực thi. Lấy ví dụ đoạn code sau:

def process_data(name: str, count: int):
    print(f"Starting {name}...")

    for i in range(count):
        print(name, i+1, sep=": ")

if __name__=="__main__":
    process_data("Thread-1", 5)
    process_data("Thread-2", 10)

Ở đây, ta muốn chạy hàm process_data 2 lần, mỗi lần truyền vào một tham số khác nhau. Python sẽ thực thi từ trên xuống, tức là chạy hàm này với bộ tham số ("Thread-1", 5) trước, sau khi thực thi xong thì bắt đầu chạy hàm này với bộ tham số ("Thread-2", 10). Kết quả trả ra sẽ là:

Starting Thread-1...
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-1: 5
Starting Thread-2...
Thread-2: 1
Thread-2: 2
Thread-2: 3
Thread-2: 4
Thread-2: 5
Thread-2: 6
Thread-2: 7
Thread-2: 8
Thread-2: 9
Thread-2: 10

Vậy nếu ta muốn chạy đồng thời hàm process_data với 2 bộ tham số trên cùng 1 lúc thì sao? Bởi rõ ràng việc chờ nhau để thực thi một cách tuần tự ở đây là không cần thiết. Threading sẽ giúp chúng ta xử lý vấn đề này.

2. Threads

Để có thể tạo nhiều threads, ta sẽ sử dụng thư viện threading, 1 thư viện built-in của Python. Dưới đây là đoạn code để giúp hàm process_data có thể chạy đồng thời trong 2 threads khác nhau với thư viện này:

import time
import threading

def process_data(name: str, count: int):
    print(f"Starting {name}...")

    for i in range(count):
        print(name, i+1, sep=": ")
        time.sleep(1)

if __name__=="__main__":
    thread_one = threading.Thread(target=process_data, args=("Thread-1", 10))
    thread_two = threading.Thread(target=process_data, args=("Thread-2", 5))
    
    thread_one.start()
    time.sleep(3)
    thread_two.start()

    thread_one.join()
    thread_two.join()

  • Đoạn code threading.Thread() nhằm tạo ra một thead mới (= asyncio.create_task khi xử lý bất đồng bộ với asyncio). Nhớ rằng ở đây ta mới tạo ra chứ chưa chạy.
  • Thread sẽ thực sự chạy khi gọi thread_one.start().
  • thread_one.join(), việc gọi hàm này giúp cho chương trình chờ cho tới khi 2 threads được thực thi xong thì mới thực hiện đoạn code bên dưới (nếu có), =await khi xử lý bất đồng bộ (mình sẽ sớm ra bài về bất đồng bộ nha 😁).

    Khi thread_one được start() với tham số ("Thread-1", 10), nó sẽ mất tổng cộng 10s để chạy xong (do với mỗi giá trị của biến count thì sleep 1s). Nhìn vào đoạn chương trình ở hàm main, ta thấy sau khi start thread_one sẽ chỉ chờ nó 3s, sau đó start thread_two nên chắc mọi người cũng dễ dàng đoán được là thread_one mới chạy được 3s (mới in ra được đến số 3 (trên tổng cộng count=10 số cần in) thì thread_two chen vào, chiếm quyền in. Lúc này 2 thread sẽ cùng đồng thời in các giá trị ra màn hình. Kết quả như sau:
Starting Thread-1...
Thread-1: 1
Thread-1: 2
Thread-1: 3
Starting Thread-2...
Thread-2: 1
Thread-1: 4
Thread-2: 2
Thread-1: 5
Thread-2: 3
Thread-1: 6
Thread-2: 4
Thread-1: 7
Thread-2: 5
Thread-1: 8
Thread-1: 9
Thread-1: 10

Tèn ten, đúng như kỳ vọng!

3. Khác gì bất đồng bộ?

Bất đồng bộ (asynchronous) cho phép xử lý nhiều task trên cùng một thread, threading cho phép đẻ ra nhiều thread khác nhau để chạy song song.
Thông thường, thì khi 1 task nặng đang chạy sẽ block task khác, do đó async sẽ dùng 1 cơ chế (là event loop, nếu bạn không hiểu thì kệ nó đi, haha) để cho phép 1 task đang thực thi nhưng chương trình vẫn có thể nhận task khác về để thực thi (cho phép nhận thui nhá, còn vẫn phải chờ task trước xong đã, 1 thread mà). Còn threading là cho phép xử lý song song đồng thời luôn.

4. Lock

Hơi ngược đời chút, làm cách nào để chờ 1 thread chạy xong thì mới cho thread khác chạy? (Dù mục đích thread là ngăn điều này 😅). Câu trả lời là dùng lock (threading.Lock).
Tuy nhiên trước khi nghiên cứu lock, ta thấy cũng có thể dùng thread.join() để chờ 1 thread thực thi xong như trên mình có nói. VD đoạn code sau:

import time
import threading

def counter(limit: int, name: str):
    for i in range(limit):
        time.sleep(0.5)
        print(name, i+1, sep=": ")

def task1():
    counter(5, "T-1")

def task2():
    counter(5, "T-2")

def main():
    thread1 = threading.Thread(target=task1)
    thread2 = threading.Thread(target=task2)

    thread1.start()
    thread1.join()
    thread2.start()
    thread2.join()

if __name__=="__main__":
    main()

Sau khi thread1 được start, ta dùng thread1.join() để chờ cho thread1 này thực thi xong thì mới thread2.start().
Do đó, kết quả là:

T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5

Great! Tuy nhiên, nhiều người lại không thích dùng join() (vì không có join() thì code nó vẫn chạy 😎). Lúc này ta sẽ dùng lock như đoạn code dưới:

import time
import threading

lock = threading.Lock()

def counter(limit: int, name: str):
    for i in range(limit):
        time.sleep(0.5)
        print(name, i+1, sep=": ")

def task1():
    lock.acquire()
    counter(5, "T-1")
    lock.release()

def task2():
    lock.acquire()
    counter(5, "T-2")
    lock.release()

def main():
    thread = threading.Thread(target=task1)
    thread2 = threading.Thread(target=task2)

    thread.start()
    thread2.start()

if __name__=="__main__":
    main()
  • Trong hàm task1, lock.acquire() sẽ nói rằng: ê, chỉ chạy thread nào giữ lock này thôi nhá, bao giờ lock này được release() thì mới đến thread khác.
  • Trong hàm task2 cũng là acquire() lock nhưng sẽ khác với lock của task1 (mỗi lần lock.acquire() sẽ có 1 lock khác nhau, chỉ áp dụng với thread đó cho tới khi được release thì không còn nữa).
  • Lưu ý là ở đây task nào cũng phải acquire 1 lock thì nó mới bị lock nha, chứ nếu chỉ acquiretask1 và bỏ ở task2 thì task2 sẽ không bị lock. Do cơ chế của lock thôi: lock sẽ đảm bảo tại một thời điểm chỉ có 1 thread được giữ lock và chạy, do đó nó chỉ quản lý được các thread được gán lock, chứ nếu không gán lock thì nó cũng chịu.
    Yes, và đây là kết quả, giống hệt dùng join() dù không dùng join() 😳:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5

Thế nếu ta tạo 1 thread mới trong chính thread2 thì điều gì sẽ xảy ra? Xét đoạn code:

import time
import threading

lock = threading.Lock()

def counter(limit: int, name: str):
    for i in range(limit):
        time.sleep(0.5)
        print(name, i+1, sep=": ")

def task1():
    lock.acquire()
    counter(5, "T-1")
    lock.release()

def task2():
    lock.acquire()
    thread3 = threading.Thread(target=task3)
    thread3.start()
    counter(5, "T-2")
    lock.release()

def task3():
    counter(5, "T-3")

def main():
    thread = threading.Thread(target=task1)
    thread2 = threading.Thread(target=task2)

    thread.start()
    thread2.start()

if __name__=="__main__":
    main()
  • thread3 (chạy task3) được khai báo và start() trong chính task2.
  • Khi này, task2task3 sẽ có thể xảy ra đồng thời, và cùng giữ lock, trong khi task1 thì phải độc lập. Hãy xem kết quả là hiểu ngay:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-3: 1
T-2: 1
T-3: 2
T-2: 2
T-3: 3
T-2: 3
T-3: 4
T-2: 4
T-3: 5
T-2: 5

OK, khó hơn chút này, đoạn code dưới đây tạo ra một threadacquire lock (là thread2) trong 1 thread acquire lock (là thread1). Hãy đoán xem điều gì sẽ xảy ra:

import time
import threading

lock = threading.Lock()

def counter(limit: int, name: str):
    for i in range(limit):
        time.sleep(0.5)
        print(name, i+1, sep=": ")

def task1():
    lock.acquire()
    thread2 = threading.Thread(target=task2)
    thread2.start()
    counter(5, "T-1")
    lock.release()

def task2():
    lock.acquire()
    counter(5, "T-2")
    lock.release()

def main():
    thread1 = threading.Thread(target=task1)
    thread1.start()

if __name__=="__main__":
    main()

Hãy xem kết quả nhé:

T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5

Đơn giản thôi, thread2acquire lock trong thread1 nhưng mà lock đang bị thread1 giữ, chưa được release nên nó phải chờ cho thread1 thực thi cho tới khi nó release lock, rồi nó mới có lock để thực thi. => Từ đây có thể đoán được là nếu bỏ lock.release() trong task1 thì task2 sẽ không bao giờ có cơ hội để thực thi và chương trình sẽ phải đợi mãi mãi (hãy thử nhé!).

5. Daemon Thread

Là loại thread có quyền ưu tiên thấp. Nó chỉ được giữ để thực thi khi có ít nhất 1 thread đang sống, còn không thì: bạn bè chết hết rồi, nó sẽ chết theo.
Hãy xem đoạn code tạo daemon thread dưới:

import threading
import time

def infinite_loop():
    while True:
        print(time.time())
        time.sleep(1)

if __name__=="__main__":
    thread = threading.Thread(target=infinite_loop, daemon=True)
    thread.start()

Mặc định khi threading.Thread(), ta tạo ra 1 thread có quyền ưu tiên cao (daemon=False), nên nếu muốn tạo daemon thì ta chỉ định như đoạn code trên.

  • Nếu không có daemon=True, hàm infinite_loop sẽ in ra thời gian sau mỗi 1s và không bao giờ dừng.
  • Còn với daemon thread như trên nó chỉ in ra 1 lần, khi thread chạy hàm main exit thì nó cũng... nghỉ luôn. Kết quả đây:
1691860930.767461

6. Semaphores

Với lock, ta khiến chương trình chờ 1 thread chạy xong trước khi sang thread khác, còn nếu muốn chờ nhiều (n) threads chạy đồng thời xong, trước khi chạy sang n threads kế tiếp thì sao? => Ta có Semaphores.
Đoạn code dưới đây tạo ra 1 Semaphores cho phép 5 threads chạy đồng thời cùng lúc:

import time
import threading

sem = threading.Semaphore(5)

def process_something(id: int):
    sem.acquire()
    print(f"{id} -> Running!")
    time.sleep(5)
    print(f"{id} -> Finished!")
    sem.release()

if __name__=="__main__":
    for i in range(10):
        time.sleep(0.5)
        thread = threading.Thread(target=process_something, kwargs={"id": i+1})
        thread.start()

Rất giống cấu trúc acquire-release của lock đúng không. Khi có bất kỳ thread nào chạy xong thì thread đang chờ sẽ được vào chạy luôn. Do đó, kết quả sẽ là:

1 -> Running!
2 -> Running!
3 -> Running!
4 -> Running!
5 -> Running!
1 -> Finished!
6 -> Running!
2 -> Finished!
7 -> Running!
3 -> Finished!
8 -> Running!
4 -> Finished!
9 -> Running!
5 -> Finished!
10 -> Running!
6 -> Finished!
7 -> Finished!
8 -> Finished!
9 -> Finished!
10 -> Finished!

Lúc đầu, 5 threads được chạy đồng thời. Sau đó có 1 threads chạy xong (thread 1), lập tức thread tiếp theo được vào (thread 6),...

7. Context manager cho Lock/Semaphores

Để tránh quên release mỗi lần acquire, dẫn đến chương trình không thể thoát được, ta có thể sử dụng context manager với từ khóa with. Theo đó, việc release sẽ được tự động thực thi chừng nào đoạn code trong with được chạy xong.
Ví dụ đoạn code Semaphores phía trên có thể được viết lại như sau:

import time
import threading

sem = threading.Semaphore(5)

def process_something(id: int):
    with sem:
        print(f"{id} -> Running!")
        time.sleep(5)
        print(f"{id} -> Finished!")

if __name__=="__main__":
    for i in range(10):
        time.sleep(0.5)
        thread = threading.Thread(target=process_something, kwargs={"id": i+1})
        thread.start()

Kết quả trả về sẽ không có gì thay đổi với việc dùng acquire-release.

8. Race condition

Khi có nhiều thread cùng thay đổi vào 1 biến global, kết quả cuối cùng của biến đó sẽ không xác định được, đây chính là race condition. Xét VD sau:

import threading
import time

lock = threading.Lock()

def edit(operation: str, amount: int, repeat: int):
    global value
    
    for _ in range(repeat):
        temp: int = value
        time.sleep(0)
        if operation == "add":
            temp += amount
        elif operation == "subtract":
            temp -= amount

            time.sleep(0)
            value = temp

if __name__=="__main__":
    value: int = 0

    adder = threading.Thread(target=edit, args=("add", 100, 1_000_000))
    subtractor = threading.Thread(target=edit, args=("subtract", 100, 1_000_000))

    adder.start()
    subtractor.start()

    adder.join()
    subtractor.join()
    print(f"{value = }")

Kết quả mong muốn là 0 do ta cộng vào biến value 1,000,000 lần giá trị 100 sau đó lại trừ đi 1,000,000 lần giá trị 100.
Tuy nhiên, khi chạy lại cho ra giá trị không bằng 0. Thậm chí, các lần chạy khác nhau cho ra giá trị không giống nhau.
Điều này diễn ra là do 2 threads đang truy cập song song vào biến value, đoạn code sleep khiến có thể một phép cộng chưa thực thi xong, giá trị value chưa được update thì phép trừ đã được thực thi. Do đó, phép trừ khi này phải thực thi trên giá trị chưa cập nhật.

Để giải quyết vấn đề này, chúng ta sẽ dùng lock:

import threading
import time

lock = threading.Lock()

def edit(operation: str, amount: int, repeat: int):
    global value

    with lock:
        for _ in range(repeat):
            temp: int = value
            time.sleep(0)
            if operation == "add":
                temp += amount
            elif operation == "subtract":
                temp -= amount

            time.sleep(0)
            value = temp

if __name__=="__main__":
    value: int = 0

    adder = threading.Thread(target=edit, args=("add", 100, 1_000_000))
    subtractor = threading.Thread(target=edit, args=("subtract", 100, 1_000_000))

    adder.start()
    subtractor.start()

    adder.join()
    subtractor.join()
    print(f"{value = }")

Kết quả trả về sẽ luôn bằng 0 như kỳ vọng.

Cảm ơn các bạn đã đọc hết bài này ☺️

Tham khảo:

  • The Complete Guide To Mastering Python In 2023 - Udemy

All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí