12/08/2018, 15:20

Ruby Concurrency và Parallelism

Trong quá trình phát triển ứng dụng, chắc hẳn các bạn đã có lần gặp phải các khái niệm về Concurrency (Đồng thời) và parallelism (song song) trong ruby code. Đôi lúc chúng ta hiểu lầm 2 khái niệm này giống nhau nhưng thực chất lại ngược lại. Trong quá trình tìm hiểu tôi có đọc được một bài báo về ...

Trong quá trình phát triển ứng dụng, chắc hẳn các bạn đã có lần gặp phải các khái niệm về Concurrency (Đồng thời) và parallelism (song song) trong ruby code. Đôi lúc chúng ta hiểu lầm 2 khái niệm này giống nhau nhưng thực chất lại ngược lại. Trong quá trình tìm hiểu tôi có đọc được một bài báo về sự so sánh 2 khái niệm này cũng như kèm theo phân tích ví dụ cụ thể. Xin phép được chia sẻ lại cho mọi người cùng tìm hiểu.

Đi vào chi tiết, ruby concurrency (đồng thời) là khi 2 task có thể start, run và complete trong những khoảng thời gian trùng lặp với nhau. Tuy nhiên, điều đó không có nghĩa là chúng sẽ chạy cả hai task cùng một lúc (ví dụ: multiple threads trên một máy tính đơn nhân xử lý). Ngược lại, parallelism (song song) là khi 2 task chạy cùng lúc đúng theo nghĩa đen (ví dụ: multiple threads trên một máy tính đa nhân xử lý)

Điểm mấu chốt ở đây là các luồng và / hoặc quy trình concurrency sẽ không nhất thiết phải chạy song song. Chúng ta đi vào ví dụ cụ thể để tìm hiểu kĩ hơn sự khác nhau này.

Test Case

Ta tạo 1 class Mailer và thêm vào 1 hàm Fibonacci thay cho hàm sleep() để tăng thời gian xử lý mỗi request của CPU.

class Mailer

  def self.deliver(&block)
    mail = MailBuilder.new(&block).mail
    mail.send_mail
  end

  Mail = Struct.new(:from, :to, :subject, :body) do 
    def send_mail
      fib(30)
      puts "Email from: #{from}"
      puts "Email to  : #{to}"
      puts "Subject   : #{subject}"
      puts "Body      : #{body}"
    end

    def fib(n)
      n < 2 ? n : fib(n-1) + fib(n-2)
    end  
  end

  class MailBuilder
    def initialize(&block)
      @mail = Mail.new
      instance_eval(&block)
    end
    
    attr_reader :mail

    %w(from to subject body).each do |m|
      define_method(m) do |val|
        @mail.send("#{m}=", val)
      end
    end
  end
end

Sau đó chúng ta có thể gọi class Mailer này như sau để gửi mail:

Mailer.deliver do 
  from    "eki@eqbalq.com"
  to      "jill@example.com"
  subject "Threading and Forking"
  body    "Some content"
end

Mục tiêu so sánh chung để xây dựng benchmark ở bài test là thực hiện gửi thư 100 lần.

puts Benchmark.measure{
  100.times do |i|
    Mailer.deliver do 
      from    "eki_#{i}@eqbalq.com"
      to      "jill_#{i}@example.com"
      subject "Threading and Forking (#{i})"
      body    "Some content"
    end
  end
}

Kết qủa với vi xử lý 4 nhân và implementer MRI Ruby 2.0.0p353

user  CPU time       system CPU time      total               real
15.250000              0.020000                    15.270000    ( 15.304447)

Multiple Processes và Multithreading

Khi sử dụng Multiple Processes và Multithreading trong Ruby application đều có nhưng ưu điểm nhược điểm riêng

