12/08/2018, 13:45

RxSwift các khái niệm cơ bản P.1

Getting Started Trong bài viết này tôi sẽ nói về các khái niệm cơ bản của RxSwift Observables ( Sequences) Disposing Implicit Observable guarantees Creating your first Observable (aka observable sequence) Creating an Observable that performs work Observables (Sequences) Hiểu ...

Getting Started

Trong bài viết này tôi sẽ nói về các khái niệm cơ bản của RxSwift

  • Observables ( Sequences)
  • Disposing
  • Implicit Observable guarantees
  • Creating your first Observable (aka observable sequence)
  • Creating an Observable that performs work

Observables (Sequences)

Hiểu được sự tương đồng giữa observer pattern (Observable<Element> sequence) và sequences (SequenceType) là điều rất quan trọng khi sử dụng Rx.

Mọi Observable sequence về bản chất là một sequence. Ích lợi chính của việc sử dụng Observable thay vì SequenceType trong Swift đó là Observable có thể nhận được element không đồng bộ. Đây chính là điểm cốt lõi của RxSwift.

  • Observable(ObservableType) tương đương với SequenceType
  • ObservableType.subscribe method tương đương với SequenceType.generate method.
  • Observer (callback) cần được truyền tới ObservableType.subscribe method để nhận được các element trong sequence thay vì gọi next() trong generator được trả về.

Dưới đây là một sequence của các số nguyên:

--1--2--3--4--5--6--| // sequence kết thúc bình thường

Another sequence, with characters:

--a--b--a--a--a---d---X // sequence kết thúc có lỗi

Sequence có thể có giới hạn hoặc vô tận chẳng hạn như sequence của hành động nhấn button:

---tap-tap-------tap--->

Các sơ đồ sequence như trên được gọi là marble diagrams. Xem thêm về marble diagrams ở trang http://rxmarbles.com/ .

Chúng ta có thể thể hiện một sequence bằng mô tả sau:

next* (error | completed)?

Bằng mô tả này ta có thể hiểu sequence:

  • Các sequence có thể rỗng hoặc có nhiều element.
  • Khi có sự kiện error hoặc sự kiện completed ,sequence không thể tạo thêm element.

Sequences trong Rx được mô tả bằng push interface (hay còn gọi là callback).

enum Event<Element>  {
    case next(Element)      // element tiếp theo trong sequence
    case error(Swift.Error) // sequence có lỗi
    case completed          // sequence kết thúc thành công}

class Observable<Element> {
    func subscribe(_ observer: Observer<Element>) -> Disposable
}

protocol ObserverType {
    func on(_ event: Event<Element>)
}

Khi một sequence gửi sự kiện completed hoặc sự kiện error nguồn tài nguyên sử dụng để tính toán các element trong sequence sẽ được giải phóng.

Để huỷ việc tạo các element trong sequence và giải phóng tài nguyên ngay lập tức hãy gọi dispose trong subscription được trả về.

Nếu một sequence kết thúc trong một thời gian giới hạn thì việc không gọi dispose hoặc không sử dụng addDisposableTo(disposeBag) sẽ không gây ra rò rỉ tài nguyên ( resource leak) vĩnh viễn. Tuy nhiên, tài nguyên sẽ được sử dụng cho đến khi sequence kết thúc trong cả trường hợp kết thúc việc tạo element hoặc có lỗi trả về. Nếu một sequence không kết thúc, tài nguyễn sẽ được allocated vĩnh viễn trừ khi dispose được gọi bằng tay, gọi tự động trong disposeBag, takeUntil hoặc bằng một cách nào khác.

Sử dụng dispose bags hoặc takeUntil là cách chắc chắn để tài nguyên được giải phóng. Bạn nên sử dụng khi tạo element ngay cả với các sequence kết thúc trong khoảng thời gian giới hạn.

Để biết thêm lý do tại sao Swift.Error lại không generic, bạn có thể xem giải thích tại đây.

Disposing

Có một cách khác để kết thúc một observed sequence . Khi muốn giải phóng tài nguyên đã được allocated để tính toán element tiếp theo ta có thể gọi dispose với subscription.

Đây là ví dụ với interval operator.

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
    .subscribe { event in
        print(event)
    }

NSThread.sleep(forTimeInterval: 2.0)

subscription.dispose()
In ra màn hình:
0
1
2
3
4
5

Chú ý rằng thường bạn sẽ không muốn gọi dispose bằng tay. Ở trên chỉ là ví dụ để tham khảo. Có những cách tốt hơn để dispose subscriptions. Ta có thể sử dung DisposeBag, takeUntil operator, hoặc một vài cơ chế khác.

Vậy đoạn code trên có thể in ra cái gì khác sau khi dispose được thực thi? Câu trả lời là: Tuỳ từng trường hợp.

  • Nếu scheduler là một scheduler nối tiếp (chẳng hạn MainScheduler) và dispose được gọi trong cùng một scheduler nối tiếp, câu trả lời là: Không.
  • Các trường hợp khác : Có.

Bạn có thể tìm hiểu thêm về scheduler ở đây.

Về cơ bản sẽ có 2 quá trình xảy ra song song:

  • Tạo ra element
  • disposing subscription

Câu hỏi “Có thể in cái gì khác sau đó” sẽ không còn ý nghĩa trong trường hợp các quá trình này diễn ra trong một scheduler khác.

Hãy lấy một ví dụ:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(MainScheduler.instance)
            .subscribe { event in
                print(event)
            }

// ....

subscription.dispose() // được gọi từ main thread

Sau khi dispose trả về sẽ không có gì được in ra.

Cũng vậy, trong trường hợp:

let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
            .observeOn(serialScheduler)
            .subscribe { event in
                print(event)
            }

// ...

subscription.dispose() // thực thi trong cùng `serialScheduler`

Sau khi dispose trả về sẽ không có gì được in ra.

Dispose Bags

Dispose bags được sử dụng để có tác dụng giống như ARC trong RX.

Khi DisposeBag được deallocated, nó sẽ gọi dispose trong mỗi disposables được thêm vào.

Nó không có dispose method và do vậy nếu cần phải dispose thì chỉ cần tạo một bag mới.

 self.disposeBag = DisposeBag()

Take until

Một cách khác để tự động dispose subscription khi dealloc là sử dụng takeUntil operator.

sequence
    .takeUntil(self.rx.deallocated)
    .subscribe {
        print($0)
    }

Implicit Observable guarantees

Không quan trọng element được Observables tạo ra ở thread nào, nhưng nếu nó tạo ra một element và truyền tới observer observer.on(.next(nextElement)), Observables sẽ không thể gửi element tiếp theo cho đến khi observer.on method kết thúc việc thực thi.

Observables có thể gửi .completed hoặc .error trong trường hợp sự kiên .next chưa kết thúc. Xem xét ví dụ sau:

someObservable
  .subscribe { (e: Event<Element>) in
      print("Event processing started")
      // processing
      print("Event processing ended")
  }
sẽ luôn in ra:
Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended
sẽ không bao giờ in ra:
Event processing started
Event processing started
Event processing ended
Event processing ended

Tự tạo Observable (hay còn gọi là observable sequence)

Observable có thể sinh ra element theo nhiều cách khác nhau. Một số có thể gây ra những tác động phụ, mốt số gây ảnh hưởng tới các process đang được thực hiện.

Nhưng nếu ta chỉ gọi một method mà kết quả trả về là một Observable sẽ không có sequence được sinh ra và sẽ không có những tác động phụ. Observable định nghĩa sequence được tạo ra như thế nào và những parameter nào được sử dụng để sinh ra element. Sequence bắt đầu sinh ra khi subscribe method được gọi.

Xét ví dụ sau:

func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")

// không có request nào được thực hiện
let cancel = searchForMe
  // sequence được sinh ra
  .subscribe(onNext: { results in
      print(results)
  })

Có nhiều cách để ta có thể tự tạo Observable sequence. Cách dễ nhất đó là sử dụng function create . Ta sẽ tạo một hàm tạo một sequence, sequence này sẽ trả về một element với subscription. Hàm này được gọi là 'just'.

func myJust<E>(element: E) -> Observable<E> {
    return Observable.create { observer in
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
    }
}

myJust(0)
    .subscribe(onNext: { n in
      print(n)
    })
Kết quả:
0

Vậy function create là gì ?

Nó là một convenience method cho phép dễ dàng thực hiện subscribe method sử dụng Swift closures.Giống như subscribe method , nó lấy một argument, observer , và trả về một disposable.

Sequence ở đây là một synchronous sequence (sequence đồng bộ) . Nó sẽ tạo ra các element và kết thúc trước khi subscribe trả về disposable đại diện cho subscription. Do đó, không quan trọng disposable nào trả về , quá trình sinh ra element không bị gián đoạn.

Khi sinh ra synchronous sequences, disposable trả về là singleton instance của NopDisposable. Hãy tạo một observable trả về element từ một array.

func myFrom<E>(sequence: [E]) -> Observable<E> {
    return Observable.create { observer in
        for element in sequence {
            observer.on(.next(element))
        }

        observer.on(.completed)
        return Disposables.create()
    }
}

let stringCounter = myFrom(["first", "second"])

print("Started ----")

// lần đầu tiên
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("----")

//  thực hiện lại
stringCounter
    .subscribe(onNext: { n in
        print(n)
    })

print("Ended ----")
Kết quả :
Started ----
first
second
----
first
second
Ended ----

Tạo Observable thực thi

Một ví dụ thú vị hơn, ta sẽ tạo interval operator được sử dụng trong ví dụ trước .

func myInterval(interval: NSTimeInterval) -> Observable<Int> {
    return Observable.create { observer in
        print("Subscribed")
        let queue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)
        let timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue)

        var next = 0

        dispatch_source_set_timer(timer, 0, UInt64(interval * Double(NSEC_PER_SEC)), 0)
        let cancel = Disposables.create {
            print("Disposed")
            dispatch_source_cancel(timer)
        }
        dispatch_source_set_event_handler(timer, {
            if cancel.isDisposed {
                return
            }
            observer.on(.next(next))
            next += 1
        })
        dispatch_resume(timer)

        return cancel
    }
}
let counter = myInterval(0.1)

print("Started ----")

let subscription = counter
    .subscribe(onNext: { n in
       print(n)
    })

NSThread.sleep(forTimeInterval: 0.5)

subscription.dispose()

print("Ended ----")

Kết quả:

Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----

Nếu ta viết

let counter = myInterval(0.1)

print("Started ----")

let subscription1 = counter
    .subscribe(onNext: { n in
       print("First (n)")
    })
let subscription2 = counter
    .subscribe(onNext: { n in
       print("Second (n)")
    })

NSThread.sleep(forTimeInterval: 0.5)

subscription1.dispose()

NSThread.sleep(forTimeInterval: 0.5)

subscription2.dispose()

print("Ended ----")

Kết quả là :

Started ----
Subscribed
Subscribed
First 0
Second 0
First 1
Second 1
First 2
Second 2
First 3
Second 3
First 4
Second 4
Disposed
Second 5
Second 6
Second 7
Second 8
Second 9
Disposed
Ended ----

Mỗi subscriber trên subscription thường sinh ra sequence của các element riêng của nó. Operators mặc định là stateless. Có nhiều stateless operator hơn stateful hơn stateful operator.

Trong phần sau tôi sẽ tiếp tục giới thiệu về các phần tử cơ bản còn lại. Hẹn gặp lại các bạn.

Nguồn : RxSwift

0