+4

[Java - Mutil thread] CountDownLatch và CyclicBarrier

Trong lập trình đa luồng (multi-threading), đôi khi chúng ta cần đồng bộ hóa các luồng để đảm bảo rằng một số tác vụ nhất định sẽ được thực hiện theo một trình tự xác định hoặc chờ đợi cho đến khi tất cả các tác vụ khác đã hoàn thành. Java cung cấp hai lớp hỗ trợ điều này là CountDownLatch và CyclicBarrier. Cả hai lớp này đều được sử dụng để đồng bộ hóa các luồng, tuy nhiên chúng có mục đích và cách sử dụng khác nhau.

1.CountDownLatch

Nguyên lý hoạt động

Lớp CountDownLatch cho phép chúng ta bắt đầu thực hiện một thread X ngay sau khi tất cả các thread A1, A2, A3, ... đều đã hoàn thành.

CountDownLatch sử dụng một biến đếm nội bộ. Khác với Semaphore, biến đếm này chỉ giảm mà không tăng và khi giảm đến 0 thì sẽ dừng. Giá trị khởi tạo của biến đếm được truyền vào từ hàm khởi tạo: CountDownLatch(int count).

Mỗi khi một thread A1, A2, A3, ... hoàn thành, phương thức countDown() sẽ được gọi để giảm biến đếm đi 1. Bên cạnh đó, phương thức await() được gọi để block cho đến khi biến đếm giảm xuống còn 0. Khi đó, thread X chính thức được bắt đầu.

Ví dụ:

MainTask.java

public class MainTask implements Runnable {

    @Override
    public void run() {
        System.out.println("Start main task...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done main task!");
    }

}

SubTask.java

import java.util.concurrent.CountDownLatch;

public class SubTask implements Runnable {

    private CountDownLatch countDownLatch;

    public SubTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println("Start sub task...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done sub task!");
        countDownLatch.countDown();
    }

}

Test.java

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(new SubTask(countDownLatch));
        executorService.submit(new SubTask(countDownLatch));
        executorService.submit(new SubTask(countDownLatch));

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(countDownLatch.getCount());
        executorService.submit(new MainTask());

        executorService.shutdown();
    }

}

Kết quả:

Start sub task...
Start sub task...
Start sub task...
Done sub task!
Done sub task!
Done sub task!
0
Start main task...
Done main task!

2.CyclicBarrier

Nguyên lý hoạt động

Tên gọi của lớp CyclicBarrier đã nói lên đưọc nguyên lý hoạt động của lớp này:

  • barrierCyclicBarrier cho phép các thread đợi nhau để cùng tiếp cận một điểm chặn (barrier) chung. CyclicBarrier rất hữu ích đối với các chương trình có một số lượng thread cố định phải đợi nhau thì mới có thể xử lý tiếp được.
  • cyclicCyclicBarrier có thể tái sử dụng sau khi các thread đã được giải phóng.

CyclicBarrier cũng có một biến đếm nội bộ tương tự như của CountDownLatch, chỉ khác là biến đếm của CyclicBarrier có thể reset về giá trị khởi tạo ban đầu. Giá trị khởi tạo ban đầu của biến đếm, hay số lượng thread tối đa có thể tiếp cận barrier, được truyền vào từ hàm khởi tạo: CyclicBarrier(int parties).

Mỗi khi một thread hoàn thành xong công việc đưọc yêu cầu, nó sẽ gọi phương thức await() để "tham gia" vào quá trình đợi các thread khác cũng hoàn thành xong công việc. Sau khi tất cả các thread đều đã await(), đoạn mã dưới await() ở mỗi thread mới được tiếp tục thực hiện.

Ngoài ra, chúng ta còn có thể khai báo một hành động Runnable barrierAction sẽ xảy ra khi tất cả các thread tiếp cận barrier, thông qua hàm khởi tạo: CyclicBarrier(int parties, Runnable barrierAction).

Ví dụ:

Chúng ta sẽ minh họa CyclicBarrier thông qua ví dụ sau: một hệ thống microservice gồm có 3 môi trường. Mỗi môi trường gồm có 3 service. Cả 3 service đều phải chờ nhau khởi động thành công (barrier) thì mới có thể tiếp nhận request từ client.

Service.java

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Service implements Runnable {

    private String serviceName;
    private CyclicBarrier cyclicBarrier;

    public Service(String serviceName, CyclicBarrier cyclicBarrier) {
        this.serviceName = serviceName;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("Service " + serviceName + " started...");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Service " + serviceName + " was available to accept request");
    }
}

Test.java

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    private static final int ENVIRONMENTS = 3;
    private static final int SERVICES = 3;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(SERVICES, () -> {
            System.out.println("Done one environment");
        });

        ExecutorService executor = Executors.newFixedThreadPool(ENVIRONMENTS);
        for (int i = 1; i <= ENVIRONMENTS; i++) {
            executor.submit(new Service("A" + i, cyclicBarrier));
            executor.submit(new Service("B" + i, cyclicBarrier));
            executor.submit(new Service("C" + i, cyclicBarrier));
        }

        executor.shutdown();
    }

}

Kết quả:

Service A1 started...
Service C1 started...
Service B1 started...
Done one environment
Service B1 was available to accept request
Service A1 was available to accept request
Service A2 started...
Service B2 started...
Service C1 was available to accept request
Service C2 started...
Done one environment
Service C2 was available to accept request
Service A2 was available to accept request
Service B2 was available to accept request
Service C3 started...
Service A3 started...
Service B3 started...
Done one environment
Service B3 was available to accept request
Service C3 was available to accept request
Service A3 was available to accept request

Tóm lại, CountDownLatch và CyclicBarrier là hai lớp hữu ích trong Java để đồng bộ hóa các luồng. CountDownLatch được sử dụng khi bạn muốn chờ đợi cho đến khi một số tác vụ đã hoàn thành trước khi tiếp tục thực hiện các tác vụ khác. Trong khi đó, CyclicBarrier được sử dụng để đảm bảo rằng tất cả các luồng đều phải chờ đợi lẫn nhau tại một điểm nhất định cho đến khi tất cả đều sẵn sàng, sau đó chúng sẽ tiếp tục cùng một lúc. Việc lựa chọn sử dụng CountDownLatch hay CyclicBarrier phụ thuộc vào yêu cầu cụ thể của ứng dụng và cách thức mà các luồng cần phải được đồng bộ hóa.


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí