Sử dụng Backpressure trong RxJava2
Giới thiệu Chào mọi người,ở bài viết này mình muốn trình bày về Backpressure khi bạn làm việc với RxJava2. Bài viết này mình muốn hướng đến đối tượng những người mới học và làm việc với RxAndroid như mình. Trong RxJava không phải là hiếm gặp khi bạn gặp phải tình huống mà một Observable ...
Giới thiệu
Chào mọi người,ở bài viết này mình muốn trình bày về Backpressure khi bạn làm việc với RxJava2.
Bài viết này mình muốn hướng đến đối tượng những người mới học và làm việc với RxAndroid như mình.
Trong RxJava không phải là hiếm gặp khi bạn gặp phải tình huống mà một Observable phát ra các mục nhanh hơn operator hoặc subscriber dùng đến chúng. Dẫn đến có một vấn đề bạn cần quan tâm đó là phải làm gì với những item được phát ra từ Observable. Hay nói cách khác bạn cần xử lý việc tồn đọng của Observable khi mà việc phát ra các mục không cần thiết (operator hoặc subscriber không cần đến chúng)
Có một vài khái niệm mới có trong Rx2 như Flowable,Completable, Single và Maybe chúng đều có điểm chung đều là các Observer. Sự khác biệt chính giữa chúng là xử lý back-pressure. Về cơ bản, Flowable là một Observer xử lý back-pressure. Còn các observer như Completable, Single và Maybe không xử lý back-pressure. Backpressure có thể hiểu là một cơ chế thông báo ngược trở lại cho Observable số lượng data mà Subcriber có thể xử lý.
Nói vậy nghĩa là sao? Tức là khi bạn sử dụng Flowable một observer xử lý back-pressure thì bạn cần lựa chọn một BackpressureStrategy thích hợp, tối ưu nhất.
Flowable.create(subscriber -> { int count = 0; while (true) { count++; subscriber.onNext(count + " "); } }, BackpressureStrategy.DROP) .observeOn(Schedulers.newThread(), false, 2) .subscribe( val -> { Thread.sleep(1000); System.out.println(val); }, err -> { System.out.println("Lỗi rồi!"); err.printStackTrace(); }, () -> System.out.println("Hoàn thành!!") );
Ở đoạn code khi khởi tạo 1 Flowable các bạn có thể để ý đến tham số thứ 2 BackpressureStrategy. Cụ thể ở đây là BackpressureStrategy.DROP. Có 5 loại BackpressureStrategy bao gồm:
- BackpressureStrategy.BUFFER: Trường hợp này sử dụng khi bạn không muốn bỏ mất bất kì 1 item nào được phát ra và gửi chúng đến subcriber. Hay nói các khác sẽ buffer lại tất cả các giá trị onNext cho đến khi downstream sử dụng nó. BackpressureStrategy sẽ dễ dẫn đến OOM(OutOfMemoryError) nếu bạn không cung cấp một ngưỡng nhất định cho Flowable.
- BackpressureStrategy.DROP: Trong trường hợp subcriber của bạn không theo kịp luồng phát ra thì sẽ drop giá trị onNext gần nhất.
- BackpressureStrategy.LATEST: Chỉ giữ sự kiện gần nhất và loại bỏ các sự kiện chưa từng có trước đây. Giữ lại duy nhất giá trị onNext mới nhất, overwrite bất kỳ giá trị nào trước đó nếu downstream không thể theo kịp.
- BackpressureStrategy.ERROR: Phát ra thông báo một lỗi MissingBackpressureException trong trường hợp downstream không thể theo kịp.
- BackpressureStrategy.MISSING: Event onNext được viết mà không có bất kỳ một buffer hay drop nào. Hay nói các khác khi bạn không quan tâm đến backpressure. Và giao lại cho downstream xử lý trên các hàm onBackpressureXXX.
Ngoài ra chúng ta còn có các operator đễ hỗ trợ việc xử lý backpressure
- onBackpressureBuffer() tương tự như strategy buffer việc gọi operator này sẽ buffer lại tất cả các giá trị onNext cho đến khi downstream sử dụng nó
- onBackpressureDrop() tương tự như strategy drop
- onBackpressureLatest() tương tự như strategy lastest
Như vậy là mình đã trình bày xong bài viết về sử dụng Backpressure trong RxJava2. Hi vọng qua bài viết các bạn có thể hiểu về Backpressure. Sử dụng Backpressure trong việc xử lý luồng giữa Observable và Operator hoặc Subscriber một cách hiệu quả. Tránh việc sử dụng tài nguyên không hợp lý gây lên những lỗi không đáng có.
Nguồn: https://github.com/ReactiveX/RxJava/wiki/Backpressure, https://eng.uber.com/rxjava-backpressure/. Nếu có gì sai sót rất mong nhận được sự đóng góp từ phía các bạn.