+4

Tất tần tật về Multi-processing trong Python

Khác so với multi-threading, có thể chỉ chạy trên 1 process => hệ điều hành sẽ phải lập lịch để các tác vụ chạy đồng thời. Với multi-processing, các tác vụ sẽ chạy song song và hầu như cô lập hoàn toàn về tài nguyên máy tính (CPU, bộ nhớ,...). Đây là một khái niệm cơ bản và rất quan trọng trong lập trình máy tính.

Code mà mình giới thiệu bên dưới đôi khi sẽ sử dụng các hàm helper sau, mình copy lên đây trước để các bạn có thể chạy được nha: file: time_stuff.py

from time import perf_counter
from functools import wraps
from datetime import datetime

def get_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        s = perf_counter()
        func(*args, **kwargs)
        e = perf_counter()
        print(f"Time: {e-s} seconds")

    return wrapper

def timestamp() -> str:
    return f"{datetime.now():'%H:%M:%S')}"

def kill_time():
    for _ in range(10**8):
        pass

1. Processes

  • Nhắc lại: Chạy multi-threading => các thread dùng chung tài nguyên (CPU, bộ nhớ,...). Chạy multi-processing => các process tách biệt về tài nguyên (giả sử máy có 2 CPU thì 2 thread sẽ chạy song song. trên 2 CPU). => Multi-thread có thể chạy trên cùng 1 process.
  • Ví dụ dưới tạo ra 2 process là process1process2 sử dụng thư viện multiprocessing. Giống với multi-threading, để các process này chạy, ta gọi tới method start(). Để chờ 1 tiến trình cho đến khi việc thực thi hoàn tất ta gọi method join().
  • file: main.py
import multiprocessing as mp
from time_stuff import get_time, timestamp, kill_time
import os

def func(param):
    print(f"Starting {mp.current_process().name} ({os.getpid()})... ({timestamp()})")
    kill_time()
    print(f"{os.getpid()} finished! ({timestamp()})")

@get_time
def main():
    process1 = mp.Process(name="Process-1", target=func, args=("Sample1",))
    process2 = mp.Process(name="Process-2", target=func, args=("Sample2",))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

if __name__ == "__main__":
    main()

Kết quả là:

Starting Process-2 (9038)... ('22:30:39'))
Starting Process-1 (9037)... ('22:30:39'))
9037 finished! ('22:30:40'))
9038 finished! ('22:30:40'))
Time: 1.1100460829911754 seconds

Nhận thấy:

  • 2 process này được bắt đầu và kết thúc cùng một thời điểm, nhanh hơn multi-threading (hãy thử) => 2 process chạy song song trên tài nguyên khác nhau.
  • process1 kết thúc trước process2 (có được là do method join() chờ process1 fininshed trước khi cho process2 finished) => nếu bỏ join() sẽ có xác xuất process2 xong trước process1. Lưu ý:
  • Do tạo ra 1 process thì phải setup tài nguyên đọc lập => thời gian tạo process thường lâu hơn tạo thread => đối với những tác vụ nhẹ, cần cân nhắc có nên dùng process không vì có thể sẽ chậm hơn dùng thread hay chạy thông thường (không dùng thread/process).

2. Pool

  • Pool giúp gom lại các task rồi phân bổ ra các process mà máy tính có để chạy song song.

2.1. Pool Map

  • Xem xét ví dụ dưới đây, ta dùng thuộc tính map của pool:
import multiprocessing as mp
from time_stuff import get_time, kill_time
import time

def convert_to_x(number: int) -> str:
    time.sleep(2)
    return number * "x"

@get_time
def main():
    print(f"Core available: {mp.cpu_count()}")

    values: tuple[int, ...] = tuple(range(1, 9))

    with mp.Pool() as pool:
        results: list[str] = pool.map(convert_to_x, values)
        print(f"Results: {results}")

if __name__ == "__main__":
    main()
  • Kết quả thu được:
Core available: 8
Results: ['x', 'xx', 'xxx', 'xxxx', 'xxxxx', 'xxxxxx', 'xxxxxxx', 'xxxxxxxx']
Time: 2.311342832981609 seconds

=> Máy mình đang có sẵn 8 CPU, pool giúp mình chạy hàm convert_to_x song song trên 8 CPU đó => Tổng thời gian chỉ xấp xỉ 2s (để ý là hàm convert_to_x sẽ sleep 2s) => chúng đã thực sự được chạy song song (thời gian tổng > 2s là do máy tính cần thời gian để khởi tạo lên các process).

  • Vậy giả sử ta cần chạy 9 task thì sao? Sửa đoạn code thành values: tuple[int, ...] = tuple(range(1, 10)) để tạo ra 9 tasks. Kết quả:
