+5

Thread Pools Trong Java

I, Giới thiệu

ThreaPool trong java rất hữu ích khi cần giới hạn số lượng Thread được chạy bên trong ứng dụng của bạn cùng 1 thời điểm. Sẽ có vấn đề về hiệu suất khi tạo mới 1 Thread (new Thread) và mỗi thread được tạo ra cũng được phân bổ bộ nhớ cho việc sử dụng.

Ví dụ: Bạn có 100 tập tin cần tải về từ trên mạng, mỗi tệp tin bạn cần 1 thread để download, như vậy sẽ có 100 thread hoạt động cùng 1 thời điểm trong ứng dụng của bạn, sẽ gây ra 1 vấn đề nghiêm trọng về bộ nhớ và hiệu suất và có thể dẫn đến gây lỗi (crash) chương trình.

Thay vì phải tạo mới thread cho mỗi task (nhiệm vụ) được thực hiện đồng thời, các nhiệm vụ cần đưa vào 1 thread pool (hồ bơi dành cho luồng). Và ngay sau khi trong hồ bơi có bất kì luồng nào đang nhàn rỗi (no task) các nhiệm vụ sẽ được gán vào 1 trong số chúng và được thực thi.

Bên trong ThreadPool các nhiệm vụ được chèn vào một Blocking Queue (hàng đợi được khóa) nơi mà các Thread sẽ lấy chúng ra và thực thi lần lượt. Mỗi khi có 1 task mới được thêm vào Queue, sau đó chỉ có 1 thread nhàn rỗi sẽ vào queue và lấy chúng ra, các thread nhàn rỗi còn lại phải chờ sau khi thread trước đó lấy nhiệm vụ ra thành công.

Thread Pool thường xuyên được sử dụng trong các máy chủ thực hiện đa luồng. Mỗi kết nối đến với máy chủ thông qua mạng được bao bọc như là 1 nhiệm vụ (task) và đưa vào thread pool. Các thread trong thread pool sẽ xử lý các yêu cầu về kết nối đồng thời.

Trong Java 5 Thread Pool đã được xây dựng sẵn (build-in) các lớp trong gói java.util.concurrent, vì vậy bạn không cần triển khai ThreadPool của riêng bạn. Thay vào đó chúng ta cùng đi nghiên cứu về các sử dụng của lớp java.util.concurrent.ExecutorService.

II, ExecutorService

Giao diện (interface) java.util.concurrent.ExecutorService đại diện cho cơ chế thực thi bất đồng bộ có khả năng thực thi các nhiệm vụ trong background (nền). ExecutorService là tương tự như một Thread Pool. Trong thực tế, việc thực hiện ExecutorService trong gói là một triển khai thread pool.

1. Ví dụ

Sau đây là 1 ví dụ đơn giản về Java ExecutorService:

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

Đầu tiên, ExecutorService là được tạo bằng cách sử dụng phương thức newFixedThreadPool(). Việc này tạo ra 1 thread pool với 10 thread dành cho việc thực thi các nhiệm vụ.

Tiếp đó, một triển khai của giao diện Runnable được đưa vào phương thức execute() như 1 nhiệm vụ. Điều này làm cho Runnable được thực thi bởi một trong các thread có trong ExcutorService.

2. Task Delegation (Sự ủy thác nhiệm vụ)

Dưới đây là một sơ đồ minh họa một thread ủy thác 1 nhiệm vụ tới ExecutorService cho việc thực thi bất đồng bộ:

executor-service.png

Một khi Thread giao nhiệm vụ cho ExecutorService, nó sẽ tiếp tục thực hiện công việc riêng của nó độc lập với việc thực thi nhiệm vụ của ExecutorService.

3. Triển khai ExecutorService

Sau đây là tạo ExecutorServer và cách sử dụng các phương thức của nó.

3.1 Tạo ra 1 ExecutorService

Bạn có thể sử dụng lớp Executors (factory class) trong gói java.util.concurrent để tạo ra 1 thể hiện của ExecutorService. Dưới đây là 1 vài ví dụ cho việc tạo một ExecutorService:

ExecutorService executorService1 = Executors.newSingleThreadExecutor();

ExecutorService executorService2 = Executors.newFixedThreadPool(10);

ExecutorService executorService3 = Executors.newScheduledThreadPool(10);
3.2 Cách sử dụng ExecutorService

Có một vài cách khác nhau để giao nhiệm vụ tới một ExecutorService:

  • execute(Runnable)
  • submit(Runnable)
  • submit(Callable)
  • invokeAny(...)
  • invokeAll(...)

Sau đây là cách sử dụng các phương thức trên và tác dụng của chúng:

execute(Runnable)

Phươngthức execute(Runnable) đưa vào một đối tượng java.lang.Runnable và thực thi chúng bất đồng bộ. Đây là một ví dụ cho việc thực thi a Runnable với một ExcutorService:

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

Với việc sử dụng phương thức này không có cách nào để thu được kết quả của việc thực hiện Runnable (k có callback hoặc giá trị trả về khi thực hiện xong nhiệm vụ). Bạn sẽ phải sử dụng 1 Callable cho việc đó. Xem submit(Callable).

**submit(Runnable) **

Phương thức submit(Runnable) cũng đưa vào 1 Runnable nhưng nó trả về một đối tượng Future. Đối tượng Future có thể được sử dụng để kiểm tra nếu Runnable đã hoàn tất việc thực thi.

Future<T> future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
}, T.class);

future.get(); // Trả về đối tượng T mà bạn truyền vào, dựa vào đây xác đây nhiệm đã hòan tất

**submit(Callable) **

Phương thức submit(Callable) tương tự như submit(Runnable) ngoại trừ việc hàm call() của nó cần 1 giá trị trả về để xác định kết quả thu được sau khi hòan thành nhiệm vụ còn phương thức Runnable.run() không thể trả lại kết quả.

Kết quả của Callable có thể thu được thông qua đối tượng Future được trả về bởi phương thức submit(Callable). Dưới đây là ví dụ:

Future<T> future = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return T;
    }
});

System.out.println("future.get() = " + future.get()); // Return T object (callable result)

Sử dụng phương thức future.get() để thu được kết quả. Chú ý phương thực này được thực thi đồng bộ (Asynchronous - tức là sau khi callable hòan thành nhiệm vụ kết quả được trả về nó mới được thực thi).

**invokeAny(Collection<?> extends Callable<T> tasks) **

Phương thức invokeAny() nhận một tập hợp (collection) của các đối tượng Callable hoặc các lớp được kế thừa từ Callable chúng. Gọi phương thức này không trả về một Future, nhưng trả về kết quả của một trong những đối tượng Callables. Bạn không đảm bảo về kết nào bạn sẽ nhận được từ callable. Chỉ cần một trong số chúng hòan thành (Tức là ko cần tất cả các thread hòan thành, chỉ cần 1 task hòan thành phương thức get() sẽ nhận được kết quả.

Nếu 1 trong số task hòan thành (hoặc ném ra 1 ngoại lệ), phần còn lại của Callable sẽ được hủy bỏ (cancelled).

Đây là ví dụ:

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        int b = 3 / 0;
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);

executorService.shutdown();

Đoạn mã trên sẽ in ra các kết quả được trả về từ 1 trong những Callable trong tập hợp. Chạy nó vài lần bạn sẽ nhận được những kết quả khác nhau. Thỉnh thoảng là “Task 1” hoặc “Task 3”.

**invokeAll(Collection<?> extends Callable<T> tasks) **

Phương thức invokeAll() gọi tất cả đối tượng Callable bạn đẩy vào trong tập hợp. Phương thức này trả về 1 danh sách các đối tượng Future (list of Future) mà được trả về từ việc thực thi các Callables.

Hãy nhớ rằng một công việc có thể hòan thành do một ngoại lệ, vì vậy có nghĩa nó có thể không “thành công” nhiệm vụ. Không có cách nào để biết sự khác biệt trên đối tượng Future.

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();

Kết thúc ExecutorService (ExecutorService Shutdown)

Khi bạn đã thêm vào các nhiệm vụ cần thiết bạn nên tắt ExcutorService bằng phương thức shutdown(). Khi bạn gọi phương thức này có nghĩa ExcutorService sẽ từ chối nhận thêm các nhiệm vụ (Task), và một khi tất cả các nhiệm vụ đã được thêm vào trước đó đã hòan thành. Sau đó Executor sẽ được tắt (Có nghĩa tất cả các task được thêm vào trước khi gọi shutdown() đều sẽ được thực thi).

Nếu bạn muốn tắt ExecutorService ngay lập tức, bạn có thể gọi phương thức shutdownNow(). Điều này sẽ cố gắng ngắn chặn tất cả các nhiệm vụ ngay lập tức và loại bỏ các nhiệm vụ đã được đưa vào Queue nhưng chưa được thực thi. Không có j đảm bảo về việc tắt các nhiệm vụ đang chạy hòan tòan, nhưng phương thức này là nỗ lực tốt nhất để tắt chúng.

(Gợi ý: Sử dụng phương thức shutdownNow() cũng tương tự việc sử dụng interupt() trong Thread. Để an tòan và chắc chắn hơn khi stop các nhiệm vụ đang được thực thi bạn nên thêm vào 1 biến flag chung cho các nhiệm vụ, khi bạn 1 tắt cả thread sử dụng kèm cả shutdownNow() và set giá trị cho biến flag khiến quá trình dừng lại).

Mã nguồn demo việc sử dụng ExecutorService

Tham khảo


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí