+3

Concurrency in C++11

Concurrency in C++ 11

Trong bài viết này bạn sẽ học được kiến thức cơ bản về chạy các luồng song song sử dụng shared memory.

  • C++11
  • Sử dụng shared memory
    • Threads
    • Race conditions
    • Mutexes
    • Atomicity
    • Asynchonous tasks
    • Condition variables
  • Bài toán Producer-Consumer

Resources

C++11

C++11 là phiên bản được công bố năm 2011. Trước khi được công bố, nó được biết đến là C++0x.

Threads

Khởi tạo một thread đơn giản trong C++ như sau:

#include <iostream>
#include <thread>
using namespace std;

void func(int x) {
    cout << "Inside thread " << x << endl;
}

int main() {
    thread th(&func, 100);
    th.join();
    cout << "Outside thread" << endl;
    return 0;
}

Chúng ta biên dịch đoạn code này bằng g++ như sau:

$ g++ -std=c++0x main.cpp -pthread

Race conditions

Hãy tưởng tượng rằng x * x là phép tính cực kỳ phức tạp, và chúng ta cần tính tổng bình phương của một dãy các số. Chúng ta có thể hình dung ra việc sử dụng tính toán song song như sau:

#include <iostream>
#include <vector>
#include <thread>
using namespace std;

int accum = 0;

void square(int x) {
    accum += x * x;
}

int main() {
    vector<thread> ths;
    for (int i = 1; i <= 20; i++) {
        ths.push_back(std::thread(square, i));
    }

    for (auto& th : ths) {
        th.join();
    }
    cout << "accum = " << accum << endl;
    return 0;
}

Đoạn code trên sẽ tính tổng bình phương của các số từ 1->20. Mỗi vòng lặp chúng ta tạo ra một thread mà trong đó chúng ta sẽ tính bình phương của 1 số. Kết thúc vòng lặp, chúng ta sử dụng join() tất cả các threads, hàm nãy sẽ đợi các threads tính bình phương kết thúc trước khi thực hiện các hoạt động tiếp theo. Việc này rất quan trọng trước khi in ra giá trị của accum, bởi vì các threads của chúng ta có thể chưa kết thúc trước khi in.

Biên dịch và chạy chương trình, kết quả mong đợi sẽ là 2870.

Giả sử chương trình được biên dịch ra có tên là a.out, chúng ta sẽ chạy chương trình một số lần như sau:

$ for i in {1..40}; do ./a.out; done

Nếu chúng ta chạy 1000 lần như sau:

for i in {1..1000}; do ./a.out; done | sort | uniq -c

Kết quả in ra màn hình không còn luôn luôn là 2870 nữa. Đây gọi là race condition. Khi trình biên dịch thực hiện tính a += x * x, nó đọc giá trị của accum và việc cập nhật giá trị không phải là atomic. Chúng ta sửa lại hàm square như sau:

int temp = accum;
temp += x * x;
accum = temp;

Biên dịch lại chương trình, chúng ta sẽ thấy 2 threads đầu tiên chồng chéo nhau:

// Thread 1             // Thread 2
int temp1 = accum;      int temp2 = accum;          // temp1 = temp2 = 0
                        temp2 += 2 * 2;             // temp2 = 4
temp1 += 1 * 1;                                     // temp1 = 1
accum = temp1;                                      // accum = 1
                        accum = temp2;              // accum = 4

Kết thúc chúng ta có accum = 4 thay vì giá trị đúng là 5

Mutex

Một mutex (mutual exlusion) cho phép chúng ta đóng gói một đoạn code chỉ thực hiện một lần trong một thread tại một thời điểm. Chúng ta thay hàm square ở ví dụ trên như sau:

int accum = 0;
mutex accum_mutex;

void square(int x) {
    int temp = x * x;
    accum_mutex.lock();
    accum += temp;
    accum_mutex.unlock();
}