Core available: 8
Results: ['x', 'xx', 'xxx', 'xxxx', 'xxxxx', 'xxxxxx', 'xxxxxxx', 'xxxxxxxx', 'xxxxxxxxx']
Time: 4.298837750044186 seconds

=> Thời gian xấp xỉ 4s là do cùng 1 thời điểm, pool chỉ có thể phân bổ tối đa 8 tasks vào 8 CPU để chạy song song => task thứ 9 phải chờ để có 1 trong 8 task đó xong => Gấp đôi thời gian thực thi.

  • Đôi khi ta có 8 CPU nhưng chỉ muốn pool sử dụng tối đa 4 CPU 1 lúc thôi chẳng hạn. Thật đơn giản, chỉ cần truyền số lượng process khi khởi tạo Pool: with mp.Pool(processes=4) as pool => Khi này, nếu chạy 8 tasks thì sẽ mất xấp xỉ 4s do ta quy định 1 lượt chỉ chạy được 4 tasks (mỗi task mất ~2s).

  • Tóm lại, pool map nhận input là 1 tuple các giá trị (values) cần truyền vào hàm tính toán => nó lấy ra từng giá trị, truyền chính xác giá trị đó vào hàm => phân bổ vào các process để thực thi.

2.2. Pool Starmap

  • Pool starmap cũng nhận input là tuple các giá trị (values), nhưng trước khi truyền vào hàm, nó sẽ unpack ra trước.
  • Xem ví dụ sau để hiểu rõ:
import multiprocessing as mp
from time_stuff import get_time
import time

def add_numbers(*args) -> float:
    time.sleep(2)
    return sum(args)

@get_time
def main():
    print(f"Core available: {mp.cpu_count()}")

    values = ((1, 2, 3), (3, 4), (5, 6), (7, 8, 10, 12))

    with mp.Pool() as pool:
        results: list[str] = pool.starmap(add_numbers, values)
        print(f"Results: {results}")

if __name__ == "__main__":
    main()
  • Kết quả là:
Core available: 8
Results: [6, 7, 11, 37]
Time: 2.3035353750456125 seconds
  • Nếu dùng map thay vì starmap thì sẽ có lỗi:
TypeError: unsupported operand type(s) for +: 'int' and 'tuple'

=> Do map truyền nguyên vẹn tuple (ví dụ (1, 2, 3)) vào hàm add_numbers => args=((1, 2, 3),) => lúc này chương trình sẽ thực thi sum(((1, 2, 3),)) gây ra lỗi như trên. => starmap giải quyết được điều này do nó unpack tuple ra => args=(1, 2, 3) => thực thi sum(1, 2, 3) => không gây ra lỗi.

2.3. Xử lý các hàm khác nhau

  • Cũng chỉ dùng map thôi nhưng khéo léo một chút kết hợp với functools.partital thì hoàn toàn có thể truyền nhiều hàm vào các process khác nhau.
  • functools.partitial giúp biến hàm với số lượng tham số bất kỳ thành hàm không có tham số (hỗ trợ nạp trước tham số vào hàm).
  • Xem xét ví dụ dưới:
import multiprocessing as mp
from time_stuff import get_time
import functools
import time

def func_a(param):
    time.sleep(2)
    return param

def func_b(param):
    time.sleep(2)
    return param

def func_c(param, param2):
    time.sleep(2)
    return param

def map_func(func):
    return func()

@get_time
def main():
    print(f"Core available: {mp.cpu_count()}")

    a = functools.partial(func_a, "A")
    b = functools.partial(func_b, "B")
    c = functools.partial(func_c, "C", "C2")
    
    with mp.Pool() as pool:
        results: list[str] = pool.map(map_func, [a, b, c])
        print(f"Results: {results}")

if __name__ == "__main__":
    main()
  • Kết quả là:
Core available: 8
Results: ['A', 'B', 'C']
Time: 2.322280750002392 seconds
  • Như vậy, các hàm func_a, func_bfunc_c với các chức năng khác nhau và input/output khác nhau hoàn toàn có thể truyền và thực thi trên các process khác nhau.

3. Giao tiếp giữa các process

3.1. Vấn đề về tài nguyên chung

  • Do các process khác nhau có tài nguyên (bộ nhớ, CPU,...) độc lập nên khi lập trình nếu không để ý có thể gây ra kết quả không mong muốn.
  • Xem xét ví dụ sau:
