Managing threads with Queue and SizedQueue

Để một ứng dụng đa luồng của chúng ta có thể chạy một cách nhanh hơn, điều đâu tiên mà các Ruby developer nghĩ đến đó chính là việc sử dụng các Thread. Đối với một ứng dụng lâu dài, sử dụng lại một tập hợp các Thread như vậy có thể mang lại những lợi ích tuyệt vời về hiệu suất. Thread pool là một cách để có một tập hợp các Thread thực hiện công việc, thay vì tạo ra những cái mới mỗi lần cần sử dụng.

Nghe khá hay ho nhưng điều đầu tiên cần làm rõ đó là Thread là gì?

Sơ lược về Thread

Thread là một implementation của Ruby cho một mô hình lập trình xảy ra đồng thời với luồng xử lý chính.
Các chương trình yêu cầu nhiều luồng thực thi là một ứng dụng hoàn hảo để class Thread của Ruby thể hiện được hết sức mạnh của mình. Như đã nói ở trên Thread là một class và để khởi tạo một Thread ta có thể dễ dàng làm điều này bằng method :new (method khá thường thấy trong việc khởi tạo các đối tượng trong Ruby).

thr = Thread.new { puts "Whats the big deal" }

Sau khi khởi tạo, chúng ta có thể tạm dừng việc thực hiện các Thread chính và cho phép Thread mới của chúng ta kết thúc, bằng việc sử dụng join:

thr.join #=> "Whats the big deal"

Nếu chúng ta không gọi thr.join trước khi thread chính kết thúc, thì tất cả các thread khác bao gồm thr sẽ bị kill (có nghĩa là nó sẽ không được thực hiện). Ngoài ra, bạn có thể sử dụng một mảng để xử lý nhiều luồng cùng một lúc, như trong ví dụ sau:

threads = []
threads << Thread.new { puts "Whats the big deal" }
threads << Thread.new { 3.times { puts "Threads are fun!" } }

Sau khi tạo ra một vài thread, giờ chúng ta phải làm là chờ đợi cho tất cả đều được kết thúc một các liền mạch.

threads.each { |thr| thr.join }


Kiến thức về Thread còn rất nhiều, trên đây chỉ là một vài khai báo cơ bản của Thread. Các bạn có thể tìm và đọc thêm về Thread trên trang ruby-doc.org
Nào nhìn sơ qua như vậy thì rất khó để thấy được ý nghĩa của Thread. Cùng xem xét một ví dụ đơn giản dưới đây, ta có thể thấy được tầm quan trọng của Thread trong một ứng dụng thực tế:

def add_elements(group)
  sleep(4)
  sum = 0
  group.each do |item|
    sum += item
  end
  sum
end
 
@group1 = [22, 44, 55]
@group2 = [45, 59, 72]
@group3 = [99, 22, 33]
 
puts "sum of group1 = #{add_elements(@group1)}"
puts "sum of group2 = #{add_elements(@group2)}"
puts "sum of group3 = #{add_elements(@group3)}"

Theo ví dụ trên, bạn sẽ nhận được tổng của mỗi mảng là đầu ra nhưng lệnh sleep(4) sẽ tạm dừng thực hiện trong 4 giây và sau đó tiếp tục. Như vậy, nhóm1 sẽ nhận được tổng sau 4 giây, nhóm2 sau 8 giây và vân vân, điều này không khả thi chút nào trong ứng dụng thực tế khi những công việc cuối cần một khoảng thời gian rất lâu để nhận được kết quả. Trong những trường hợp như vậy, ứng dụng sẽ tiết kiệm hơn nhiều nếu có các luồng, vì chúng ta có thể tính tổng của mỗi mảng một cách độc lập mà không liên quan gì đến nhau cả. Các Thread cho phép chúng tôi thực hiện các phần khác nhau của chương trình một cách độc lập. Để thực hiện các Thread, sau khi khởi tạo mỗi mảng,

threads = (1..3).map do |c|
  Thread.new(c) do |c|
    groups = instance_variable_get("@groups#{element}")
    puts "groups#{element} = #{add_element(groups)}"
  end
end
threads.each {|thr| thr.join}

Định nghĩa phương thức add_element là giống nhau nhưng chúng ta đã bao bọc phương thức gọi trong một khối Thread.new (phần này có thêm một chút meta programming mà có lẽ mình sẽ có một series riêng cho nó, đại khái là với cách viết này chúng ta không cần lặp đi lặp lại một đoạn code khai báo Thread giống nhau). Bây giờ, thay vì nhận được tổng của mỗi mảng sau 4 giây, 8 giây và 12 giây tương ứng, bạn sẽ nhận được tổng của tất cả các mảng sau 4 giây. Điều này cho thấy hiệu suất tốt hơn và hiệu quả đó là sức mạnh của Thread.
Phần sơ lược trên đây có thể giúp mọi người hiểu thêm được một phần nào về Thread. Giờ đến việc quản lý chúng như thế nào một cách hiệu quả.

Sử dụng Queue trong việc quản lý threads

Không cần tìm kiếm các thư viện ngoài ở đâu xa cả, ngay bản thân ruby cũng đã cung cấp cho chúng ta một thư viện (standard library) giúp quản lý các threads khi chúng được sinh ra.
Để trao đổi thông tin một cách an toàn giữa các threads, chúng ta có thể sử dụng Queue. Các tác vụ được thêm vào đầu tiên trong hàng đợi được truy xuất đầu tiên (first in first out). Trong đó, PUSH và POP là hai phương thức chính trong Queue để thêm và truy xuất một mục tương ứng.
Sơ qua về Queue. Class Queue thực hiện các hàng đợi đa người sản xuất, đa người dùng. Nó đặc biệt hữu ích trong lập trình đa luồng khi thông tin phải được trao đổi một cách an toàn giữa nhiều thread. Class Queue thực hiện tất cả các khóa ngữ nghĩa được yêu cầu sử dụng (required trong hệ thống).
Tương tự như Thread hay bất kỳ một class nào khác của Ruby, Queue cũng có các public class method, các instance method phục vụ cho việc thao tác của các developer một cách thuận tiện nhất, và đương nhiên nó cũng có docs riêng. Mình mới tìm hiểu qua và có thể có ích cho mọi người khi đọc thêm các method của class Queue ở đây.
Quay trở lại bài viết, khi muốn khởi tạo một instance của Queue, chúng ta tương tự sử dụng method .new() giống với Thread:

require 'thread'
queue = Queue.new
 
Thread.new do
  4.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i} produced"
  end
end

Như vậy đã có 4 items được thêm vào queue và đây là kết quả trả ra:

Thread 0
 
Thread 1
 
Thread 2
 
Thread 3

Bây giờ, để bật ra (pop off) các mục từ queue:

Thread.new do
  4.times do |i|
     sleep(2)
     puts "consumed thread #{queue.pop}"
  end
end

Đoạn code trên sẽ có kết quả như sau:

Thread 0
 
consumed 0
 
Thread 1
 
consumed 1
 
Thread 2
 
consumed 2
 
Thread 3
 
consumed 3

Kích thước của Queue trong trường hợp cần fix size

Queue có kích thước rất hữu ích trong những trường hợp tỷ lệ sản xuất cao hơn mức tiêu thụ (hay nói cách khác là việc sinh ra Thread nhiều hơn việc sử dụng). Trong ví dụ sau,

require 'thread'
queue = Queue.new
 
Thread.new do
  10.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i} produced"
  end
end
 
Thread.new do
  4.times do |i|
    sleep(2) 
    puts "consumed thread #{queue.pop}"
  end
end

Rất dễ nhìn thấy trong ví dụ, 10 threads được sản xuất và 4 threads được tiêu thụ và tích lũy trong hàng đợi. Đây là vấn đề lãng phí bộ nhớ. Do đó chúng tôi dựa vào hàng đợi có kích thước. (Nói rõ hơn là chỉ 4 threads được lưu trữ đầu tiên mới được tích lũy và truy suất ra khỏi queue, các threads sau mặc dù được lưu vào queue nhưng đều không thể truy xuất được). Thay vì Queue.new, chúng tôi sử dụng SizedQueue.new (maxvalue). Đối số maxvalue xác định số lượng tối đa mà chúng ta cho phép đặt trong hàng đợi. Sửa đổi ví dụ trên, chúng ta có thể tiết kiệm không gian bộ nhớ.

require 'thread'
queue = SizedQueue.new(4)
 
Thread.new do
  10.times do |i|
    sleep(2)
    queue << i
    puts "Thread #{i}"
  end
end
 
Thread.new do
  4.times do |i|
    sleep(2) 
    puts "consumed #{queue.pop}"
  end
end

4 threads được lưu trữ và truy xuất. Sau 4 threads đó, giới hạn maxvalue được kiểm tra và hoạt động pop bị chặn. Giá trị lớn nhất của hàng đợi có kích thước của chúng ta là 4 ở đây, do đó sau khi push đó, phép toán không được phép, mặc dù vòng lặp là 10 lần. Giờ kết quả sẽ là

Thread 0 produced 
 
consumed thread 0
 
Thread 1 produced 
 
consumed thread 1
 
Thread 2 produced 
 
consumed thread 2
 
Thread 3 produced 
 
consumed thread 3
 
Thread 4 produced 
 
Thread 5 produced 
 
Thread 6 produced 
 
Thread 7 produced

Tổng kết

Bài học trên đây chỉ là một chút tìm tòi và đọc hiểu được của bản thân mình. Nó còn khá ít tình hữu dụng. Tuy nhiên, ít nhất qua bài học chúng ta có thể hiểu thêm về thế nào là threads, hiểu tầm quan trọng của threads trong việc xử lý đa luồng và các cách quản lý threads một cách hiệu quả nhất. Việc này sẽ giúp tăng hiệu năng của ứng dụng.
Bài viết còn rất sơ sài, mong mọi người có thể góp ý để mình cũng như những người khác có thể hiểu sâu hơn các khía cạnh khác trong threads.
Mình xin cảm ơn!