Tạo một Future object Với các thư viện concurrency trong ruby
Future là một khái niệm trừu tượng mô tả về kết quả của một xử lý không đồng bộ . Nó có nghĩa là nếu bạn đưa các xử lý tính toán vào Future thì nó sẽ được thực thi trong một thread mới riêng biệt, tức là thread chính của chương trình có thể thực hiện các tác vụ khác mà không cần phải chờ đợi. ...
Future là một khái niệm trừu tượng mô tả về kết quả của một xử lý không đồng bộ. Nó có nghĩa là nếu bạn đưa các xử lý tính toán vào Future thì nó sẽ được thực thi trong một thread mới riêng biệt, tức là thread chính của chương trình có thể thực hiện các tác vụ khác mà không cần phải chờ đợi.
Trong bài viết này, chúng ta sẽ viết một thư viện Future với Ruby và cùng với đó, các bạn có thể tìm hiểu thêm về các thư viện concurrency mà Ruby cung cấp.
Future hữu ích như thế nào?
Trước khi bắt đầu vào implement, hãy xem Future hữu ích như thế nào. Future rất thích hợp để gửi các HTTP request song song. Chúng ta sẽ bắt đầu bằng một ứng dụng ruby đơn giản là lấy một chuyện đùa bất kỳ về Chuck Norris từ The Internet Chuck Norris Database
require "open-uri" require "json" require "benchmark" class Chucky URL = "http://api.icndb.com/jokes/random" def sequential open(URL) do |f| f.each_line { |line| puts JSON.parse(line)["value"]["joke"] } end end end Chucky.new.sequential # There are two types of people in the world... people that suck, and Chuck Norris.
Mỗi lần hàm sequential được gọi, chương trình sẽ lấy một chuyện đùa bất kỳ về Chuck Norris. Vấn đề gì sẽ xảy ra nếu ta muốn lấy nhiều hơn 10 chuyện? Giải pháp thông thường sẽ là:
10.times { chucky.sequential }
Không may là viết thế này sẽ làm lãng phí tài nguyên của CPU và thời gian của chúng ta. Mỗi lần request được gửi để lấy dữ liệu, luồng xử lý chính sẽ bị block và phải chờ request được xử lý xong trước khi chuyển đến request tiếp theo. Với Future, ta hoàn toàn có thể khắc phục được điều đó.
Viết test
Chúng ta sẽ viết một gem Future riêng biệt với TDD (Test Driven Development). Đầu tiên, ta sẽ tạo một Ruby gem mới với câu lệnh:
bundle gem futurama
Tiếp theo, chạy bin/setup để thiết lập các dependencies
Bước 1
Test đầu tiên của chúng ta sẽ rất đơn giản:
require "spec_helper" require "timeout" module Futurama describe "Future" do it "returns a value" do future = Futurama::Future.new { 1 + 2 } expect(future).to eq(3) end end end
Test trên mô tả cách thức để tạo ra một Future. Một Future object sẽ nhận một block chứa các tác vụ và khi object Future được gọi đến, nó sẽ trả về giá trị của các tác vụ được truyền vào từ block.
Sau khi đã có test, ta sẽ bắt đầu viết code cho nó. Tạo một file tên là future.rb trong folder lib/futurama. Sau đó là đưa require.rb vào lib/futurama.rb như sau:
require "futurama/version" require "futurama/future" module Futurama end
Để có thể pass được test, một Future object cần phải:
-
nhận một block
-
trả về giá trị trong block khi nó được gọi
Với điều kiện đầu tiên, khá đơn giản:
module Futurama class Future def initialize &block @block = block end end end
Với điệu kiện thứ 2 sẽ phức tạp hơn một chút:
require "delegate" module Futurama class Future < Delegator # initialize was here def __getobj__ @block.call end end end
Khi kế thừa Delegator, ta phải viết hàm __getobj__, nếu không Ruby sẽ đưa ra exception. Tuy nhiên, hàm này là cần thiết vì khi object được gọi đến, nó sẽ gọi lại hàm này, và nếu muốn trả lại giá trị từ block truyền vào, ta chỉ cần gọi block trong method này.
Bây giờ khi chạy test, ta sẽ thấy là test pass.
Xử lý tính toán Background
Quay lại với việc viết test. Một object Future sẽ nhận một xỷ lý và chạy nó trong một thread mới. Để có thể test nó, ta có thể tạo một Future object và bắt nó đợi 1 giây trước khi trả về kết quả. Ở thread chính, ta cũng mô phỏng lại một xử lý mà mất 1 giây:
require "timeout" module Futurama describe "Future" do it "executes the computation in the background" do future = Futurama::Future.new { sleep(1); 42 } sleep(1) # do some computation Timeout::timeout(0.9) do expect(future).to eq(42) end end end end
Vì xử lý trong Future object chạy trong một thread mới cũng tương đương với việc thread chính delay 1 giây. Về mặt thực t thì nó sẽ mất ít hơn 1 giây trước khi kết quả từ Future được trả về. Ta sử dụng thư viện Timeout của Ruby để đảm bảo việc Future hoàn thành xử lý trong khoảng thời gian ta đã thiết lập.
Implementation
module Futurama class Future < Delegator def initialize &block @block = block @thread = Thread.new { run_future } end def run_future @block.call end def __getobj__ @thread.value end end end
Ta gói đoạn xử lý block trong hàm run_future và hàm này được gói trong một thread. Thread này sẽ được chạy khi mà object Future được khởi tạo và khi thread hoàn thành việc xử lý, kết quả trả về được trả lại từ hàm Thread#value.
Chạy lại test và mọi thứ lại như bình thường.
Xử lý ngoại lệ
Do Future đưa các xử lý ra các thread riêng biệt và nếu có lỗi gì đó xảy ra khi thực hiện thì rất khó để debug. Do vậy ta cần phải lưu lại các lỗi (exception) này. Đây là một test để thể hiện điều này:
module Futurama describe "Future" do it "captures exceptions and re-raises them" do error_msg = "Good news, everyone!" future = Futurama::Future.new { raise error_msg } expect { future.inspect }.to raise_error RuntimeError, error_msg end end end
Test này sẽ tự động pass mà không cần phải viết thêm một đoạn code nào cả. Tuy nhiên nó có một bug đó là nếu Thread.abort_on_exception được gán là true thì các exception không được xử lý ở bất kỳ một thread nào sẽ làm chương trình bị dừng lại. Hãy xem vấn đề này với đoạn test trên
module Futurama describe "Future" do it "captures exceptions and re-raises them" do Thread.abort_on_exception = true #... end end end
Đoạn test này không thể tự pass được nếu ta không viết thêm code cho nó.
Queues
Ta cần một cơ chế để lưu lại các exception khi mà nó xảy ra thay vì dựa vào trình biên dịch. Ta cũng cần phải re-raise exception khi mà Future object được gọi đến.
Future cũng giống như một loại cấu trúc dữ liệu lưu lại kết quả trả về hoặc exception của một xử lý. Và cấu trúc dữ liệu này cũng cần được thread-safety nữa do nó là xử lý không đồng bộ. Vậy làm thế nào để ta thực hiện nó?
Trong Ruby, Queue là một thread-safe collection class, nó giúp đồng bộ việc trao đổi của các thread. Với Future, ta chỉ cần lưu kết quả trả về hoặc exception nên ta chỉ cần Queue có độ lớn tối đa là 1. SizeQueue sẽ giúp ta đạt được điều này.
module Futurama class Future < Delegator def initialize &block @block = block @queue = SizedQueue.new 1 @thread = Thread.new { run_future } end def run_future @queue.push value: @block.call rescue Exception => ex @queue.push exception: ex end def __getobj__ # TODO end end end
Ở đây, ta thêm vào một biến instance queue cho Future. Biến này là một SizeQueue với độ lớn là 1. Sau đó, thay vì trả trực tiếp kết quả từ block, ta sẽ đẩy kết quả trả về hoặc exception từ việc gọi block vào SizeQueue.
Lấy kết quả từ Queue
Trước khi đi vào chi tiết, ta cần phải biết rằng một khi Future đã thực hiện xong xử lý, mỗi lần gọi lại thì nó sẽ trả về kết quả luôn chứ không thực hiện lại xử lý nữa. Có thể hiểu là kết quả của xử lý sẽ được Future cache lại.
module Futurama class Future < Delegator def initialize &block @block = block @queue = SizedQueue.new 1 @thread = Thread.new { run_future } @mutex = Mutex.new end #run_future def __getobj__ resolved_future_or_raise[:value] end def resolved_future_or_raise @resolved_future || @mutex.synchronize do @resolved_future ||= @queue.pop end Kernel.raise @resolved_future[:exception] if @resolved_future[:exception] @resolved_future end end end
Bây giờ ta sẽ tìm hiểu sâu hơn về hàm resolved_future_or_raise. Đầu tiên, hàm này sẽ kiểm tra xem Future đã hoàn thành xủ lý hay có gặp exception gì không bằng việc kiểm tra xem @resolved_future đã có giá trị hay chưa. Nếu chưa có thì nó sẽ lấy giá trị từ @queue. Và để đảm bảo việc lấy giá trị từ @queue và gán nó vào @resolved_future được thực hiện một cách lần lượt, ta cần gói nó vào một block và đưa vào Mutex.
Tiếp theo là kiểm tra xem @resolved_future có exception nào không. Nếu có thì raise lên với Kernel#raise. Nếu không dùng Kernel, Thread#raise sẽ được gọi.
Và cuối cùng, nếu không được gọi thì kết quả sẽ được trả về. Bây giờ chạy lại test, ta sẽ thấy là test pass.
Đưa Future vào Kernel
Thay vì phải viết Futurama::Future.new {} để sử dụng Future, ta có thể viết là future {}. Test case cho nó:
module Futurama describe "Future" do it "pollutes the Kernel namespace" do msg = "Do the Bender!" future = future { msg } expect(future).to eq(msg) end end end
Để pass test, ta cần tạo một file tên là kernel.rb trong lib/futurama:
require "futurama" module Kernel def future &block Futurama::Future.new &block end end
Thêm file này vào lib/futurama.rb với require và chạy bây giờ test sẽ lại pass.
Những câu chuyện đùa đồng thời về Chuck Norris
Ta sẽ viết lại chương trình để lấy các câu chuyện đùa về Chuck Norris bằng cách sử dụng Future mà ta đã viết ở trên và đưa nó vào sample của gem Futurama.
require "../lib/futurama" require "open-uri" require "json" require "benchmark" class Chucky URL = "http://api.icndb.com/jokes/random" def sequential open(URL) do |f| f.each_line { |line| puts JSON.parse(line)["value"]["joke"] } end end def concurrent future { sequential } end end
Code repo: futurama
Nguồn: Learn Concurrency by Implementing Futures in Ruby