Những điều cần biết về java.util.concurrent trong lập trình đa luồng
Bài đăng này đã không được cập nhật trong 8 năm
Viết code đa luồng vừa làm việc tốt vừa bảo vệ được các ứng dụng trước các lỗi là khó khăn, đó là lý do mà chúng ta có java.util.concurrent. Tôi sẽ giới thiệu các class của java concurrent: CopyOnWriteArrayList, BlockingQueue, ConcurrentMap đã đáp ứng các yêu cầu lập trình multithread như thế nào
Java concurrent là bổ sung to lớn cho java 5, nhưng nó các nhà phát triển không nhận thấy chúng vì còn đang quan tân đến annotations và generics. Nhưng thực tế java concurrent chứa nhiều class giải quyết có hiệu quả các vấn đề về multithread mà bạn không phải vất vả xử lý nữa. Hãy đọc và tìm hiểu các class trong java.util.concurrent như CopyOnWriteArrayList và BlockingQueue sẽ giúp bạn giải quyết những vấn đề nan giải của lập trình multithread.
-
TimeUnit
Thực chất nó không phải là class của java concurrnet, nó chỉ là kiểu liệt kê(enum) làm cho code dễ đọc hơn rất nhiều. Việc sử dụng TimeUnit giải phóng các nhà phát triển khỏi gánh nặng về mili giây khi sử dụng phương thức và API TimeUnit liệt kê tất cả đơn vị thời gian [DAYS HOURS MICROSECONDS MILLISECONDS MINUTES NANOSECONDS SECONDS] nó xử lý hầu như tất cả các kiểu thời gian mà một nhà phát triển có thể cần đến. Có thể chuyển đổi qua lại các kiểu thời gian rất dễ dàng.
-
CopyOnWriteArrayList
Việc tạo bản sao mới của 1 mảng là một hoạt động quá tốn kém về cả chi phí thời gian và bộ nhớ, khi xem xét sử dụng thông thường, các nhà phát triển thường sử dụng ArrayList có đồng bộ để thay thế. Tuy nhiên đó cũng là 1 lựa chọn vì mỗi khi bạn duyệt qua nội dung bản phải đồng bộ hóa tất cả các hoạt động , bao gồm cả việc đọc và viết, để đảm bảo tính nhất quán. Điều này làm cho việc xử lý gặp nhiều khó khăn vì có những tính huống ở nơi có rất nhiều người đang đọc, vài người lại đang sửa đổi nó CopyOnWriteArrayList:
- là 1 class tuyệt vời để giải quyết vấn đề này, tất cả các vấn đề đọc thêm sửa xóa đều được đồng bộ được thực hiện bằng cách tạo 1 bản sao mới của mảng.
- Class này thực hiện sao chép nội bộ các nội dung của nó vào một mảng mới khi có bất kỳ sự thay đổi nào, do đó khi những người đọc truy cập vào các nội dung của mảng sẽ không phải chịu thời gian đồng bộ hóa(bởi vì họ không bao giờ hoạt đông trên dữ liệu thay đổi)
- Các hàm cơ bản
- addIfAbsent(E e): Thêm mới phần tử nếu chưa tồn tại
- remove(Object e): Xóa phần tử nếu tồn tại
- Iterator iterator(): Trả về toàn bộ iterator tất cả element nhưng tất cả đểu ở trên snapshot của list.
-
BlockingQueue
Interface BlockingQueue nói rằng nó là 1 hàng đợi có nghĩa là nó được lưu trữ theo thứ tự vào trước ra trước(FIFO). Các mục được chèn theo thứ tự thế nào thì sẽ lấy ra cùng thứ tự đó, nhưng với sự đảm bảo thêm bất kỳ nỗ lực nào để lấy ra một mục từ một hàng đợi rỗng sẽ chặn luồng đang ddowcj gọi cho đến khi nó trở nên sẵn sàng để lấy ra. Tương tự như vậy bất kỳ sự cố gắng nào để chèn một mục vào trong hàng được đã đầy sẽ chặn luồng đang gọi cho đến khi nó sẵn chỗ đê lưu trữ vào hàng đợi
BlockingQueue giải quyết dọn vấn đề làm thế nào để chuyển vùng các mục được thu nhập bởi 1 luồng, đưa sang luồng khác để xử lý mà không phải quan tâm chi tiết đến các vấn để động bộ hóa.
ArrayBlockingQueue là 1 class đảm bảo công bằng giữa luồng đọc và luồng viết quyển truy cập vào trước ra trước.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueDemo {
public static void main(String[] args) throws Exception {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1024);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(2345);
}
}
class Producer implements Runnable {
protected BlockingQueue<String> queue = null;
public Producer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
queue.put("100");
Thread.sleep(1000);
queue.put("200");
Thread.sleep(1000);
queue.put("300");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
protected BlockingQueue<String> queue = null;
public Consumer(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Trên là ví dụ về việc đọc và ghi ở 2 luống khác biệt hoàn toàn. trong thực tế khi làm việc với dữ liệu và luồng nhiều thì dùng offer để đẩy queue vào sẽ nhanh hơn
-
ConcurrentMap
Map chứa đựng một lỗi xảy ra lỗi giữa đọc và ghi key vào map, nếu như trước thường phải sử dụng synchonized ở hàm để đồng bộ xử lý dữ liệu. Nhưng với ConcurrentMap thì giải quyết dễ dàng
Khi một map được truy cập từ nhiều luồng thường phổ biến là sử dụng containsKey() hoặc get() để tìm hiểu xem key đã có mặt trước đó không trước khi lưu trữ. Nhưng một khi map đã đồng bộ hóa thì một luồng có thể lẻn vào và nắm quyển điều khiển map. Vấn đề là khóa đồng thời get() và put() dẫn đến kết quả khác nhau tùy thuộc vào luồng nào đến trước
Nếu 2 luồng cùng gọi 1 phương thức chính xác tại 1 thời điểm, một luồng sẽ kiểm tra và sau đó một luồng sẽ đặt giá trị, làm mất đi giá trị của luồng 1. Nhưng với concurrentMap thì hỗ trợ 1 số phương thức để làm 2 việc dưới 1 khóa duy nhất. ví dụ hàm putIfAbsent(), đầu tiên nó sẽ kiểm tra key có chưa, sau đó chỉ đặt nếu key chưa tồn tại trong map
// ConcurrentMap
V putIfAbsent(K key, V value)
// HashMap
if (!map.containsKey(key))
return map.put(key, value);
else
return map.get(key);
-
SynchronousQueues
Một hàng được có chặn trong đó mỗi hoạt động chèn vào phải chờ 1 hoạt động gỡ bỏ tương ứng bởi một luồng khác và ngược lại. Một SynchronousQueue không có bất kỳ dung lượng nào, thậm chí ngay cả dung lượng = 1
Về cơ bản, SynchronousQueue là một việc triển khai thực hiện khác của BlockingQueue nói trên. Nó cung cấp cho chúng ta một cách rất gọn nhẹ để trao đổi các phần tử đơn lẻ từ một luồng này sang luồng khác.
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueDemo {
public static void main(String[] args) {
final SynchronousQueue<String> queue = new SynchronousQueue<String>();
// start publisher thread
new Thread(new Runnable() {
@Override
public void run() {
String event = "SYNCHRONOUS_EVENT";
String another_event = "ANOTHER_EVENT";
try {
queue.put(event);
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), event);
queue.put(another_event);
System.out.printf("[%s] published event : %s %n", Thread.currentThread().getName(), another_event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// start consumer thread
new Thread(new Runnable() {
@Override
public void run() {
try {
String event = queue.take();
// thread will block here
System.out.printf("[%s] consumed event : %s %n", Thread.currentThread().getName(), event);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
Ouput:
[Thread-0] published event : SYNCHRONOUS_EVENT
[Thread-1] consumed event : SYNCHRONOUS_EVENT
Trên là 5 vấn đề có thể bạn chưa biết về java.util.concurrent, trong bài viết tới tôi sẽ trình bày những khám phá sâu hơn về concurrent.
All rights reserved