Tất tần tật về Multi-processing trong Python
Khác so với multi-threading, có thể chỉ chạy trên 1 process => hệ điều hành sẽ phải lập lịch để các tác vụ chạy đồng thời. Với multi-processing, các tác vụ sẽ chạy song song và hầu như cô lập hoàn toàn về tài nguyên máy tính (CPU, bộ nhớ,...). Đây là một khái niệm cơ bản và rất quan trọng trong lập trình máy tính.
Code mà mình giới thiệu bên dưới đôi khi sẽ sử dụng các hàm helper sau, mình copy lên đây trước để các bạn có thể chạy được nha:
file: time_stuff.py
from time import perf_counter
from functools import wraps
from datetime import datetime
def get_time(func):
@wraps(func)
def wrapper(*args, **kwargs):
s = perf_counter()
func(*args, **kwargs)
e = perf_counter()
print(f"Time: {e-s} seconds")
return wrapper
def timestamp() -> str:
return f"{datetime.now():'%H:%M:%S')}"
def kill_time():
for _ in range(10**8):
pass
1. Processes
- Nhắc lại: Chạy multi-threading => các thread dùng chung tài nguyên (CPU, bộ nhớ,...). Chạy multi-processing => các process tách biệt về tài nguyên (giả sử máy có 2 CPU thì 2 thread sẽ chạy song song. trên 2 CPU). => Multi-thread có thể chạy trên cùng 1 process.
- Ví dụ dưới tạo ra 2 process là
process1
vàprocess2
sử dụng thư việnmultiprocessing
. Giống với multi-threading, để các process này chạy, ta gọi tới methodstart()
. Để chờ 1 tiến trình cho đến khi việc thực thi hoàn tất ta gọi methodjoin()
. file: main.py
import multiprocessing as mp
from time_stuff import get_time, timestamp, kill_time
import os
def func(param):
print(f"Starting {mp.current_process().name} ({os.getpid()})... ({timestamp()})")
kill_time()
print(f"{os.getpid()} finished! ({timestamp()})")
@get_time
def main():
process1 = mp.Process(name="Process-1", target=func, args=("Sample1",))
process2 = mp.Process(name="Process-2", target=func, args=("Sample2",))
process1.start()
process2.start()
process1.join()
process2.join()
if __name__ == "__main__":
main()
Kết quả là:
Starting Process-2 (9038)... ('22:30:39'))
Starting Process-1 (9037)... ('22:30:39'))
9037 finished! ('22:30:40'))
9038 finished! ('22:30:40'))
Time: 1.1100460829911754 seconds
Nhận thấy:
- 2 process này được bắt đầu và kết thúc cùng một thời điểm, nhanh hơn multi-threading (hãy thử) => 2 process chạy song song trên tài nguyên khác nhau.
process1
kết thúc trướcprocess2
(có được là do methodjoin()
chờprocess1
fininshed trước khi choprocess2
finished) => nếu bỏjoin()
sẽ có xác xuấtprocess2
xong trướcprocess1
. Lưu ý:- Do tạo ra 1 process thì phải setup tài nguyên đọc lập => thời gian tạo process thường lâu hơn tạo thread => đối với những tác vụ nhẹ, cần cân nhắc có nên dùng process không vì có thể sẽ chậm hơn dùng thread hay chạy thông thường (không dùng thread/process).
2. Pool
- Pool giúp gom lại các task rồi phân bổ ra các process mà máy tính có để chạy song song.
2.1. Pool Map
- Xem xét ví dụ dưới đây, ta dùng thuộc tính
map
của pool:
import multiprocessing as mp
from time_stuff import get_time, kill_time
import time
def convert_to_x(number: int) -> str:
time.sleep(2)
return number * "x"
@get_time
def main():
print(f"Core available: {mp.cpu_count()}")
values: tuple[int, ...] = tuple(range(1, 9))
with mp.Pool() as pool:
results: list[str] = pool.map(convert_to_x, values)
print(f"Results: {results}")
if __name__ == "__main__":
main()
- Kết quả thu được:
Core available: 8
Results: ['x', 'xx', 'xxx', 'xxxx', 'xxxxx', 'xxxxxx', 'xxxxxxx', 'xxxxxxxx']
Time: 2.311342832981609 seconds
=> Máy mình đang có sẵn 8 CPU, pool giúp mình chạy hàm convert_to_x
song song trên 8 CPU đó => Tổng thời gian chỉ xấp xỉ 2s (để ý là hàm convert_to_x
sẽ sleep
2s) => chúng đã thực sự được chạy song song (thời gian tổng > 2s là do máy tính cần thời gian để khởi tạo lên các process).
- Vậy giả sử ta cần chạy 9 task thì sao? Sửa đoạn code thành
values: tuple[int, ...] = tuple(range(1, 10))
để tạo ra 9 tasks. Kết quả:
Core available: 8
Results: ['x', 'xx', 'xxx', 'xxxx', 'xxxxx', 'xxxxxx', 'xxxxxxx', 'xxxxxxxx', 'xxxxxxxxx']
Time: 4.298837750044186 seconds
=> Thời gian xấp xỉ 4s là do cùng 1 thời điểm, pool chỉ có thể phân bổ tối đa 8 tasks vào 8 CPU để chạy song song => task thứ 9 phải chờ để có 1 trong 8 task đó xong => Gấp đôi thời gian thực thi.
-
Đôi khi ta có 8 CPU nhưng chỉ muốn pool sử dụng tối đa 4 CPU 1 lúc thôi chẳng hạn. Thật đơn giản, chỉ cần truyền số lượng process khi khởi tạo Pool:
with mp.Pool(processes=4) as pool
=> Khi này, nếu chạy 8 tasks thì sẽ mất xấp xỉ 4s do ta quy định 1 lượt chỉ chạy được 4 tasks (mỗi task mất ~2s). -
Tóm lại, pool map nhận input là 1 tuple các giá trị (
values
) cần truyền vào hàm tính toán => nó lấy ra từng giá trị, truyền chính xác giá trị đó vào hàm => phân bổ vào các process để thực thi.
2.2. Pool Starmap
- Pool starmap cũng nhận input là tuple các giá trị (
values
), nhưng trước khi truyền vào hàm, nó sẽ unpack ra trước. - Xem ví dụ sau để hiểu rõ:
import multiprocessing as mp
from time_stuff import get_time
import time
def add_numbers(*args) -> float:
time.sleep(2)
return sum(args)
@get_time
def main():
print(f"Core available: {mp.cpu_count()}")
values = ((1, 2, 3), (3, 4), (5, 6), (7, 8, 10, 12))
with mp.Pool() as pool:
results: list[str] = pool.starmap(add_numbers, values)
print(f"Results: {results}")
if __name__ == "__main__":
main()
- Kết quả là:
Core available: 8
Results: [6, 7, 11, 37]
Time: 2.3035353750456125 seconds
- Nếu dùng
map
thay vìstarmap
thì sẽ có lỗi:
TypeError: unsupported operand type(s) for +: 'int' and 'tuple'
=> Do map truyền nguyên vẹn tuple (ví dụ (1, 2, 3)
) vào hàm add_numbers
=> args=((1, 2, 3),)
=> lúc này chương trình sẽ thực thi sum(((1, 2, 3),))
gây ra lỗi như trên.
=> starmap
giải quyết được điều này do nó unpack tuple ra => args=(1, 2, 3)
=> thực thi sum(1, 2, 3)
=> không gây ra lỗi.
2.3. Xử lý các hàm khác nhau
- Cũng chỉ dùng
map
thôi nhưng khéo léo một chút kết hợp vớifunctools.partital
thì hoàn toàn có thể truyền nhiều hàm vào các process khác nhau. functools.partitial
giúp biến hàm với số lượng tham số bất kỳ thành hàm không có tham số (hỗ trợ nạp trước tham số vào hàm).- Xem xét ví dụ dưới:
import multiprocessing as mp
from time_stuff import get_time
import functools
import time
def func_a(param):
time.sleep(2)
return param
def func_b(param):
time.sleep(2)
return param
def func_c(param, param2):
time.sleep(2)
return param
def map_func(func):
return func()
@get_time
def main():
print(f"Core available: {mp.cpu_count()}")
a = functools.partial(func_a, "A")
b = functools.partial(func_b, "B")
c = functools.partial(func_c, "C", "C2")
with mp.Pool() as pool:
results: list[str] = pool.map(map_func, [a, b, c])
print(f"Results: {results}")
if __name__ == "__main__":
main()
- Kết quả là:
Core available: 8
Results: ['A', 'B', 'C']
Time: 2.322280750002392 seconds
- Như vậy, các hàm
func_a
,func_b
vàfunc_c
với các chức năng khác nhau và input/output khác nhau hoàn toàn có thể truyền và thực thi trên các process khác nhau.
3. Giao tiếp giữa các process
3.1. Vấn đề về tài nguyên chung
- Do các process khác nhau có tài nguyên (bộ nhớ, CPU,...) độc lập nên khi lập trình nếu không để ý có thể gây ra kết quả không mong muốn.
- Xem xét ví dụ sau:
import multiprocessing as mp
numbers: list[int] = [0]
def func():
global numbers
numbers.extend([1, 2, 3])
print(f"Process data: {numbers}")
def main():
process = mp.Process(target=func)
process.start()
process.join()
print(f"Main data: {numbers}")
if __name__ == "__main__":
main()
- Kết quả là:
Process data: [0, 1, 2, 3]
Main data: [0]
=> Có thể thấy rằng, lúc tạo ra process mới mp.Process()
, nó sẽ có vùng nhớ riêng => khi dùng biến global numbers
, nó sẽ khai báo và sử dụng trên vùng nhớ của riêng nó chứ không phải biến đã khai báo trong main process. Do vậy, khi in ra ta được giá trị biến numbers
không giống nhau, dù cùng thực hiện phép .extend([1, 2, 3]
.
3.2. Pipe
- Thư viện
multiprocessing
cung cấp classPipe
giúp gửi dữ liệu giữa các process. Xem xét ví dụ sau về cáchPipe
hoạt động:
from multiprocessing import Pipe
def main():
c1, c2 = Pipe()
c2.send("Hello, World!")
print("Data to be received:", c1.poll())
obj = c1.recv()
print(obj)
print("Data to be received:", c1.poll())
if __name__=="__main__":
main()
Kết quả trả về:
Data to be received: True
Hello, World!
Data to be received: False
- Ở trên,
c1.poll()
sẽ trả lời câu hỏi: liệu có còn dữ liệu để gửi hoặc nhận hay không? => nếu không gửi gì mà vẫn kích hoạt nhận (bỏ dòng codec2.send("Hello, World!")
), nó sẽ làm treo máy do hệ thống cứ chờ để nhận dữ liệu trong khi dữ liệu không được gửi. - Để giải quyết, đơn giản có thêm 1 lệnh
if
như sau:
if c2.poll():
obj = c2.recv()
print(obj)
-
Để mỗi thực thể trong
Pipe
đều có thể vừa gửi, vừa nhận, ta define biếnduplex=True
khi khởi tạo như sauc1, c2 = Pipe(duplex=True)
. -
OK, giờ hãy xem cách
Pipe
hoạt động để truyền dữ liệu giữa các process:
import os
import time
from random import randint
from multiprocessing import Process, Pipe, current_process
def sender(connection):
print(f"Sender: {current_process().name} ({os.getpid()})")
for _ in range(5):
rand: int = randint(1, 10)
connection.send(rand)
print(f"{rand} was sent...")
time.sleep(.5)
print('Sending "None"...')
connection.send(None)
print("Done with sending data!")
def receiver(connection):
print(f"Receiver: {current_process().name} ({os.getpid()})")
while True:
data = connection.recv()
print(f"Received: {data}")
if data is None:
break
print("Done with receiving data!")
def main():
c1, c2 = Pipe(duplex=True)
sender_process = Process(target=sender, args=(c1,))
receiver_process = Process(target=receiver, args=(c2,))
sender_process.start()
receiver_process.start()
sender_process.join()
receiver_process.join()
if __name__=="__main__":
main()
- Kết quả là:
Sender: Process-1 (7213)
10 was sent...
Receiver: Process-2 (7214)
Received: 10
9 was sent...
Received: 9
10 was sent...
Received: 10
6 was sent...
Received: 6
4 was sent...
Received: 4
Sending "None"...
Done with sending data!
Received: None
Done with receiving data!
- Ở ví dụ trên, ta tạo ra 2 process (1 làm nhiệm vụ gửi dữ liệu và 1 làm nhiệm vụ nhận).
Pipe()
trả về 2 thực thể kết nối với nhau (làc1
vàc2
). Ta truyền 2 thực thể này vào 2 process để chúng gửi/nhận dữ liệu. Quá trình gửi/nhận này sẽ kết thúc nếu sender gửi dữ liệuNone
sang bên receiver.
3.3. Queue
- Thay vì dùng
Pipe()
, ta có thể dùngQueue()
để các processput
dữ liệu vào đó vàget
ra nếu muốn theo thứ tự (vào trước ra trước). - Xem đoạn code dưới đây:
from multiprocessing import Process, Queue
def insert_val(queue: Queue, val: int):
print(f"Inserting {val} into the queue...")
queue.put(val)
def main():
queue = Queue()
processes = [Process(target=insert_val, args=(queue, i)) for i in range(5)]
for process in processes:
process.start()
for process in processes:
process.join()
result = [queue.get() for _ in range(len(processes))]
print(result)
if __name__=="__main__":
main()
- Kết quả in ra như sau (Lưu ý: thứ tự ở khi chạy ở các máy khác nhau có thể khác nhau, do các process chạy song song => ta không thể biết được là process nào input xong giá trị vào queue trước, process nào input xong sau):
Inserting 0 into the queue...
Inserting 1 into the queue...
Inserting 4 into the queue...
Inserting 2 into the queue...
Inserting 3 into the queue...
[0, 1, 4, 2, 3]
-
Ví dụ trên tạo ra 5 process input 5 giá trị từ 0 đến 4 vào queue, có thể thấy thứ tự các giá trị khi insert vào queue sẽ được giữ khi ta get dữ liệu ra khỏi queue.
-
OK, thứ tự trong
Queue
thì sẽ được giữ, nhưng như trên có đề cập, vấn đề là ta không kiểm soát được process nào sẽ put vào queue trước = ta muốn kết quả thu về phải là list[0, 1, 2, 3, 4]
chứ không random thứ tự như trên. -
Để thực hiện điều này, ta cần đánh thứ tự (indentifier) cho dữ liệu trước khi chúng được
put
vào queue, sau khiget
được kết quả từ queue, dễ dàng sắp xếp lại theo indentifier này. Xem ví dụ sau:
from multiprocessing import Process, Queue
import time
def square_num(identifier: int, num: int, queue: Queue):
time.sleep(2)
queue.put((identifier, num ** 2))
def main():
queue: Queue = Queue()
data: list[int] = list(range(1, 9))
processes = [Process(target=square_num, args=(i, data[i], queue)) for i in range(len(data))]
for process in processes:
process.start()
for process in processes:
process.join()
unsorted = [queue.get() for _ in range(len(data))]
print(unsorted)
sorted_data = sorted(unsorted, key=lambda x: x[0])
print(sorted_data)
if __name__=="__main__":
main()
- Kết quả trả về:
[(0, 1), (2, 9), (3, 16), (1, 4), (4, 25), (6, 49), (7, 64), (5, 36)]
[(0, 1), (1, 4), (2, 9), (3, 16), (4, 25), (5, 36), (6, 49), (7, 64)]
=> Ở đoạn code trên, thay vì chỉ input mỗi dữ liệu là biến num
vào queue, ta input thêm indentifier (thứ tự ban đầu của dữ liệu), do đó dễ dàng có thể sắp xếp lại sau khi các process xử lý xong và đẩy kết quả vào queue.
4. Lock & Semaphores
- Giống với multi-threading, ta cũng có thể dùng
lock
vàsemaphores
để giới hạn số lượng process có thể chạy trong 1 thời điểm nhất định. - Xem ví dụ sau:
from time import sleep
from multiprocessing import Process, Semaphore, Lock
def func(p_lock, indentifier):
with p_lock:
sleep(2)
print(f">> Process {indentifier} is running...")
def main():
lock = Lock()
sem = Semaphore(3)
processes = [Process(target=func, args=(lock, i)) for i in range(5)]
for process in processes:
process.start()
for process in processes:
process.join()
if __name__=="__main__":
main()
- Ví dụ trên sử dụng
lock
, muốn sử dụngsemaphores
chỉ cần thay đổi:
processes = [Process(target=func, args=(sem, i)) for i in range(5)]
- Khi sử dụng
lock
, sẽ chỉ có 1 process có thể chạy tại 1 thời điểm. Bạn sẽ thấy chỉ 1 dòng lệnh được in ra, ví dụ process 3 acquire được lock thì sẽ in ra>> Process 3 is running...
, sau đó phải chờ 2s để process này thực thi xong, nó sẽ release lock để các process khác acquire và chạy. - Khi sử dụng
semaphores
, ở đoạn code trên ta cấu hình để 3 processes chạy song song cùng 1 lúc trongsemaphores
, vì thế sẽ có 3 dòng log được in ra đồng thời khi chạy, ví dụ:
>> Process 0 is running...
>> Process 3 is running...
>> Process 2 is running...
- Sau khi 3 processes này thực thi xong, 3 processes tiếp sẽ có thể chạy cho tới khi không còn process nào cần thực thi.
Cảm ơn các bạn đã đọc hết bài này, mọi thắc mắc có thể để dưới phần bình luận nha.
Tham khảo:
- The Complete Guide To Mastering Python In 2023 - Udemy
All rights reserved