Processes Threads
Sử dụng nhiều tài nguyên bộ nhớ Sử dụng ít tài nguyên bộ nhớ
-------- --------
Nếu task cha chết trước khi task con hoàn thành, task con sẽ không bị hủy mà vẫn tồn tại trong hệ thống Tất cả các luồng sẽ bị hủy khi 1 luồng bị hủy
-------- --------
Khó khăn trong việc chia nhỏ các processes để chuyển context vì Hệ điều hành cần phải lưu và reload lại toàn bộ các tiến trình Threads đơn giản hơn vì giữa các thread chia sẻ không gian địa chỉ và bộ nhớ
-------- --------
Các processes được đưa vào một không gian bộ nhớ ảo riêng (process isolation) Threads chia sẻ cùng một bộ nhớ, vì vậy cần phải kiểm soát và giải quyết các vấn đề khi sử dụng chung ô nhớ
-------- --------
Yêu cầu sự giao tiếp giữa các process Có thể giao tiếp thông qua queues và shared memory
-------- --------
Chậm hơn khi create và destroy Nhanh hơn khi create và destroy
-------- --------
Dễ dàng code và debug Phức tạp hơn để code và debug

Ruby solutions sử dụng multiple processes:

Resque: Thư viện Redis-backed Ruby để tạo các background jobs, đặt các job vào multiple queues và xử lý sau. Unicorn: Một HTTP server cho các Rack applications cung cấp kết nối có độ trễ thấp, băng thông lớn và tận dụng được các tính năng hữu ích trong Unix/Unix-like kernels.

Ruby solutions sử dụng multithreading:

Sidekiq: Một framework cho Ruby với đầy dủ tính năng dùng cho background processing. Mục đích của nó là nằm tích hợp với bất kì ứng dụng rail nào và để đạt được hiệu suất cao hơn các giải pháp hiện có. Puma: Ruby web server built for concurrency. Thin: A very fast and simple Ruby web server.

Multiple Processes

Trong Ruby, fork() được gọi để tạo ra một bản sao của process hiện tại. Process mới này được lập lịch ở operating system level, vì vậy nó có thể được chạy đồng thời với process gốc, giống như bất kỳ process độc lập khác. (note: fork() là POSIX system call vì vậy không thể gọi khi chạy ruby trên window platform)

Chúng ta sẽ chạy test case, nhưng gọi fork() để sử dụng multiple processes:

puts Benchmark.measure{
  100.times do |i|
    fork do     
      Mailer.deliver do 
        from    "eki_#{i}@eqbalq.com"
        to      "jill_#{i}@example.com"
        subject "Threading and Forking (#{i})"
        body    "Some content"
      end
    end
  end
  Process.waitall
}

(Process.waitall để đợi tất cả các process con hoàn thành và trả về 1 mảng status của các process.)

Kết qủa với vi xử lý 4 nhân và implementer MRI Ruby 2.0.0p353

user  CPU time       system CPU time      total               real
0.000000                 0.030000                    27.000000   (  3.788106)

Kết quả là thời gian gửi mail đã nhanh hơn 5 lần.

Tuy giải quyết được vấn đề thời gian xử lý, nhưng nó lại tốn một lượng lớn bộ nhớ, vì vậy giá phải trả là khá đắt. Nếu ứng dụng của bạn sử dụng 20MB bộ nhớ, khi sử dụng pork() sẽ tiêu tốn thành 100 lần ~ 2Gb bộ nhớ!

Mặc dù multithreading phức tạp hơn nhưng điều đó nên được cân nhắc với việc sử dụng fork().

Ruby Multithreading

Và bây giờ chúng ta test với multithreading techniques

Multiple threads trong một single process sẽ có chi phí ít hơn đáng kể so với một số lượng process vì chúng chia sẻ không gian địa chỉ cũng như bộ nhớ:

threads = []

puts Benchmark.measure{
  100.times do |i|
    threads << Thread.new do     
      Mailer.deliver do 
        from    "eki_#{i}@eqbalq.com"
        to      "jill_#{i}@example.com"
        subject "Threading and Forking (#{i})"
        body    "Some content"
      end
    end
  end
  threads.map(&:join)
}

