Tạo một Future object Với các thư viện concurrency trong ruby
Bài đăng này đã không được cập nhật trong 8 năm
Future
là một khái niệm trừu tượng mô tả về kết quả của một xử lý không đồng bộ. Nó có nghĩa là nếu bạn đưa các xử lý tính toán vào Future
thì nó sẽ được thực thi trong một thread mới riêng biệt, tức là thread chính của chương trình có thể thực hiện các tác vụ khác mà không cần phải chờ đợi.
Trong bài viết này, chúng ta sẽ viết một thư viện Future
với Ruby và cùng với đó, các bạn có thể tìm hiểu thêm về các thư viện concurrency mà Ruby cung cấp.
Future hữu ích như thế nào?
Trước khi bắt đầu vào implement, hãy xem Future
hữu ích như thế nào. Future
rất thích hợp để gửi các HTTP request song song. Chúng ta sẽ bắt đầu bằng một ứng dụng ruby đơn giản là lấy một chuyện đùa bất kỳ về Chuck Norris từ The Internet Chuck Norris Database
require "open-uri"
require "json"
require "benchmark"
class Chucky
URL = "http://api.icndb.com/jokes/random"
def sequential
open(URL) do |f|
f.each_line { |line| puts JSON.parse(line)["value"]["joke"] }
end
end
end
Chucky.new.sequential
# There are two types of people in the world... people that suck, and Chuck Norris.
Mỗi lần hàm sequential
được gọi, chương trình sẽ lấy một chuyện đùa bất kỳ về Chuck Norris. Vấn đề gì sẽ xảy ra nếu ta muốn lấy nhiều hơn 10 chuyện? Giải pháp thông thường sẽ là:
10.times { chucky.sequential }
Không may là viết thế này sẽ làm lãng phí tài nguyên của CPU và thời gian của chúng ta. Mỗi lần request được gửi để lấy dữ liệu, luồng xử lý chính sẽ bị block và phải chờ request được xử lý xong trước khi chuyển đến request tiếp theo. Với Future
, ta hoàn toàn có thể khắc phục được điều đó.
Viết test
Chúng ta sẽ viết một gem Future
riêng biệt với TDD (Test Driven Development). Đầu tiên, ta sẽ tạo một Ruby gem mới với câu lệnh:
bundle gem futurama
Tiếp theo, chạy bin/setup
để thiết lập các dependencies
Bước 1
Test đầu tiên của chúng ta sẽ rất đơn giản:
require "spec_helper"
require "timeout"
module Futurama
describe "Future" do
it "returns a value" do
future = Futurama::Future.new { 1 + 2 }
expect(future).to eq(3)
end
end
end
Test trên mô tả cách thức để tạo ra một Future
. Một Future
object sẽ nhận một block
chứa các tác vụ và khi object Future
được gọi đến, nó sẽ trả về giá trị của các tác vụ được truyền vào từ block.
Sau khi đã có test, ta sẽ bắt đầu viết code cho nó. Tạo một file tên là future.rb trong folder lib/futurama
. Sau đó là đưa require.rb vào lib/futurama.rb như sau:
require "futurama/version"
require "futurama/future"
module Futurama
end
Để có thể pass được test, một Future
object cần phải:
-
nhận một block
-
trả về giá trị trong block khi nó được gọi
Với điều kiện đầu tiên, khá đơn giản:
module Futurama
class Future
def initialize &block
@block = block
end
end
end
Với điệu kiện thứ 2 sẽ phức tạp hơn một chút:
require "delegate"
module Futurama
class Future < Delegator
# initialize was here
def __getobj__
@block.call
end
end
end
Khi kế thừa Delegator
, ta phải viết hàm __getobj__
, nếu không Ruby sẽ đưa ra exception. Tuy nhiên, hàm này là cần thiết vì khi object được gọi đến, nó sẽ gọi lại hàm này, và nếu muốn trả lại giá trị từ block truyền vào, ta chỉ cần gọi block
trong method này.
Bây giờ khi chạy test, ta sẽ thấy là test pass.
Xử lý tính toán Background
Quay lại với việc viết test. Một object Future
sẽ nhận một xỷ lý và chạy nó trong một thread mới. Để có thể test nó, ta có thể tạo một Future
object và bắt nó đợi 1 giây trước khi trả về kết quả. Ở thread chính, ta cũng mô phỏng lại một xử lý mà mất 1 giây:
require "timeout"
module Futurama
describe "Future" do
it "executes the computation in the background" do
future = Futurama::Future.new { sleep(1); 42 }
sleep(1) # do some computation
Timeout::timeout(0.9) do
expect(future).to eq(42)
end
end
end
end
Vì xử lý trong Future
object chạy trong một thread mới cũng tương đương với việc thread chính delay 1 giây. Về mặt thực t thì nó sẽ mất ít hơn 1 giây trước khi kết quả từ Future
được trả về. Ta sử dụng thư viện Timeout
của Ruby để đảm bảo việc Future
hoàn thành xử lý trong khoảng thời gian ta đã thiết lập.
Implementation
module Futurama
class Future < Delegator
def initialize &block
@block = block
@thread = Thread.new { run_future }
end
def run_future
@block.call
end
def __getobj__
@thread.value
end
end
end
Ta gói đoạn xử lý block
trong hàm run_future
và hàm này được gói trong một thread. Thread này sẽ được chạy khi mà object Future
được khởi tạo và khi thread hoàn thành việc xử lý, kết quả trả về được trả lại từ hàm Thread#value
.
Chạy lại test và mọi thứ lại như bình thường.
Xử lý ngoại lệ
Do Future
đưa các xử lý ra các thread riêng biệt và nếu có lỗi gì đó xảy ra khi thực hiện thì rất khó để debug. Do vậy ta cần phải lưu lại các lỗi (exception) này. Đây là một test để thể hiện điều này:
module Futurama
describe "Future" do
it "captures exceptions and re-raises them" do
error_msg = "Good news, everyone!"
future = Futurama::Future.new { raise error_msg }
expect { future.inspect }.to raise_error RuntimeError, error_msg
end
end
end
Test này sẽ tự động pass mà không cần phải viết thêm một đoạn code nào cả. Tuy nhiên nó có một bug đó là nếu Thread.abort_on_exception
được gán là true
thì các exception không được xử lý ở bất kỳ một thread nào sẽ làm chương trình bị dừng lại. Hãy xem vấn đề này với đoạn test trên
module Futurama
describe "Future" do
it "captures exceptions and re-raises them" do
Thread.abort_on_exception = true
#...
end
end
end
Đoạn test này không thể tự pass được nếu ta không viết thêm code cho nó.
Queues
Ta cần một cơ chế để lưu lại các exception khi mà nó xảy ra thay vì dựa vào trình biên dịch. Ta cũng cần phải re-raise
exception khi mà Future
object được gọi đến.
Future
cũng giống như một loại cấu trúc dữ liệu lưu lại kết quả trả về hoặc exception của một xử lý. Và cấu trúc dữ liệu này cũng cần được thread-safety nữa do nó là xử lý không đồng bộ. Vậy làm thế nào để ta thực hiện nó?
Trong Ruby, Queue
là một thread-safe collection class, nó giúp đồng bộ việc trao đổi của các thread. Với Future
, ta chỉ cần lưu kết quả trả về hoặc exception nên ta chỉ cần Queue
có độ lớn tối đa là 1. SizeQueue
sẽ giúp ta đạt được điều này.
module Futurama
class Future < Delegator
def initialize &block
@block = block
@queue = SizedQueue.new 1
@thread = Thread.new { run_future }
end
def run_future
@queue.push value: @block.call
rescue Exception => ex
@queue.push exception: ex
end
def __getobj__
# TODO
end
end
end
Ở đây, ta thêm vào một biến instance queue
cho Future
. Biến này là một SizeQueue
với độ lớn là 1. Sau đó, thay vì trả trực tiếp kết quả từ block
, ta sẽ đẩy kết quả trả về hoặc exception từ việc gọi block vào SizeQueue
.
Lấy kết quả từ Queue
Trước khi đi vào chi tiết, ta cần phải biết rằng một khi Future
đã thực hiện xong xử lý, mỗi lần gọi lại thì nó sẽ trả về kết quả luôn chứ không thực hiện lại xử lý nữa. Có thể hiểu là kết quả của xử lý sẽ được Future
cache lại.
module Futurama
class Future < Delegator
def initialize &block
@block = block
@queue = SizedQueue.new 1
@thread = Thread.new { run_future }
@mutex = Mutex.new
end
#run_future
def __getobj__
resolved_future_or_raise[:value]
end
def resolved_future_or_raise
@resolved_future || @mutex.synchronize do
@resolved_future ||= @queue.pop
end
Kernel.raise @resolved_future[:exception] if @resolved_future[:exception]
@resolved_future
end
end
end
Bây giờ ta sẽ tìm hiểu sâu hơn về hàm resolved_future_or_raise
. Đầu tiên, hàm này sẽ kiểm tra xem Future
đã hoàn thành xủ lý hay có gặp exception gì không bằng việc kiểm tra xem @resolved_future
đã có giá trị hay chưa. Nếu chưa có thì nó sẽ lấy giá trị từ @queue
. Và để đảm bảo việc lấy giá trị từ @queue
và gán nó vào @resolved_future
được thực hiện một cách lần lượt, ta cần gói nó vào một block và đưa vào Mutex
.
Tiếp theo là kiểm tra xem @resolved_future
có exception nào không. Nếu có thì raise lên với Kernel#raise
. Nếu không dùng Kernel
, Thread#raise
sẽ được gọi.
Và cuối cùng, nếu không được gọi thì kết quả sẽ được trả về. Bây giờ chạy lại test, ta sẽ thấy là test pass.
Đưa Future vào Kernel
Thay vì phải viết Futurama::Future.new {}
để sử dụng Future
, ta có thể viết là future {}
. Test case cho nó:
module Futurama
describe "Future" do
it "pollutes the Kernel namespace" do
msg = "Do the Bender!"
future = future { msg }
expect(future).to eq(msg)
end
end
end
Để pass test, ta cần tạo một file tên là kernel.rb trong lib/futurama:
require "futurama"
module Kernel
def future &block
Futurama::Future.new &block
end
end
Thêm file này vào lib/futurama.rb với require
và chạy bây giờ test sẽ lại pass.
Những câu chuyện đùa đồng thời về Chuck Norris
Ta sẽ viết lại chương trình để lấy các câu chuyện đùa về Chuck Norris bằng cách sử dụng Future
mà ta đã viết ở trên và đưa nó vào sample của gem Futurama
.
require "../lib/futurama"
require "open-uri"
require "json"
require "benchmark"
class Chucky
URL = "http://api.icndb.com/jokes/random"
def sequential
open(URL) do |f|
f.each_line { |line| puts JSON.parse(line)["value"]["joke"] }
end
end
def concurrent
future { sequential }
end
end
Code repo: futurama
All rights reserved