12/08/2018, 16:13

Rxjs Và Reactive Programming

Hẳn các bạn vẫn còn nhớ trong một số bài trước chúng ta có nói về Observable trong ứng dụng Angular, vậy Observable là gì, nó có quan hệ gì với Angular, làm thế nào để sử dụng Observable hiệu quả trong ứng dụng của bạn. Trong bài này chúng ta sẽ cùng tìm hiểu về Observable , về Rxjs , ...

Hẳn các bạn vẫn còn nhớ trong một số bài trước chúng ta có nói về Observable trong ứng dụng Angular, vậy Observable là gì, nó có quan hệ gì với Angular, làm thế nào để sử dụng Observable hiệu quả trong ứng dụng của bạn. Trong bài này chúng ta sẽ cùng tìm hiểu về Observable, về Rxjs, Reactive Programming.

Angular đi kèm với một dependency là Rxjs giúp cho nó trở nên reactive, một ứng dụng Angular là một reactive system. Dễ thấy nhất ở đây chính là EventEmitter, hay Reactive Forms mà chúng ta đã tìm hiểu trong các bài học trước.

Vậy Reactive Programming (RP) là gì? Điều gì khiến nó trở thành một chủ đề hot như vậy...

Hiện tại, có cả tá định nghĩa về RP, nhưng mình thấy định nghĩa sau đây là bao quát tốt vấn đề:

Reactive programming is programming with asynchronous data streams

Vâng đúng vậy, đây là phương pháp lập trình xoay quanh data streams và nó deal với các vấn đề của asynchronous. Nhưng bạn đừng hiểu lầm, nó có thể deal với cả synchronous nữa.

Bạn có thể tưởng tượng data streams như hình sau, với data được gửi đến trong suốt dòng thời gian của một stream (over time), giống như một array có các phần tử được gửi đến lần lượt theo thời gian.

data streams source: atom.io

Và chúng ta có thể coi mọi thứ là stream: single value, array, event, etc.

everything-is-a-stream

Không những thế, khi thao tác với stream, chúng ta có thể có value, error, hay complete signals. Đây là điều mà các API trước đây của các hệ thống event trong Javascript còn thiếu, chúng có qua nhiều interface khác nhau cho các loại event khác nhau, Observable sinh ra để tổng quát hóa các interface đó lại.

figure-stream

Và Rxjs giúp chúng ta có được reactive trong lập trình ứng dụng Javascript:

Rxjs is a library for composing asynchronous and event-based programs by using observable sequences.

Think of Rxjs as Lodash (ultility for array/object) for events/streams.

ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.

Các Concepts nền tảng của Rxjs bao gồm:

An Observable is a collection that arrives over time

  • Observable: đại diện cho khái niệm về một tập hợp các giá trị hoặc các sự kiện trong tương lai. Khi các giá trị hoặc sự kiện phát sinh trong tương lai, Observable sẽ điều phối nó đến Observer.
  • Observer: là một tập hợp các callbacks tương ứng cho việc lắng nghe các giá trị (next, error, hay complete) được gửi đến bởi Observable.
  • Subscription: là kết quả có được sau khi thực hiện một Observable, nó thường dùng cho việc hủy việc tiếp tục xử lý.
  • Operators: là các pure functions cho phép lập trình functional với Observable.
  • Subject: để thực hiện việc gửi dữ liệu đến nhiều Observers (multicasting).
  • Schedulers: một scheduler sẽ điều khiển khi nào một subscription bắt đầu thực thi, và khi nào sẽ gửi tín hiệu đi. (Trong bài này chúng ta sẽ không nói về phần này).

Trước khi bắt đầu với Observable, chúng ta sẽ ôn lại một số kiến thức về Array sẽ giúp ích trong việc tiếp cận Observable.

2.1 Array forEach

Array forEach là một trong các cách để ta có thể lặp qua lần lượt từng phần tử trong mảng.

const arr = [1, 2, 3, 4, 5];

arr.forEach((item, index) => {
  console.log(index + ' => ' + item);
});

Kết quả nhận được của chúng ta như sau:

"0 => 1"
"1 => 2"
"2 => 3"
"3 => 4"
"4 => 5"

Ngoại trừ một điểm chúng ta cần lưu ý khi các phần tử là kiểu reference thay vì kiểu primitive, thì forEach có thể khiến các phần tử của array ban đầu thay đổi giá trị.

const ref = [
  {
    value: 1
  }, {
    value: 2
  }, {
    value: 3
  }, {
    value: 4
  }, {
    value: 5
  }
];

ref.forEach((item, index) => {
  item.value++;
});

ref.forEach((item, index) => {
  console.log(index + ' => ' + item.value);
});

2.2 Array map

Array map cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để biến đổi, sau đó trả về một mảng các giá trị sau khi thực hiện function.

