07/09/2018, 15:49

Nhập liệu với JSON Streaming

Tuần vừa rồi tôi khởi động một dự án nho nhỏ để thí nghiệm cách viết một JSON API hoàn chỉnh với Lotus Framework, có một thủ thuật về nhập liệu mà tôi thấy rất hay và muốn chia sẻ với mọi người, thủ thuật tôi gọi là nhập liệu thông qua streaming JSON object. Tôi có viết một gem tên cricos_scrape ...

Tuần vừa rồi tôi khởi động một dự án nho nhỏ để thí nghiệm cách viết một JSON API hoàn chỉnh với Lotus Framework, có một thủ thuật về nhập liệu mà tôi thấy rất hay và muốn chia sẻ với mọi người, thủ thuật tôi gọi là nhập liệu thông qua streaming JSON object.

Tôi có viết một gem tên cricos_scrape để "vét" dữ liệu của trang web CRICOS của bộ giáo dục Úc. Khi bắt đầu viết tôi đã xem xét các định dạng dữ liệu mà tôi có thể sử dụng để lưu trữ, trong đó có CSV, XML, JSON. Tôi quyết định sử dụng JSON vì tôi thấy JSON có nhiều ưu điểm như có định dạng kiểu thay vì chỉ toàn là string như JSON, và cũng đơn giản để parse hơn nhiều so với XML.

Phiên bản đầu tiên của gem này sẽ vét và lưu toàn bộ dữ liệu thành một array các JSON object và được lưu giữ trong một file json. Vd:

[
  {"provider_id":1,"provider_code":"00001K","name":"Canberra Institute of Technology"},
  {"provider_id":2,"provider_code":"00002J","name":"Macquarie University (Macquarie)"}
]

Sau đó tôi chỉ việc viết một script nhỏ đọc nội dung của file JSON này và chuyển đổi nó thành một JSON object, sau đó lặp qua từng đối tượng của array này để nhập liệu vào DB.

Nhưng tôi không hài lòng với cách sử dụng một array JSON object.

Thứ nhất là nếu bạn có một khối lượng lớn dữ liệu trong array thì việc đọc file rồi lưu vào bộ nhớ chậm, và có thể sẽ là vấn đề nếu không đủ RAM lí do GC không dọn rác hiệu quả.

Thứ hai là tôi thấy giải pháp của tôi ở trên quá phức tạp và khá khó để bảo trì hay test. Để tôi nói rõ hơn một chút, đoạn mã nhập liệu của tôi không đơn giản chỉ là đọc file rồi lưu vào DB mà nó còn bao gồm các bước chuẩn bị dữ liệu, làm sạch hoặc chuyển đổi dữ liệu theo yêu cầu. Có thể thấy là cho hết mọi bước đấy vào cùng một chỗ đã vi phạm luật đơn nhiệm (Single Responsibility).

Thực sự vào thời điểm tôi viết phiên bản đầu tiên thì tôi không chuyên tâm để tìm cách khắc phục các vấn đề trên cho lắm. Vào trung tuần tháng 6, tôi có dịp tiếp cận và học NodeJS ở cty. Tôi thực sự rất ấn tượng về Streaming API của NodeJS. Tôi thích cái khái niệm Streaming, dữ liệu đầu vào sẽ đi qua một 'dòng chảy' và ở mỗi 'chặng' thì dữ liệu sẽ đc thay đổi và đc tiếp tục truyền đi tới chặng tiếp theo. Nghĩ lại thì thực sự khái niệm đơn giản này được sử dụng rất nhiều, từ UNIX pipeline (STDOUT, STDIN) cho đến cách của các ngôn ngữ hướng chức năng (functional programming) xử lý dữ liệu.

Dựa trên nguyên lý streaming này, tôi refactor lại gem cricos_scrape, thay vì sử dụng Ruby IO để lưu dữ liệu vào file, thì tôi tận dụng UNIX pipeline để làm việc đó. Tôi thay đổi để code của tôi trả về JSON object của dữ liệu vào STDOUT thông qua hàm puts, có thể tưởng tượng

open('data.json', 'w') do |f|
  scrapped_data.each do |line|
    f.puts line
  end
end

thay vào với:

scrapped_data.each do |line|
  puts line
end
cricos_scrape_script.rb >> data.json

Có thể thấy ở trên tôi append STDOUT vào file data.json (chú ý >>). Tôi xoá bỏ chức năng lưu trữ ra khỏi code và để người dùng toàn quyền quyết định cách lưu giữ.

Một điểm khác biệt nữa đáng lưu ý là nội dung của data.json là nhiều dòng JSON object, chứ không phải là một array object:

{"provider_id":1,"provider_code":"00001K","name":"Canberra Institute of Technology"}
{"provider_id":2,"provider_code":"00002J","name":"Macquarie University (Macquarie)"}

Điều này giúp cho việc đọc từng dòng và xử lý dễ dàng.

Tiếp theo thôi thay đổi code của import script:

STDIN.read.split("
").each do |line|
  json_obj = JSON.parse(line)
  save_object(json_obj)
end
cat data.json | cricos_import_script.rb

tôi dùng lệnh cat để đọc nội dung của file data.json sau đó pipe nó thành STDIN (thông qua |) vào import script của tôi. Import script sẽ tự động đọc STDIN stream này, tách chúng ra theo ký tự xuống dòng rồi mỗi line đc nhập liệu vào DB.

Hoặc có thể kết hợp với scrape script trực tiếp:

cricos_scrape_script.rb | cricos_import_script.rb

Sức mạnh của stream là xâu chuỗi sử lý dữ liệu đầu vào, tôi có thể thêm vào pipeline của tôi các đoạn script xử lý khác:

cricos_scrape_script.rb | cricos_sanitize_script.rb | cricos_import_script.rb

Chỉ việc đảm bảo là các đoạn script của tôi sẽ truyền dữ liệu thông qua STDOUT và nhận dữ liệu thông qua STDIN

Xin lưu ý là shell của bạn có giới hạn về buffer, nếu file của bạn quá lớn thì có thể kết hợp UNIX command split để tách nhỏ file thành nhiều file. Cuối cùng là cách tạo ra nhiều script nhỏ cũng khá hay, bạn sẽ không phải lo việc tràn bộ nhớ nếu GC không làm việc hiệu quả, tất cả các process bị huỷ sẽ gở bỏ phần bộ nhớ chiếm dụng ngay lập tức.

Có thể thấy là bằng cách phân tách nhiệm vụ ra thành nhiều script nhỏ liên kết với nhau thông qua UNIX pipeline, chúng ta có một pipeline rất đơn giản, dễ test và đc việc. Thủ thuật ở đây là phân tách dữ liệu thành các JSON object và pipe chúng ra từng dòng một.

Mong các bạn độc giả hãy tích cực tận dụng nguyên lý streaming/pipeline này để xử lý dữ liệu.

0