Kết qủa với vi xử lý 4 nhân và implementer MRI Ruby 2.0.0p353

user  CPU time       system CPU time      total               real
13.710000              0.040000                    13.750000    ( 13.740204)

Kết quả không khả quan hơn mấy mặc dù chúng ta đã sử dụng multithreading, đó là vì Global Interpreter Lock (GIL). Và vì GIL, MRI implementation không thực sự hỗ trợ threading.

Global Interpreter Lock là cơ chế được sử dụng trong ngôn ngữ thông dịch nhằm đồng bộ hóa quá trình thực thi các threads vì vậy chỉ có duy nhất 1 thread được thực thi tại một thời điểm. Một trình thông dịch sử dụng GIL sẽ chỉ cho pheps có duy nhất 1 thread được thực thi tại một thời điểm, dù là ở trên multi-core processor. Ruby MRI and CPython là 2 ví dụ trình thông dịch có GIL.

Vậy làm cách nào để sử dụng multithreading thực thi code? Chúng ta thay thế MRI bằng các Ruby implementation khác như JRuby, Rubinius, vì chúng không có GIL và có hỗ trợ parallel Ruby threading.

Và lần này là implement bằng JRuby (thay vì MRI):

user  CPU time       system CPU time      total               real
43.240000              0.140000                    43.380000     (  5.655000)

Threads cũng không phải miễn phí

Nâng cao hiệu suất bằng sử dụng multiple threads khiến chúng ta nghĩ rằng có thể tăng thêm số thread để tăng tốc độ lên thật nhanh và nhanh hơn nữa. Tuy nhiên thực thế lại không phải vậy. Số thread không phải vô hạn, chúng có giới hạn Chúng ta thực hiện test gửi mail 100 lần thay vì 10,000 lần:

threads = []

puts Benchmark.measure{
  10_000.times do |i|
    threads << Thread.new do     
      Mailer.deliver do 
        from    "eki_#{i}@eqbalq.com"
        to      "jill_#{i}@example.com"
        subject "Threading and Forking (#{i})"
        body    "Some content"
      end
    end
  end
  threads.map(&:join)
}

Kết quả:

can't create Thread: Resource temporarily unavailable (ThreadError)

Thread Pooling

Vì giới hạn về tài nguyên, chúng ta có 1 phương pháp thay thế hữu hiệu hơn: thread pooling

Thread pool là một nhóm các thread đã được khởi tạo, có thể tái sử dụng và có thể được gọi để thực thi nếu cần. Thread pools đặc biệt hữu ích với các tasks nhỏ nhưng có số lượng lớn. Điều này giúp ngăn chặn việc tạo ra một số lượng lớn hẳn các thread gây quá tải với hệ thống. Một key configuration cho thread pool thường là số lượng threads trong pool. Các threads có thể được khởi tạo cùng một lúc (khi pool được tạo) hoặc lazily (Nếu cần thiết đến khi số lượng thread trong pool đạt số lương max để tạo pool ). Khi pool được giao task để thực thi, nó gán task tới một thread đang idle. Nếu không có thread nào đang rảnh (số lượng thread max đã được tạo) nó chờ cho thread hoàn thành việc và idle sau đó gán task cho thread đó.

Quay trở lại với test case, chúng ta sử dụng Queue để thực thi như ví dụ đơn giản về thread pool:

require “./lib/mailer” require “benchmark” require ‘thread’

POOL_SIZE = 10

jobs = Queue.new

10_0000.times{|i| jobs.push i}

workers = (POOL_SIZE).times.map do
  Thread.new do
    begin      
      while x = jobs.pop(true)
        Mailer.deliver do 
          from    "eki_#{x}@eqbalq.com"
          to      "jill_#{x}@example.com"
          subject "Threading and Forking (#{x})"
          body    "Some content"
        end        
      end
    rescue ThreadError
    end
  end
end

workers.map(&:join)

Trong code chúng ta tạo 1 hàng đợi cho các job cần thực thi. Queue được sử dụng vì nó là thread-safe (nếu multiple threads truy cập tới nó chúng một thời điểm, nó sẽ vẫn duy trì tính nhất quán). Push IDs của mailers vào job queue và tạo pool với 10 thread. Trong mỗi thread, ta pop items từ jobs queue. Do đó, life-cycle của 1 thread được tiếp tục chờ cho tới khi task được đưa vào job Queue và được thực thi. Việc xử lý như trên sẽ không vấp phải bất kì lỗi nào, tuy nhiên code lại phức tạp, kể cả với ví dụ đơn giản như trên.

Celluloid

Vì có hệ sinh thái Ruby Gem đa dạng, rất nhiều multithreading phức tạp được đóng gói dễ dàng dưới dạng gem và dễ dàng để sử dụng, một ví dụ là Celluloid. Celluloid framework là cách đơn giản và dễ để thực thi, giúp chunsgt a có thể build các chương trình concurrent từ các đối tượng concurrent cũng dễ dàng như build các chương trình sequential từ các đối tượng sequential.

Sau đây là đoạn code ứng dụng multithreaded và sử dùngj Celluloid:

require "benchmark"
require "celluloid"

class MailWorker
  include Celluloid

  def send_email(id)
    Mailer.deliver do 
      from    "eki_#{id}@eqbalq.com"
      to      "jill_#{id}@example.com"
      subject "Threading and Forking (#{id})"
      body    "Some content"
    end       
  end
end

mailer_pool = MailWorker.pool(size: 10)

10_000.times do |i|
  mailer_pool.async.send_email(i)
end

Background Jobs

Một vài gem hỗ trợ background processing (lưu job vào hàng đợi và xử lý sau mà không ảnh hưởng tới thread hiện tại) ví dụ Sidekiq, Resque, Delayed Job, and Beanstalkd. Trong bài viết ta sử dung một ví dụ quen thuộc về sử dụng Sidekiq và Redis.

Cài đặt redis:

brew install redis
redis-server /usr/local/etc/redis.conf

Sử dụng Sidekiq:

require_relative "../lib/mailer"
require "sidekiq"

class MailWorker
  include Sidekiq::Worker
  
  def perform(id)
    Mailer.deliver do 
      from    "eki_#{id}@eqbalq.com"
      to      "jill_#{id}@example.com"
      subject "Threading and Forking (#{id})"
      body    "Some content"
    end  
  end
end

Chúng ta gọi Sidekiq mail_worker.rb file:

sidekiq  -r ./mail_worker.rb

Kết quả:

⇒  irb
>> require_relative "mail_worker"
=> true
>> 100.times{|i| MailWorker.perform_async(i)}
2014-12-20T02:42:30Z 46549 TID-ouh10w8gw INFO: Sidekiq client with redis options {}
=> 100

Kết luận

Một các tiếp cận đơn giản là sử dụng fork để chạy các process nhưng tốn tài nguyên và nguồn tài nguyên ấy lại có hạn. Các kĩ thuật khác để ứng dụng multithreading như thread pool nhưng phức tạp, tuy nhiên cũng có các gems đóng gói sẵn giúp việc sử dụng multithreading trong ứng dụng trở nên dễ dàng hơn, ví dụ như Celluloid. Một cách khác để xử lý các process tốn thời gian là sử dụng background processing. Có rất nhiều thư viện và dịch vụ cho phép bạn thực hiện background jobs trong ứng dụng. There are many libraries and services that allow you to implement background jobs in your applications. Vài công cụ phổ biến đó là database-backed job frameworks va message queues. Forking, threading, và background processing cũng là các thay thế tốt. Tùy thuộc và yêu cầu cũng như tính chất của ứng dụng mà các bạn có thể chọn cho hợp lý.

Nguồn: https://www.toptal.com/ruby/

0