const arr = [1, 2, 3, 4, 5];

const amp = arr.map((item, index) => {
  return item + 5 + index;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[6, 8, 10, 12, 14]

2.3 Array filter

Array filter cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để kiểm tra, sau đó trả về một mảng các giá trị sau khi thực hiện hàm kiểm tra mà thỏa mãn điều kiện (return true) và giữ nguyên mảng cũ không bị ảnh hưởng.

const arr = [1, 2, 3, 4, 5];

const amp = arr.filter((item, index) => {
  return (item + index) % 3 == 0;
});

console.log(arr, amp);

// result

[1, 2, 3, 4, 5]
[2, 5]

2.4 Array reduce

Method reduce cho phép chúng ta lặp qua tất cả các phần tử và áp dụng một function nào đó vào mỗi phần tử, function này có các tham số:

  • accumulator: giá trị trả về từ các lần call callback trước.
  • currentValue: giá trị của phần tử hiện tại trong array.
  • currentIndex: index của phần tử hiện tại.
  • array: chính là mảng hiện tại.

Ngoài ra, chúng ta còn có thể cung cấp giá trị ban đầu initialValue sau tham số function đầu tiên.

const arr = [1, 2, 3, 4, 5];

const val = arr.reduce((acc, current) => acc * current, 1);

console.log(val);

// result
120

2.5 Flatten Array

Trong nhiều tình huống, chúng ta có các array, bên trong mỗi phần tử có thể là các array khác, lúc này chúng ta có nhiệm vụ làm giảm số chiều (flatten) đi chẳng hạn, chúng ta có thể có đoạn code xử lý sau trong Javascript.

Array.prototype.concatAll = function() {
  return [].concat.apply([], this);
};

const arr = [1, [2, 3], [4, 8, 0], [5]];

const flatten = arr.concatAll();

console.log(arr, flatten);

// result
[1, [2, 3], [4, 8, 0], [5]]
[1, 2, 3, 4, 8, 0, 5]

Như ở ví dụ trên, chúng ta flat mảng con 2 chiều thành 1 chiều, và chúng ta có thể flat nhiều lần để mỗi lần sẽ giảm đi 1 chiều.

Điều này các bạn sẽ hay gặp khi làm việc với Observable trả về Observable trong các phần tiếp theo.

Pull and Push are two different protocols how a data Producer can communicate with a data Consumer.

OK, chúng ta lại có một số khái niệm mới:

Producer: là nguồn sản sinh ra data.

Consumer: là nơi chế biến data sản sinh từ Producer.

Pull systems: Consumer sẽ quyết định khi nào lấy data từ Producer. Producer không quan tâm khi nào data sẽ được gửi đến cho Consumer.

Các function trong Javascript là một Pull system. Khi nào lời gọi hàm thì khi đó mới xử lý. Gọi n lần thì xử lý n lần.

Lưu ý: function chỉ trả về 1 giá trị sau khi lời gọi hàm được thực hiện. (một mảng cũng chỉ coi là 1 giá trị, vì nó được trả về 1 lần).

Push systems: Producer sẽ quyết định khi nào gửi dữ liệu cho Consumer. Consumer không quan tâm khi nào nhận được data.

Promise, DOM events là các Push systems. Chúng ta register các callbacks và khi event phát sinh, các callbacks sẽ được gọi với dữ liệu từ Producer truyền vào.

Chúng ta có một bảng so sánh như sau:

  Producer Consumer
Pull Passive: produces data when requested. Active: decides when data is requested.
Push Active: produces data at its own pace. Passive: reacts to received data.

Ví dụ:

Pull

const arr = [1, 2, 3, 4];
const iter = arr[Symbol.iterator]();

iter.next();

> {value: 1, done: false}

iter.next();

> {value: 2, done: false}

iter.next();

> {value: 3, done: false}

iter.next();

> {value: 4, done: false}

iter.next();

> {value: undefined, done: true}

Push

const button = document.querySelector('button');
button.addEventListener('click', () => console.log('Clicked!'));

Lưu ý, để code theo các ví dụ trong bài học này, bạn có thể clone bin sau:

Rxjs starter - jsbin

Vậy Observable là gì?

Observable chỉ là một function (class) mà nó có một số yêu cầu đặc biệt. Nó nhận đầu vào là một Function, mà Function này nhận đầu vào là một Observer và trả về một function để có thể thực hiện việc cancel quá trình xử lý. Thông thường (Rxjs 5) chúng ta đặt tên function đó là unsubscribe.

Observer: một object có chứa các phương thức next, error và complete để xử lý dữ liệu tương ứng với các signals được gửi từ Observable.

Observables are functions that tie an observer to a producer. That’s it. They don’t necessarily set up the producer, they just set up an observer to listen to the producer, and generally return a teardown mechanism to remove that listener. The act of subscription is the act of “calling” the observable like a function, and passing it an observer.

Nói theo cách lý thuyết hơn:

Observables are lazy Push collections of multiple values.

Như vậy, chúng ta có thể thấy Observable là lazy computation, giống như function, nếu chúng ta tạo chúng ra mà không gọi, thì không có gì thực thi cả.

Tạm thời bỏ qua các chi tiết về hàm create dưới đây, chúng ta sẽ gặp lại nó khi tìm hiểu về Operator:

function getDetail() {
  console.log('tiepphan.com');
  return 100;
}

const observable = Rx.Observable.create(function (observer) {
  console.log('Rxjs và Reactive Programming');
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Đến đây, nếu chúng ta không gọi hàm getDetail, hoặc không invoke đến observable thì chẳng có gì xảy ra cả.

Để thực thi, chúng ta sẽ làm như sau:

const ret = getDetail();
console.log(ret);

// and

console.log('before subscribe');
observable.subscribe({
  next: val => console.log('next: ' + val),
  error: err => console.error('error: ' + err),
  complete: () => console.log('done'),
});
console.log('after subscribe');

Và sau đây là kết quả chúng ta nhận được:

"tiepphan.com"
100
"before subscribe"
"Rxjs và Reactive Programming"
"next: 1"
"next: 2"
"next: 3"
"after subscribe"
"next: 4"
"done"

Observable có thể deal với cả sync và async.

Observables are able to deliver values either synchronously or asynchronously.

Với Observable chúng ta sẽ quan tâm đến các thao tác như sau:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

5.1 Creating Observables

Rx.Observable.create là một operator, nó chỉ là một alias cho Observable constructor, chúng ta hoàn toàn có thể thay thế tương ứng bằng cách gọi constructor cũng cho kết quả tương tự.

Đầu vào của constructor yêu cầu một hàm gọi là subscribe mà hàm này có đầu vào là một observer object.

const observable = new Rx.Observable(function subscribe(observer) {
  const id = setInterval(() => {
    observer.next('Hello Rxjs');
  }, 1000);
});

// same

/*
const observable = Rx.Observable.create(function subscribe(observer) {
  const id = setInterval(() => {
    observer.next('Hello Rxjs');
  }, 1000);
});

*/

Bạn hoàn toàn có thể sử dụng new hoặc Rx.Observable.create.

Ngoài operator như create, Rxjs mang đến cho bạn nhiều lựa chọn khác nhau để tạo mới một Observable như các operators: of, from, interval, etc. Chúng được đặt trong nhóm creation operators.

Ví dụ, bạn muốn tạo Observable cho một mảng các giá trị, lúc này bạn không cần dùng Rx.Observable.create rồi lặp qua các phần tử, xong gọi next nữa. Rxjs có cách dùng khác, vì đây là một usecase rất hay dùng và create là một low-level API.

const arr = [1, 2, 3, 4];

const fromArrayObservable = Rx.Observable.from(arr);

5.2 Subscribing to Observables

Sau khi đã tạo xong một Observable, chúng ta cần invoke bằng cách subscribe vào như sau:

observable.subscribe(val => console.log(val));

Subscribing to an Observable is like calling a function, providing callbacks where the data will be delivered to.

Vậy nên chúng ta call một function n lần, chúng ta sẽ có n lần thực thi. Tương tự như thế, khi chúng ta subscribe vào một Observable m lần, thì có m lần thực thi, một lời gọi subscribe giống như một cách để Observable bắt đầu thực thi.

5.3 Executing Observables

Phần code khi chúng ta khởi tạo Observable Rx.Observable.create(function subscribe(observer) {...}) chính là "Observable execution". Giống như khai báo một function, phần code này để thực hiện một số hành động, xử lý nào đó; chúng là lazy computation - chỉ thực thi khi Observer thực hiện subscribe.

Có ba kiểu giá trị mà một Observable Execution có thể gửi đi:

  • "Next" notification: gửi đi một giá trị, có thể là bất kỳ kiểu dữ liệu nào như Number, a String, an Object, etc.
  • "Error" notification: gửi đi một JavaScript Error hoặc exception.
  • "Complete" notification: không gửi đi một giá trị nào, nhưng nó gửi đi một tín hiệu để báo rằng stream này đã completed, mục đích để Observer có thể thực hiện một hành động nào đó khi stream completed.

Next notifications thường được sử dụng rộng rãi, nó cực kỳ quan trọng, vì nó gửi đi dữ liệu cần thiết cho một Observer.

Error và Complete notifications có thể chỉ xảy ra duy nhất một lần trong một Observable Execution. Lưu ý rằng, chỉ có 1 trong 2 loại tín hiệu trên được gửi đi, nếu đã complete thì không có error, nếu có error thì không có complete. (Chúng không thuộc về nhau             </div>
            
            <div class=

0