import multiprocessing as mp

numbers: list[int] = [0]

def func():
    global numbers

    numbers.extend([1, 2, 3])
    print(f"Process data: {numbers}")

def main():
    process = mp.Process(target=func)
    process.start()
    process.join()

    print(f"Main data: {numbers}")

if __name__ == "__main__":
    main()
  • Kết quả là:
Process data: [0, 1, 2, 3]
Main data: [0]

=> Có thể thấy rằng, lúc tạo ra process mới mp.Process(), nó sẽ có vùng nhớ riêng => khi dùng biến global numbers, nó sẽ khai báo và sử dụng trên vùng nhớ của riêng nó chứ không phải biến đã khai báo trong main process. Do vậy, khi in ra ta được giá trị biến numbers không giống nhau, dù cùng thực hiện phép .extend([1, 2, 3].

3.2. Pipe

  • Thư viện multiprocessing cung cấp class Pipe giúp gửi dữ liệu giữa các process. Xem xét ví dụ sau về cách Pipe hoạt động:
from multiprocessing import Pipe

def main():
    c1, c2 = Pipe()

    c2.send("Hello, World!")
    print("Data to be received:", c1.poll())
    obj = c1.recv()
    print(obj)
    print("Data to be received:", c1.poll())

if __name__=="__main__":
    main()

Kết quả trả về:

Data to be received: True
Hello, World!
Data to be received: False
  • Ở trên, c1.poll() sẽ trả lời câu hỏi: liệu có còn dữ liệu để gửi hoặc nhận hay không? => nếu không gửi gì mà vẫn kích hoạt nhận (bỏ dòng code c2.send("Hello, World!")), nó sẽ làm treo máy do hệ thống cứ chờ để nhận dữ liệu trong khi dữ liệu không được gửi.
  • Để giải quyết, đơn giản có thêm 1 lệnh if như sau:
    if c2.poll():
        obj = c2.recv()
        print(obj)
  • Để mỗi thực thể trong Pipe đều có thể vừa gửi, vừa nhận, ta define biến duplex=True khi khởi tạo như sau c1, c2 = Pipe(duplex=True).

  • OK, giờ hãy xem cách Pipe hoạt động để truyền dữ liệu giữa các process:

import os
import time
from random import randint
from multiprocessing import Process, Pipe, current_process


def sender(connection):
    print(f"Sender: {current_process().name} ({os.getpid()})")

    for _ in range(5):
        rand: int = randint(1, 10)
        connection.send(rand)
        print(f"{rand} was sent...")
        time.sleep(.5)
    
    print('Sending "None"...')
    connection.send(None)
    print("Done with sending data!")
    
def receiver(connection):
    print(f"Receiver: {current_process().name} ({os.getpid()})")

    while True:
        data = connection.recv()
        print(f"Received: {data}")

        if data is None:
            break
        
    print("Done with receiving data!")
    
def main():
    c1, c2 = Pipe(duplex=True)
    sender_process = Process(target=sender, args=(c1,))
    receiver_process = Process(target=receiver, args=(c2,))

    sender_process.start()
    receiver_process.start()

    sender_process.join()
    receiver_process.join()

if __name__=="__main__":
    main()
  • Kết quả là:
Sender: Process-1 (7213)
10 was sent...
Receiver: Process-2 (7214)
Received: 10
9 was sent...
Received: 9
10 was sent...
Received: 10
6 was sent...
Received: 6
4 was sent...
Received: 4
Sending "None"...
Done with sending data!
Received: None
Done with receiving data!
  • Ở ví dụ trên, ta tạo ra 2 process (1 làm nhiệm vụ gửi dữ liệu và 1 làm nhiệm vụ nhận).
  • Pipe() trả về 2 thực thể kết nối với nhau (là c1c2). Ta truyền 2 thực thể này vào 2 process để chúng gửi/nhận dữ liệu. Quá trình gửi/nhận này sẽ kết thúc nếu sender gửi dữ liệu None sang bên receiver.

3.3. Queue

  • Thay vì dùng Pipe(), ta có thể dùng Queue() để các process put dữ liệu vào đó và get ra nếu muốn theo thứ tự (vào trước ra trước).
  • Xem đoạn code dưới đây:
from multiprocessing import Process, Queue

def insert_val(queue: Queue, val: int):
    print(f"Inserting {val} into the queue...")
    queue.put(val)

def main():
    queue = Queue()
    processes = [Process(target=insert_val, args=(queue, i)) for i in range(5)]

    for process in processes:
        process.start()
    
    for process in processes:
        process.join()

    result = [queue.get() for _ in range(len(processes))]
    print(result)

if __name__=="__main__":
    main()
  • Kết quả in ra như sau (Lưu ý: thứ tự ở khi chạy ở các máy khác nhau có thể khác nhau, do các process chạy song song => ta không thể biết được là process nào input xong giá trị vào queue trước, process nào input xong sau):
Inserting 0 into the queue...
Inserting 1 into the queue...
Inserting 4 into the queue...
Inserting 2 into the queue...
Inserting 3 into the queue...
[0, 1, 4, 2, 3]
  • Ví dụ trên tạo ra 5 process input 5 giá trị từ 0 đến 4 vào queue, có thể thấy thứ tự các giá trị khi insert vào queue sẽ được giữ khi ta get dữ liệu ra khỏi queue.

  • OK, thứ tự trong Queue thì sẽ được giữ, nhưng như trên có đề cập, vấn đề là ta không kiểm soát được process nào sẽ put vào queue trước = ta muốn kết quả thu về phải là list [0, 1, 2, 3, 4] chứ không random thứ tự như trên.

  • Để thực hiện điều này, ta cần đánh thứ tự (indentifier) cho dữ liệu trước khi chúng được put vào queue, sau khi get được kết quả từ queue, dễ dàng sắp xếp lại theo indentifier này. Xem ví dụ sau:

from multiprocessing import Process, Queue
import time

def square_num(identifier: int, num: int, queue: Queue):
    time.sleep(2)
    queue.put((identifier, num ** 2))

def main():
    queue: Queue = Queue()
    data: list[int] = list(range(1, 9))

    processes = [Process(target=square_num, args=(i, data[i], queue)) for i in range(len(data))]

    for process in processes:
        process.start()
    
    for process in processes:
        process.join()
    
    unsorted = [queue.get() for _ in range(len(data))]
    print(unsorted)

    sorted_data = sorted(unsorted, key=lambda x: x[0])
    print(sorted_data)

if __name__=="__main__":
    main()
  • Kết quả trả về:
[(0, 1), (2, 9), (3, 16), (1, 4), (4, 25), (6, 49), (7, 64), (5, 36)]
[(0, 1), (1, 4), (2, 9), (3, 16), (4, 25), (5, 36), (6, 49), (7, 64)]

=> Ở đoạn code trên, thay vì chỉ input mỗi dữ liệu là biến num vào queue, ta input thêm indentifier (thứ tự ban đầu của dữ liệu), do đó dễ dàng có thể sắp xếp lại sau khi các process xử lý xong và đẩy kết quả vào queue.

4. Lock & Semaphores

  • Giống với multi-threading, ta cũng có thể dùng locksemaphores để giới hạn số lượng process có thể chạy trong 1 thời điểm nhất định.
  • Xem ví dụ sau:
from time import sleep
from multiprocessing import Process, Semaphore, Lock

def func(p_lock, indentifier):
    with p_lock:
        sleep(2)
        print(f">> Process {indentifier} is running...")

def main():
    lock = Lock()
    sem = Semaphore(3)

    processes = [Process(target=func, args=(lock, i)) for i in range(5)]

    for process in processes:
        process.start()
    
    for process in processes:
        process.join()
    
if __name__=="__main__":
    main()
  • Ví dụ trên sử dụng lock, muốn sử dụng semaphores chỉ cần thay đổi:
processes = [Process(target=func, args=(sem, i)) for i in range(5)]
  • Khi sử dụng lock, sẽ chỉ có 1 process có thể chạy tại 1 thời điểm. Bạn sẽ thấy chỉ 1 dòng lệnh được in ra, ví dụ process 3 acquire được lock thì sẽ in ra >> Process 3 is running..., sau đó phải chờ 2s để process này thực thi xong, nó sẽ release lock để các process khác acquire và chạy.
  • Khi sử dụng semaphores, ở đoạn code trên ta cấu hình để 3 processes chạy song song cùng 1 lúc trong semaphores, vì thế sẽ có 3 dòng log được in ra đồng thời khi chạy, ví dụ:
>> Process 0 is running...
>> Process 3 is running...
>> Process 2 is running...
  • Sau khi 3 processes này thực thi xong, 3 processes tiếp sẽ có thể chạy cho tới khi không còn process nào cần thực thi.

Cảm ơn các bạn đã đọc hết bài này, mọi thắc mắc có thể để dưới phần bình luận nha.

Tham khảo:

  • The Complete Guide To Mastering Python In 2023 - Udemy

All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí