Tất tật về Threading trong Python
Xin chào, mình thích viết ngắn và đi thẳng vấn đề 😴. Ở đây có tất cả những thứ bạn nên biết về Python threading.
1. Threading để làm gì?
Để xử lý đồng thời!
Một cách mặc định, Python sẽ xử lý từ trên xuống. Do đó, tại một thời điểm chỉ có một đoạn mã được thực thi. Lấy ví dụ đoạn code sau:
def process_data(name: str, count: int):
print(f"Starting {name}...")
for i in range(count):
print(name, i+1, sep=": ")
if __name__=="__main__":
process_data("Thread-1", 5)
process_data("Thread-2", 10)
Ở đây, ta muốn chạy hàm process_data
2 lần, mỗi lần truyền vào một tham số khác nhau. Python sẽ thực thi từ trên xuống, tức là chạy hàm này với bộ tham số ("Thread-1", 5)
trước, sau khi thực thi xong thì bắt đầu chạy hàm này với bộ tham số ("Thread-2", 10)
. Kết quả trả ra sẽ là:
Starting Thread-1...
Thread-1: 1
Thread-1: 2
Thread-1: 3
Thread-1: 4
Thread-1: 5
Starting Thread-2...
Thread-2: 1
Thread-2: 2
Thread-2: 3
Thread-2: 4
Thread-2: 5
Thread-2: 6
Thread-2: 7
Thread-2: 8
Thread-2: 9
Thread-2: 10
Vậy nếu ta muốn chạy đồng thời hàm process_data
với 2 bộ tham số trên cùng 1 lúc thì sao? Bởi rõ ràng việc chờ nhau để thực thi một cách tuần tự ở đây là không cần thiết. Threading sẽ giúp chúng ta xử lý vấn đề này.
2. Threads
Để có thể tạo nhiều threads, ta sẽ sử dụng thư viện threading
, 1 thư viện built-in của Python. Dưới đây là đoạn code để giúp hàm process_data
có thể chạy đồng thời trong 2 threads khác nhau với thư viện này:
import time
import threading
def process_data(name: str, count: int):
print(f"Starting {name}...")
for i in range(count):
print(name, i+1, sep=": ")
time.sleep(1)
if __name__=="__main__":
thread_one = threading.Thread(target=process_data, args=("Thread-1", 10))
thread_two = threading.Thread(target=process_data, args=("Thread-2", 5))
thread_one.start()
time.sleep(3)
thread_two.start()
thread_one.join()
thread_two.join()
- Đoạn code
threading.Thread()
nhằm tạo ra một thead mới (=asyncio.create_task
khi xử lý bất đồng bộ vớiasyncio
). Nhớ rằng ở đây ta mới tạo ra chứ chưa chạy. - Thread sẽ thực sự chạy khi gọi
thread_one.start()
. thread_one.join()
, việc gọi hàm này giúp cho chương trình chờ cho tới khi 2 threads được thực thi xong thì mới thực hiện đoạn code bên dưới (nếu có), =await
khi xử lý bất đồng bộ (mình sẽ sớm ra bài về bất đồng bộ nha 😁).
Khithread_one
đượcstart()
với tham số("Thread-1", 10)
, nó sẽ mất tổng cộng 10s để chạy xong (do với mỗi giá trị của biếncount
thì sleep 1s). Nhìn vào đoạn chương trình ở hàmmain
, ta thấy sau khi startthread_one
sẽ chỉ chờ nó 3s, sau đó startthread_two
nên chắc mọi người cũng dễ dàng đoán được làthread_one
mới chạy được 3s (mới in ra được đến số 3 (trên tổng cộng count=10 số cần in) thìthread_two
chen vào, chiếm quyền in. Lúc này 2 thread sẽ cùng đồng thời in các giá trị ra màn hình. Kết quả như sau:
Starting Thread-1...
Thread-1: 1
Thread-1: 2
Thread-1: 3
Starting Thread-2...
Thread-2: 1
Thread-1: 4
Thread-2: 2
Thread-1: 5
Thread-2: 3
Thread-1: 6
Thread-2: 4
Thread-1: 7
Thread-2: 5
Thread-1: 8
Thread-1: 9
Thread-1: 10
Tèn ten, đúng như kỳ vọng!
3. Khác gì bất đồng bộ?
Bất đồng bộ (asynchronous) cho phép xử lý nhiều task trên cùng một thread, threading cho phép đẻ ra nhiều thread khác nhau để chạy song song.
Thông thường, thì khi 1 task nặng đang chạy sẽ block task khác, do đó async sẽ dùng 1 cơ chế (là event loop, nếu bạn không hiểu thì kệ nó đi, haha) để cho phép 1 task đang thực thi nhưng chương trình vẫn có thể nhận task khác về để thực thi (cho phép nhận thui nhá, còn vẫn phải chờ task trước xong đã, 1 thread mà). Còn threading là cho phép xử lý song song đồng thời luôn.
4. Lock
Hơi ngược đời chút, làm cách nào để chờ 1 thread chạy xong thì mới cho thread khác chạy? (Dù mục đích thread là ngăn điều này 😅). Câu trả lời là dùng lock (threading.Lock
).
Tuy nhiên trước khi nghiên cứu lock, ta thấy cũng có thể dùng thread.join()
để chờ 1 thread thực thi xong như trên mình có nói.
VD đoạn code sau:
import time
import threading
def counter(limit: int, name: str):
for i in range(limit):
time.sleep(0.5)
print(name, i+1, sep=": ")
def task1():
counter(5, "T-1")
def task2():
counter(5, "T-2")
def main():
thread1 = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread1.start()
thread1.join()
thread2.start()
thread2.join()
if __name__=="__main__":
main()
Sau khi thread1
được start
, ta dùng thread1.join()
để chờ cho thread1
này thực thi xong thì mới thread2.start()
.
Do đó, kết quả là:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5
Great! Tuy nhiên, nhiều người lại không thích dùng join()
(vì không có join()
thì code nó vẫn chạy 😎). Lúc này ta sẽ dùng lock
như đoạn code dưới:
import time
import threading
lock = threading.Lock()
def counter(limit: int, name: str):
for i in range(limit):
time.sleep(0.5)
print(name, i+1, sep=": ")
def task1():
lock.acquire()
counter(5, "T-1")
lock.release()
def task2():
lock.acquire()
counter(5, "T-2")
lock.release()
def main():
thread = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread.start()
thread2.start()
if __name__=="__main__":
main()
- Trong hàm
task1
,lock.acquire()
sẽ nói rằng: ê, chỉ chạy thread nào giữ lock này thôi nhá, bao giờ lock này đượcrelease()
thì mới đến thread khác. - Trong hàm
task2
cũng làacquire()
lock nhưng sẽ khác với lock củatask1
(mỗi lầnlock.acquire()
sẽ có 1 lock khác nhau, chỉ áp dụng với thread đó cho tới khi đượcrelease
thì không còn nữa). - Lưu ý là ở đây task nào cũng phải
acquire
1 lock thì nó mới bị lock nha, chứ nếu chỉacquire
ởtask1
và bỏ ởtask2
thìtask2
sẽ không bị lock. Do cơ chế củalock
thôi:lock
sẽ đảm bảo tại một thời điểm chỉ có 1 thread được giữ lock và chạy, do đó nó chỉ quản lý được các thread được gán lock, chứ nếu không gán lock thì nó cũng chịu.
Yes, và đây là kết quả, giống hệt dùngjoin()
dù không dùngjoin()
😳:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5
Thế nếu ta tạo 1 thread mới trong chính thread2
thì điều gì sẽ xảy ra? Xét đoạn code:
import time
import threading
lock = threading.Lock()
def counter(limit: int, name: str):
for i in range(limit):
time.sleep(0.5)
print(name, i+1, sep=": ")
def task1():
lock.acquire()
counter(5, "T-1")
lock.release()
def task2():
lock.acquire()
thread3 = threading.Thread(target=task3)
thread3.start()
counter(5, "T-2")
lock.release()
def task3():
counter(5, "T-3")
def main():
thread = threading.Thread(target=task1)
thread2 = threading.Thread(target=task2)
thread.start()
thread2.start()
if __name__=="__main__":
main()
thread3
(chạytask3
) được khai báo vàstart()
trong chínhtask2
.- Khi này,
task2
vàtask3
sẽ có thể xảy ra đồng thời, và cùng giữ lock, trong khitask1
thì phải độc lập. Hãy xem kết quả là hiểu ngay:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-3: 1
T-2: 1
T-3: 2
T-2: 2
T-3: 3
T-2: 3
T-3: 4
T-2: 4
T-3: 5
T-2: 5
OK, khó hơn chút này, đoạn code dưới đây tạo ra một thread
có acquire
lock (là thread2
) trong 1 thread acquire
lock (là thread1
). Hãy đoán xem điều gì sẽ xảy ra:
import time
import threading
lock = threading.Lock()
def counter(limit: int, name: str):
for i in range(limit):
time.sleep(0.5)
print(name, i+1, sep=": ")
def task1():
lock.acquire()
thread2 = threading.Thread(target=task2)
thread2.start()
counter(5, "T-1")
lock.release()
def task2():
lock.acquire()
counter(5, "T-2")
lock.release()
def main():
thread1 = threading.Thread(target=task1)
thread1.start()
if __name__=="__main__":
main()
Hãy xem kết quả nhé:
T-1: 1
T-1: 2
T-1: 3
T-1: 4
T-1: 5
T-2: 1
T-2: 2
T-2: 3
T-2: 4
T-2: 5
Đơn giản thôi, thread2
có acquire
lock trong thread1
nhưng mà lock đang bị thread1
giữ, chưa được release
nên nó phải chờ cho thread1
thực thi cho tới khi nó release
lock, rồi nó mới có lock để thực thi.
=> Từ đây có thể đoán được là nếu bỏ lock.release()
trong task1
thì task2
sẽ không bao giờ có cơ hội để thực thi và chương trình sẽ phải đợi mãi mãi (hãy thử nhé!).
5. Daemon Thread
Là loại thread có quyền ưu tiên thấp. Nó chỉ được giữ để thực thi khi có ít nhất 1 thread đang sống, còn không thì: bạn bè chết hết rồi, nó sẽ chết theo.
Hãy xem đoạn code tạo daemon thread dưới:
import threading
import time
def infinite_loop():
while True:
print(time.time())
time.sleep(1)
if __name__=="__main__":
thread = threading.Thread(target=infinite_loop, daemon=True)
thread.start()
Mặc định khi threading.Thread()
, ta tạo ra 1 thread có quyền ưu tiên cao (daemon=False
), nên nếu muốn tạo daemon thì ta chỉ định như đoạn code trên.
- Nếu không có
daemon=True
, hàminfinite_loop
sẽ in ra thời gian sau mỗi 1s và không bao giờ dừng. - Còn với daemon thread như trên nó chỉ in ra 1 lần, khi thread chạy hàm main exit thì nó cũng... nghỉ luôn. Kết quả đây:
1691860930.767461
6. Semaphores
Với lock
, ta khiến chương trình chờ 1 thread chạy xong trước khi sang thread khác, còn nếu muốn chờ nhiều (n) threads chạy đồng thời xong, trước khi chạy sang n threads kế tiếp thì sao? => Ta có Semaphores.
Đoạn code dưới đây tạo ra 1 Semaphores cho phép 5 threads chạy đồng thời cùng lúc:
import time
import threading
sem = threading.Semaphore(5)
def process_something(id: int):
sem.acquire()
print(f"{id} -> Running!")
time.sleep(5)
print(f"{id} -> Finished!")
sem.release()
if __name__=="__main__":
for i in range(10):
time.sleep(0.5)
thread = threading.Thread(target=process_something, kwargs={"id": i+1})
thread.start()
Rất giống cấu trúc acquire-release
của lock
đúng không. Khi có bất kỳ thread nào chạy xong thì thread đang chờ sẽ được vào chạy luôn. Do đó, kết quả sẽ là:
1 -> Running!
2 -> Running!
3 -> Running!
4 -> Running!
5 -> Running!
1 -> Finished!
6 -> Running!
2 -> Finished!
7 -> Running!
3 -> Finished!
8 -> Running!
4 -> Finished!
9 -> Running!
5 -> Finished!
10 -> Running!
6 -> Finished!
7 -> Finished!
8 -> Finished!
9 -> Finished!
10 -> Finished!
Lúc đầu, 5 threads được chạy đồng thời. Sau đó có 1 threads chạy xong (thread 1), lập tức thread tiếp theo được vào (thread 6),...
7. Context manager cho Lock/Semaphores
Để tránh quên release
mỗi lần acquire
, dẫn đến chương trình không thể thoát được, ta có thể sử dụng context manager với từ khóa with
. Theo đó, việc release
sẽ được tự động thực thi chừng nào đoạn code trong with
được chạy xong.
Ví dụ đoạn code Semaphores phía trên có thể được viết lại như sau:
import time
import threading
sem = threading.Semaphore(5)
def process_something(id: int):
with sem:
print(f"{id} -> Running!")
time.sleep(5)
print(f"{id} -> Finished!")
if __name__=="__main__":
for i in range(10):
time.sleep(0.5)
thread = threading.Thread(target=process_something, kwargs={"id": i+1})
thread.start()
Kết quả trả về sẽ không có gì thay đổi với việc dùng acquire-release
.
8. Race condition
Khi có nhiều thread cùng thay đổi vào 1 biến global, kết quả cuối cùng của biến đó sẽ không xác định được, đây chính là race condition. Xét VD sau:
import threading
import time
lock = threading.Lock()
def edit(operation: str, amount: int, repeat: int):
global value
for _ in range(repeat):
temp: int = value
time.sleep(0)
if operation == "add":
temp += amount
elif operation == "subtract":
temp -= amount
time.sleep(0)
value = temp
if __name__=="__main__":
value: int = 0
adder = threading.Thread(target=edit, args=("add", 100, 1_000_000))
subtractor = threading.Thread(target=edit, args=("subtract", 100, 1_000_000))
adder.start()
subtractor.start()
adder.join()
subtractor.join()
print(f"{value = }")
Kết quả mong muốn là 0 do ta cộng vào biến value
1,000,000 lần giá trị 100 sau đó lại trừ đi 1,000,000 lần giá trị 100.
Tuy nhiên, khi chạy lại cho ra giá trị không bằng 0. Thậm chí, các lần chạy khác nhau cho ra giá trị không giống nhau.
Điều này diễn ra là do 2 threads đang truy cập song song vào biến value
, đoạn code sleep
khiến có thể một phép cộng chưa thực thi xong, giá trị value
chưa được update thì phép trừ đã được thực thi. Do đó, phép trừ khi này phải thực thi trên giá trị chưa cập nhật.
Để giải quyết vấn đề này, chúng ta sẽ dùng lock
:
import threading
import time
lock = threading.Lock()
def edit(operation: str, amount: int, repeat: int):
global value
with lock:
for _ in range(repeat):
temp: int = value
time.sleep(0)
if operation == "add":
temp += amount
elif operation == "subtract":
temp -= amount
time.sleep(0)
value = temp
if __name__=="__main__":
value: int = 0
adder = threading.Thread(target=edit, args=("add", 100, 1_000_000))
subtractor = threading.Thread(target=edit, args=("subtract", 100, 1_000_000))
adder.start()
subtractor.start()
adder.join()
subtractor.join()
print(f"{value = }")
Kết quả trả về sẽ luôn bằng 0 như kỳ vọng.
Cảm ơn các bạn đã đọc hết bài này ☺️
Tham khảo:
- The Complete Guide To Mastering Python In 2023 - Udemy
All rights reserved