Chạy lại chương trình và vấn đề race condition đã được sửa. Khi thread đầu đi vào hàm, nó gọi lock() để khóa chương trình. Trong thời gian khóa, các thread khác sẽ phải đợi cho đến khi mutex được mở khóa. Để ý rằng, ở đây chúng ta sử dụng biến temp nhằm thực hiện phép toán bình phương bên ngoài cặp lock-unlock

Atomic

C++11 đưa ra một tiếp cận đơn giản hơn, đó là atomic

#include <atomic>

atomic<int> accum(0);

void square(int x) {
    accum += x * x;
}

Chúng ta không cần sử dụng biến temp vì x * x đã được tính toán trước khi cộng vào accum

Tasks

Cùng xem xét ví dụ sau:

#include <iostream>
#include <future>
#include <chrono>
using namespace std;

int square(int x) {
    return x * x;
}

int main() {
    auto a = async(&square, 10);
    int v = a.get();

    cout << "The thread returned " << v << endl;
    return 0;
}

async sử dụng một cặp object được gọi là promisefuture. future được link với một promise và bất cứ lúc nào cần lấy giá trị có thể gọi hàm get(). Nếu promise chưa được fill đữ liệu, nó sẽ đợi cho đến khi sẵn sàng.

Conditional variables

Khi lập trình với đa luồng, chúng ta có thể gặp bài toán một thread cần chờ một thread khác kết thúc, gửi tín hiệu giữa các thread. Điều này có thể thực hiện đc với mutexs nhưng sẽ nguy hiểm khi gặp phải bài toán deadlock. Chúng ta cũng có thể sử dụng biến bool ví dụ tên là notified và được set bằng true khi chúng ta cần gửi tín hiệu. Thread khác thực hiện loop để check nếu notified bằng true thì dừng lặp. Bởi vì cài đặt notified bằng true là atomic và trong ví dụ này chúng ta chỉ set một lần, chúng ta không cần tới mutex. Tuy nhiên, trên thread chạy loop sẽ luôn chạy và làm tiêu tốn tài nguyên CPU. Chúng ta có thể thêm sleep_for trong vòng loop để làm CPU tạm nghỉ trong một số khoảng thời gian.

Một các tiếp cận khác đó là sử dụng wait cho một condition variable bên trong vòng lặp.

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

condition_variable cond_var;
mutex m;

int main() {
    int value = 100;
    bool notified = false;
    thread reporter([&]() {
        unique_lock<mutex> lock(m);
        while (!notified) {
            cond_var.wait(lock);
        }
        cout << "The value is " << value << endl;
    });

    thread assigner([&]() {
        value = 20;
        notified = true;
        cond_var.notify_one();
    });

    reporter.join();
    assigner.join();
    return 0;
}

Trong assigner thread sẽ gán notified bằng true và gửi tín hiệu thông qua conditional variable cond_var. Trong reporter thread, chúng ta lặp đợi khi notified bằng false, ở mỗi vòng lặp chúng ta đợi một tín hiệu.

Chúng ta có thể nhận ra rằng, cond_var.wait(lock) sẽ block đến khi nó nhận cond_var nhận được tín hiệu. Vậy tại sao vẫn cần biến notified ở đầu loop? Thực tế conditional variable có thể bị đánh thức ngay cả khi chúng ta không gọi tới notify_one, do đó chúng ta cần thêm kiểm tra notified

Producer-consumer problem

Với bài toán producer-consumer chúng ta có thể giải quyết bằng đoạn chương trình như sau:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <chrono>
#include <queue>
using namespace std;

int main() {
    int c = 0;
    bool done = false;
    queue<int> goods;

    thread producer([&]() {
        for (int i = 0; i < 500; ++i) {
            goods.push(i);
            c++;
        }

        done = true;
    });

    thread consumer([&]() {
        while (!done) {
            while (!goods.empty()) {
                goods.pop();
                c--;
            }
        }
    });

    producer.join();
    consumer.join();
    cout << "Net: " << c << endl;
}

All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí