+1

Tạo một Future object Với các thư viện concurrency trong ruby

Future là một khái niệm trừu tượng mô tả về kết quả của một xử lý không đồng bộ. Nó có nghĩa là nếu bạn đưa các xử lý tính toán vào Future thì nó sẽ được thực thi trong một thread mới riêng biệt, tức là thread chính của chương trình có thể thực hiện các tác vụ khác mà không cần phải chờ đợi.

Trong bài viết này, chúng ta sẽ viết một thư viện Future với Ruby và cùng với đó, các bạn có thể tìm hiểu thêm về các thư viện concurrency mà Ruby cung cấp.

Future hữu ích như thế nào?

Trước khi bắt đầu vào implement, hãy xem Future hữu ích như thế nào. Future rất thích hợp để gửi các HTTP request song song. Chúng ta sẽ bắt đầu bằng một ứng dụng ruby đơn giản là lấy một chuyện đùa bất kỳ về Chuck Norris từ The Internet Chuck Norris Database

require "open-uri"
require "json"
require "benchmark"

class Chucky
  URL = "http://api.icndb.com/jokes/random"

  def sequential
    open(URL) do |f|
      f.each_line { |line| puts JSON.parse(line)["value"]["joke"] }
    end
  end
end

Chucky.new.sequential
# There are two types of people in the world... people that suck, and Chuck Norris.

Mỗi lần hàm sequential được gọi, chương trình sẽ lấy một chuyện đùa bất kỳ về Chuck Norris. Vấn đề gì sẽ xảy ra nếu ta muốn lấy nhiều hơn 10 chuyện? Giải pháp thông thường sẽ là:

10.times { chucky.sequential }

Không may là viết thế này sẽ làm lãng phí tài nguyên của CPU và thời gian của chúng ta. Mỗi lần request được gửi để lấy dữ liệu, luồng xử lý chính sẽ bị block và phải chờ request được xử lý xong trước khi chuyển đến request tiếp theo. Với Future, ta hoàn toàn có thể khắc phục được điều đó.

Viết test

Chúng ta sẽ viết một gem Future riêng biệt với TDD (Test Driven Development). Đầu tiên, ta sẽ tạo một Ruby gem mới với câu lệnh:

bundle gem futurama

Tiếp theo, chạy bin/setup để thiết lập các dependencies

Bước 1

Test đầu tiên của chúng ta sẽ rất đơn giản:

require "spec_helper"
require "timeout"

module Futurama
  describe "Future" do
    it "returns a value" do
      future = Futurama::Future.new { 1 + 2 }
      expect(future).to eq(3)
    end
  end
end

Test trên mô tả cách thức để tạo ra một Future. Một Future object sẽ nhận một block chứa các tác vụ và khi object Future được gọi đến, nó sẽ trả về giá trị của các tác vụ được truyền vào từ block.

Sau khi đã có test, ta sẽ bắt đầu viết code cho nó. Tạo một file tên là future.rb trong folder lib/futurama. Sau đó là đưa require.rb vào lib/futurama.rb như sau:

require "futurama/version"
require "futurama/future"

module Futurama
end

Để có thể pass được test, một Future object cần phải:

  • nhận một block

  • trả về giá trị trong block khi nó được gọi

Với điều kiện đầu tiên, khá đơn giản:

module Futurama
  class Future
    def initialize &block
      @block = block
    end
  end
end

Với điệu kiện thứ 2 sẽ phức tạp hơn một chút:

require "delegate"

module Futurama
  class Future < Delegator
    # initialize was here
    def __getobj__
      @block.call
    end
  end
end

Khi kế thừa Delegator, ta phải viết hàm __getobj__, nếu không Ruby sẽ đưa ra exception. Tuy nhiên, hàm này là cần thiết vì khi object được gọi đến, nó sẽ gọi lại hàm này, và nếu muốn trả lại giá trị từ block truyền vào, ta chỉ cần gọi block trong method này.

Bây giờ khi chạy test, ta sẽ thấy là test pass.

Xử lý tính toán Background

Quay lại với việc viết test. Một object Future sẽ nhận một xỷ lý và chạy nó trong một thread mới. Để có thể test nó, ta có thể tạo một Future object và bắt nó đợi 1 giây trước khi trả về kết quả. Ở thread chính, ta cũng mô phỏng lại một xử lý mà mất 1 giây:

require "timeout"

module Futurama
  describe "Future" do
    it "executes the computation in the background" do
      future = Futurama::Future.new { sleep(1); 42 }

      sleep(1) # do some computation

      Timeout::timeout(0.9) do
        expect(future).to eq(42)
      end
    end
  end
end

Vì xử lý trong Future object chạy trong một thread mới cũng tương đương với việc thread chính delay 1 giây. Về mặt thực t thì nó sẽ mất ít hơn 1 giây trước khi kết quả từ Future được trả về. Ta sử dụng thư viện Timeout của Ruby để đảm bảo việc Future hoàn thành xử lý trong khoảng thời gian ta đã thiết lập.

Implementation

module Futurama
  class Future < Delegator
    def initialize &block
      @block  = block
      @thread = Thread.new { run_future }
    end

    def run_future
      @block.call
    end

    def __getobj__
      @thread.value
    end
  end
end

Ta gói đoạn xử lý block trong hàm run_future và hàm này được gói trong một thread. Thread này sẽ được chạy khi mà object Future được khởi tạo và khi thread hoàn thành việc xử lý, kết quả trả về được trả lại từ hàm Thread#value.

Chạy lại test và mọi thứ lại như bình thường.

Xử lý ngoại lệ

Do Future đưa các xử lý ra các thread riêng biệt và nếu có lỗi gì đó xảy ra khi thực hiện thì rất khó để debug. Do vậy ta cần phải lưu lại các lỗi (exception) này. Đây là một test để thể hiện điều này:

module Futurama
  describe "Future" do
    it "captures exceptions and re-raises them" do
      error_msg = "Good news, everyone!"
      future = Futurama::Future.new { raise error_msg }
      expect { future.inspect }.to raise_error RuntimeError, error_msg
    end
  end
end

Test này sẽ tự động pass mà không cần phải viết thêm một đoạn code nào cả. Tuy nhiên nó có một bug đó là nếu Thread.abort_on_exception được gán là true thì các exception không được xử lý ở bất kỳ một thread nào sẽ làm chương trình bị dừng lại. Hãy xem vấn đề này với đoạn test trên

module Futurama
  describe "Future" do
    it "captures exceptions and re-raises them" do
      Thread.abort_on_exception = true
      #...
    end
  end
end

Đoạn test này không thể tự pass được nếu ta không viết thêm code cho nó.

Queues

Ta cần một cơ chế để lưu lại các exception khi mà nó xảy ra thay vì dựa vào trình biên dịch. Ta cũng cần phải re-raise exception khi mà Future object được gọi đến.

Future cũng giống như một loại cấu trúc dữ liệu lưu lại kết quả trả về hoặc exception của một xử lý. Và cấu trúc dữ liệu này cũng cần được thread-safety nữa do nó là xử lý không đồng bộ. Vậy làm thế nào để ta thực hiện nó?

Trong Ruby, Queue là một thread-safe collection class, nó giúp đồng bộ việc trao đổi của các thread. Với Future, ta chỉ cần lưu kết quả trả về hoặc exception nên ta chỉ cần Queue có độ lớn tối đa là 1. SizeQueue sẽ giúp ta đạt được điều này.

module Futurama
  class Future < Delegator
    def initialize &block
      @block  = block
      @queue  = SizedQueue.new 1
      @thread = Thread.new { run_future }
    end

    def run_future
      @queue.push value: @block.call
    rescue Exception => ex
      @queue.push exception: ex
    end

    def __getobj__
      # TODO
    end
  end
end

Ở đây, ta thêm vào một biến instance queue cho Future. Biến này là một SizeQueue với độ lớn là 1. Sau đó, thay vì trả trực tiếp kết quả từ block, ta sẽ đẩy kết quả trả về hoặc exception từ việc gọi block vào SizeQueue.

Lấy kết quả từ Queue

Trước khi đi vào chi tiết, ta cần phải biết rằng một khi Future đã thực hiện xong xử lý, mỗi lần gọi lại thì nó sẽ trả về kết quả luôn chứ không thực hiện lại xử lý nữa. Có thể hiểu là kết quả của xử lý sẽ được Future cache lại.

module Futurama
  class Future < Delegator
    def initialize &block
      @block  = block
      @queue  = SizedQueue.new 1
      @thread = Thread.new { run_future }
      @mutex  = Mutex.new
    end

    #run_future

    def __getobj__
      resolved_future_or_raise[:value]
    end

    def resolved_future_or_raise
      @resolved_future || @mutex.synchronize do
        @resolved_future ||= @queue.pop
      end

      Kernel.raise @resolved_future[:exception] if @resolved_future[:exception]
      @resolved_future
    end
  end
end

Bây giờ ta sẽ tìm hiểu sâu hơn về hàm resolved_future_or_raise. Đầu tiên, hàm này sẽ kiểm tra xem Future đã hoàn thành xủ lý hay có gặp exception gì không bằng việc kiểm tra xem @resolved_future đã có giá trị hay chưa. Nếu chưa có thì nó sẽ lấy giá trị từ @queue. Và để đảm bảo việc lấy giá trị từ @queue và gán nó vào @resolved_future được thực hiện một cách lần lượt, ta cần gói nó vào một block và đưa vào Mutex.

Tiếp theo là kiểm tra xem @resolved_future có exception nào không. Nếu có thì raise lên với Kernel#raise. Nếu không dùng Kernel, Thread#raise sẽ được gọi.

Và cuối cùng, nếu không được gọi thì kết quả sẽ được trả về. Bây giờ chạy lại test, ta sẽ thấy là test pass.

Đưa Future vào Kernel

Thay vì phải viết Futurama::Future.new {} để sử dụng Future, ta có thể viết là future {}. Test case cho nó:

module Futurama
  describe "Future" do
    it "pollutes the Kernel namespace" do
      msg    = "Do the Bender!"
      future = future { msg }
      expect(future).to eq(msg)
    end
  end
end

Để pass test, ta cần tạo một file tên là kernel.rb trong lib/futurama:

require "futurama"

module Kernel
  def future &block
    Futurama::Future.new &block
  end
end

Thêm file này vào lib/futurama.rb với require và chạy bây giờ test sẽ lại pass.

Những câu chuyện đùa đồng thời về Chuck Norris

Ta sẽ viết lại chương trình để lấy các câu chuyện đùa về Chuck Norris bằng cách sử dụng Future mà ta đã viết ở trên và đưa nó vào sample của gem Futurama.

require "../lib/futurama"
require "open-uri"
require "json"
require "benchmark"

class Chucky
  URL = "http://api.icndb.com/jokes/random"

  def sequential
    open(URL) do |f|
      f.each_line { |line| puts JSON.parse(line)["value"]["joke"] }
    end
  end

  def concurrent
    future { sequential }
  end
end

Code repo: futurama

Nguồn: Learn Concurrency by Implementing Futures in Ruby


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí