Cách viết Python code concurrent chỉ với 3 dòng
Hôm nay tôi sẽ chia sẻ với các bạn một mẹo hay dùng trong Python’s standard library. Toàn bộ code chạy trên Python 3.2+ mà không cần thêm bất kì package ngoài nào khác. Vấn đề được đặt ra Giả sử như bạn có tới hàng trăm ngàn URL cần phải được xử lí/download/ xem xét, như vậy ...
Hôm nay tôi sẽ chia sẻ với các bạn một mẹo hay dùng trong Python’s standard library. Toàn bộ code chạy trên Python 3.2+ mà không cần thêm bất kì package ngoài nào khác.
Vấn đề được đặt ra
Giả sử như bạn có tới hàng trăm ngàn URL cần phải được xử lí/download/xem xét, như vậy bạn sẽ cần thật nhiều HTTP GET call cũng như thu thập kết quả của mỗi lần gọi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
import http.client import socket def get_it(url): try: # always set a timeout when you connect to an external server connection = http.client.HTTPSConnection(url, timeout=2) connection.request("GET", "/") response = connection.getresponse() return response.read() except socket.timeout: # in a real world scenario you would probably do stuff if the # socket goes into timeout pass urls = [ "www.google.com", "www.youtube.com", "www.wikipedia.org", "www.reddit.com", "www.httpbin.org" ] * 200 for url in urls: get_it(url) |
Như các bạn đã thấy, Python sẽ lặp (iterate) trên 1000 URL và gọi từng đường link.
Quá trình này sẽ chiếm 2% công suất của CPU và phần lớn thời gian sẽ dành cho việc chờ I/O:
1 2 3 4 |
$ time python io_bound_serial.py 20.67s user 5.37s system 855.03s real 24292kB mem |
Như vậy, một lần chạy trung bình mất tới 14 phút.
Mẹo để tối ưu hóa
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
from concurrent.futures import ThreadPoolExecutor as PoolExecutor import http.client import socket def get_it(url): try: # always set a timeout when you connect to an external server connection = http.client.HTTPSConnection(url, timeout=2) connection.request("GET", "/") response = connection.getresponse() return response.read() except socket.timeout: # in a real world scenario you would probably do stuff if the # socket goes into timeout pass urls = [ "www.google.com", "www.youtube.com", "www.wikipedia.org", "www.reddit.com", "www.httpbin.org" ] * 200 with PoolExecutor(max_workers=4) as executor: for _ in executor.map(get_it, urls): pass |
Hãy xem đã có gì thay đổi
1 2 3 4 5 6 7 8 9 10 11 12 |
# import a new API to create a thread pool from concurrent.futures import ThreadPoolExecutor as PoolExecutor # create a thread pool of 4 threads with PoolExecutor(max_workers=4) as executor: # distribute the 1000 URLs among 4 threads in the pool # _ is the body of each page that I'm ignoring right now for _ in executor.map(get_it, urls): pass |
Vậy là chỉ với 3 dòng code, chúng ta đã biến một serial task chậm chạp thành diễn ra đồng thời (concurrent ), với chỉ 5 phút chạy:
1 2 3 4 |
$ time python io_bound_threads.py 21.40s user 6.10s system 294.07s real 31784kB mem |
Chờ đã, vẫn còn điều thú vị khác
Điều đặc biệt của API này là bạn có thể thay thế
1 2 3 |
from concurrent.futures import ThreadPoolExecutor as PoolExecutor |
thành
1 2 3 |
from concurrent.futures import ProcessPoolExecutor as PoolExecutor |
để cho Python sử dụng processes thay vì thread, khi đấy thời gian chạy sẽ là:
1 2 3 4 |
$ time python io_bound_processes.py 22.19s user 6.03s system 270.28s real 23324kB mem |
Tốc độ chạy được cải thiện 20 giây so với phiên bản xài thread.
Kết luận
Tôi nghĩ rằng ThreadPoolExecutor và ProcessPoolExecutor là những bổ sung siêu “ngầu” cho Python’s standard library. Dù bạn có thể đã làm hầu hết mọi thứ với threading, multiprocessing và FIFO queues nhưng API này thật sự tốt hơn rất nhiều.
Techtalk via Dev.to