+1

Java: Concurrency with LMAX Disruptor

Bài viết sẽ giới thiệu về LMAX Disruptor và cách mà nó giúp phần mềm đạt được concurrenncy với độ trễ thấp. Bài viết cũng sẽ đưa ra ví dụ cơ bản về cách sử dụng của thư viện Disruptor.

1. Disruptor là gì ?

Disruptor là thư viện java mã nguồn mở được phát triển bởi LMAX. Đây là một thư viện hỗ trợ việc phát triển các ứng dụng chịu tải lớn, có thể xử lý đồng thời một số lượng lớn giao dịch với độ trễ thấp. Việc tối ưu hóa này đạt được bằng một thiết kế phần mềm khai thác hiệu quả sức mạnh phần cứng

1.1. Mechanical Sympathy

Mechanical Sympathy là một khái niệm về việc hiểu cách hoạt động của phần cứng và lập trình phần mềm theo cách tối ưu nhất với phần cứng đó.

Ví dụ, cùng xem cách mà CPU và việc tổ chức memory ảnh hưởng đến hiệu năng của chương trình. CPU thường sẽ có vài tầng cache (L1, L2, L3) giữa bản thân nó và main memory. Khi CPU đang thực thi một tác vụ, trước tiên nó sẽ tìm kiếm dữ liệu trong tầng cache L1 trước, rồi sau đó tìm đến L2, L3 và cuối cùng là main memory. Càng đi xa thì việc thực thi tác vụ càng trở lên lâu hơn.

Nếu việc thực thi một tác vụ trên cùng một phần dữ liệu nhiều lần (Ví dụ tăng biến counter trong for loop chẳng hạn, ...) sẽ thật hợp lý nếu dữ liệu được lưu rất gần CPU, cụ thể là trong tầng cache L1 chứ không phải trong main memory.

Một vài con số biểu thị cho chi phí của cache miss:

Latency from CPU to CPU cycles Time
Main memory Multiple ~60-80 ns
L3 cache ~40-45 cycles ~15 ns
L2 cache ~10 cycles ~3 ns
L1 cache ~3-4 cycles ~1 ns
Register 1 cycle Very very quick

1.2. Vì sao không sử dụng Queue ?

Queue implementation có xu hướng tranh chấp ghi trên các biến đầu, cuối và size variables. Trên thực tế hàng đợi luôn có xu hướng gần đầy hoặc gần trống do sự chênh lệch về tốc độ của producer và consumer. Hàng đợi rất hiếm khi hoạt động trên một nền tảng cân bằng, nơi mà tốc độ của producer và consumer là như nhau. Điều này càng làm tăng thêm việc tranh chấp đọc ghi trên hàng đợi.

Để xử lý tranh chấp đọc ghi, hàng đợi thường xử dụng Lock, điều này có thể gây ra context switch tới kernel. Khi context switch xảy ra có khả năng làm mất dữ liệu trong cache của các bộ xử lý có liên quan.

Để bộ nhớ đệm được hoạt động hiệu quả nhất, thiết kế chỉ nên có một core writing để ghi dữ liệu vào bất kỳ vị trí bộ nhớ nào (Nhiều core đọc dữ liệu cũng được vì các bộ xử lý thường sử dụng liên kết tốc độ cao, đặc biệt giữa các bộ đệm của chúng). Không may là hàng đợi lại không tuân theo quy tắc one-writer.

Nếu có 2 luồng riêng biệt đang ghi vào hai giá trị khác nhau, mỗi lõi sẽ vô hiệu hóa cache line của lõi còn lại (cache line là dữ liệu được truyền giữa bộ nhớ chính và bộ nhớ đệm trong các khối có kích thước cố định). Đó là sự tranh chấp giữa 2 luồng, mặc dù chúng đang ghi dữ liệu vào 2 biến khác nhau. Điều này được gọi là false sharing.

1.3. Cách Disruptor hoạt động

RingBuffer-1.jpg

Disruptor có một cấu trúc dữ liệu vòng tròn dựa trên mảng. Nó chứa một con trỏ để trỏ tới giá trị khả dụng tiếp theo và chứa đầy các transfer object được phân bổ từ trước. Producer và consumer có thể ghi và đọc dữ liệu tới vòng mà không có sự tranh chấp hay cần locking.

Trong Disruptor, tất cả các event được publish tới tất cả consumer (multicast) để tiêu thụ song song thông qua các hàng đợi downstream. Do các consumer tiêu thụ event song song nên cần phải điều phối mối quan hệ giữa các consumer (dependency graph).

Các consumer và producer có một bộ đếm tuần tự để cho biết vị trí trên bộ đệm mà nó đang hoạt động trên đó. Mỗi consumer và producer có thể ghi bộ đếm tuần tự của riêng nó nhưng có thể đọc bộ đếm tuần tự của consumer hay producer khác. Các consumer và producer đọc bộ đếm để chắc chắn vị trí mà nó muốn ghi khả dụng mà không cần bất kỳ lock nào cả.

2. Sử dụng thư viện Disruptor

2.1. Maven Dependency

Thêm disruptor dependency vào file pom.xml :

 <dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.3.6</version>
</dependency>

Phiên bản mới nhất được kiểm tra ở đây

2.2. Định nghĩa một Event

Tạo một event để chứa dữ liệu như sau:

public static class ValueEvent {
    private int value;
    public final static EventFactory EVENT_FACTORY 
      = () -> new ValueEvent();

    // standard getters and setters
}

2.3. Consumer

Consumer sẽ đọc dữ liệu từ Ring Buffer. Tạo một consumer để handle các event:

public class SingleEventPrintConsumer {
    ...

    public EventHandler<ValueEvent>[] getEventHandler() {
        EventHandler<ValueEvent> eventHandler 
          = (event, sequence, endOfBatch) 
            -> print(event.getValue(), sequence);
        return new EventHandler[] { eventHandler };
    }
 
    private void print(int id, long sequenceId) {
        logger.info("Id is " + id 
          + " sequence id that was used is " + sequenceId);
    }
}

Trong ví dụ trên consumer chỉ in ra console

2.4. Constructing Disruptor

Tạo một disruptor như sau:

ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;

WaitStrategy waitStrategy = new BusySpinWaitStrategy();
Disruptor<ValueEvent> disruptor 
  = new Disruptor<>(
    ValueEvent.EVENT_FACTORY, 
    16, 
    threadFactory, 
    ProducerType.SINGLE, 
    waitStrategy);

Giải thích các tham số trong hàm tạo của Disruptor:

  • Event Factory – Chịu trách nhiệm tạo ra các đối tượng sẽ được lưu trữ trong bộ đệm vòng trong quá trình khởi tạo
  • The size of Ring Buffer - Chúng ta định nghĩa 16 là kích thước của Ring Buffer. Nó phải là lũy thừa của 2 nếu trong một ngoại lệ sẽ được ném ra trong quá trình khởi tạo. Điều này là hết sức quan trọng vì có thể dễ dàng thực hiện hầu hết các thao tác bằng cách sử dụng các toán tử nhị phân logic. Ví dụ mod operation.
  • Thread Factory - Nhà máy để tạo các thread cho các bộ xử lý sự kiện
  • Producer Type - Chỉ định chúng ta sẽ có một hay nhiều producer
  • Waiting strategy - Định nghĩa cách để xử lý các consumer chậm chạp không theo kịp tốc độ của producer

Kết nối tới consumer handler:

disruptor.handleEventsWith(getEventHandler());

Có thể kết nối nhiều consumer với disruptor để xử lý dữ liệu do các producer tạo ra. Trong ví dụ trên chúng ta chỉ có một consumer.

2.5. Khởi động Disruptor

Để khởi động disruptor:

RingBuffer<ValueEvent> ringBuffer = disruptor.start();

2.6. Producing và Publishing Events

Các producer ghi dữ liệu vào trong ring buffer theo một trình tự. Các producer phải biết về vị trí khả dụng ghi tiếp theo để tránh ghi đè lên các giá trị chưa được tiêu thụ.

Sử dụng ring buffer từ disruptor trong case publish dữ liệu:

for (int eventCount = 0; eventCount < 32; eventCount++) {
    long sequenceId = ringBuffer.next();
    ValueEvent valueEvent = ringBuffer.get(sequenceId);
    valueEvent.setValue(eventCount);
    ringBuffer.publish(sequenceId);
}

Ở đây, các producer đang publish các event theo trình tự. Điều quan trọng cần lưu ý ở đây là Disruptor hoạt động tương tự như giao thức cam kết 2 pha (two phase commit protocol). Nó đọc một sequenceId mới và publish. Lần tiếp theo, nó sẽ nhận được sequenceId + 1 làm sequenceId tiếp theo.

3. Tổng kết

Trong hướng dẫn này, chúng ta đã biết Disruptor là gì và làm thế nào nó đạt được sự đồng thời với độ trễ thấp. Chúng ta đã thấy khái niệm về Mechanical Sympathy và cách nó có thể được khai thác để đạt được độ trễ thấp. Sau đó chúng ta đã thấy một ví dụ sử dụng thư viện Disruptor. Thank you !


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí