+5

[Reactive Functional Programing] Các chủ thể trong Reactive Programing

Các chủ thể trong Reactive Programing

Overview

Khi mới bắt đầu vào học Angular 2, tôi đã tìm kiếm khá nhiều những tài liệu trên internet, không may là đa số các bài viết đều đi ở mức hướng dẫn sử dụng (cook-book) thay vì giải thích nguyên lý hoạt động của framework này. 1 mảng rất quan trọng trong lập trình front-end là việc xử lý bất đồng bộ, trong angular 2, công việc này được xây dựng dựa trên bộ thư viện "Rxjs".Đối với những ai đã từng sử dụng Reactjs thì bên Reactjs có 1 tool cũng có phần nào tương đồng với Rxjs là Redux (Store của Redux cũng sử dụng Observer DP). Thông qua series này mình muốn mọi người sâu hơn đến base của Angular 2 nên những bài này không phải tutorial mà đòi hỏi bạn có sẵn 1 nền tảng với các framework front-end rồi. Việc hiểu được bản chất của 1 framework thì luôn luôn là cần thiết để code ngon hơn rồi. Ở 2 bài lần trước, chúng ta đã lần lượt đi qua khái niệm tổng quát và cách giải quyết bài toán bằng Reactive Functional Programing thông qua 1 chương trình đơn giản viết bằng Rxjs. Qua đó, bạn đã có được những khái niệm chung nhất về công việc chúng ta đang làm, giờ là lúc đi sâu vào tính chất của các chủ thể trong mô hình Reactive Programing hơn 1 chút, để có thể hiểu được cách hoạt động cũng như concept chính của nó.

Observable / Observer

Xử lý bất đồng bộ (ASYNC) trong Rxjs dựa trên push-based system (nghĩa là data producer sẽ push data cho data-consumer - chi tiết xin đọc lại bài trước).

Chủ thể data - producer ở đây được trừu tượng hóa bằng đối tượng Observable. Observable đại diện cho 1 data - producer, nó có thể là bất cứ thứ gì phát sinh ra data: http request, DOM event, … Observable có thể được subscribe bởi Observer, tức là nó có thể push data tới cho các Observer này tiêu thụ.

Chủ thể data - consumer chính là đối tượng Observer, data sau khi được Observable đẩy vào Observer sẽ được đối tượng này sử dụng.

Để Observer có thể bắt đầu nhận data từ Observable, ta dùng hàm subscribe(). Hàm subscribe() trả về 1 đối tượng Subscription (chủ thể này sẽ nói rõ hơn ở phần sau). Subscription cung cấp các hàm để điều khiển quá trình truyền - nhận data giữa 2 đối tượng Observable và Observer. VD như hàm Subscription.unsubscribe() sẽ loại bỏ liên kết truyền nhận data. Mặc định khi sử dụng hàm Observable.subscribe() yêu cầu 3 tham số tương ứng với 3 hành vi của Observer : onNext(v), onCompleted() và onError(e). Các hành vi này của Observer chính là các hàm callback() sẽ được gọi khi Observable push các notification (có thể là data, hoặc lỗi, hoặc thông báo hoàn thành) cho Observer. Nếu không cung cấp đủ 3 callback(), quá trình subscribe vẫn diễn ra bình thường nhưng các hành vi không khai báo sẽ bị bỏ qua.

Sau đây là ví dụ đơn giản về mô hình Subscrition = Observable.subscribe(observer)

// Tạo ra 1 Observable gồm số từ 1 - 5
var source = Rx.Observable.range(1, 5);

// In ra các giá trị
var subscription = source.subscribe(
    x => console.log('onNext: ' + x),
    e => console.log('onError: ' + e.message),
    () => console.log('onCompleted'));

// Kết quả
// => onNext: 1
// => onNext: 2
// => onNext: 3
// => onNext: 4
// => onNext: 5
// => onCompleted

Observable có tính "lazy" - Điều đó có nghĩa là bản thân Observable không tự thân phát sinh giá trị (trừ trường hợp nó là ConnectableObservable) chỉ khi subscribe() Observable mới thực hiện push data sang cho Observer. Nếu bạn đã từng xử lý event trong javascript sẽ thấy ở đây có gì đó tương đồng giữa Observable - EventEmitter và Observer - EventListener. Nhưng không, Observable không phải là EventEmitter, tự bản thân Observable không sinh ra giá trị và Observer cũng không phải là 1 Listener.

Observable.subscribe(observer) 

không tương đương với

eventEmitter.addEventListener(event, handleFunction());

Thay vào đó mỗi lần gọi hàm subscribe() sẽ kích hoạt những side effect riêng biệt không chia sẻ với nhau.

Cùng nhìn vào ví dụ sau: Dùng hàm Observable.create() để tạo ra 1 Observer custom theo ý người lập trình.

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next('Xin chào!!!')
  }, 1000);
});

Điều gì sẽ xảy ra khi bạn gọi observable.subscribe() ?

observable.subscribe(x => console.log(x));

Mỗi khi gọi hàm Observable.subscribe(observer) , function tham số của hàm Observable.create() sẽ được triệu gọi 1 cách riêng biệt với nhau. Ở trường hợp này là hàm subscribe() nằm trong Observable.create(function subscribe(observer) {...}). Điều này lý giải vì sao các Observer không chia sẻ cùng 1 data source. Đối với ConnectableObservable cách hoạt động sẽ hơi khác 1 chút, ConnectableObservable là kết quả trả về của hàm Observable.multicast() hay Observable.connect()... ConnectableObservable chia sẻ cùng 1 subscription với Observable nguồn, tức là datasource có gì ConnectableObservable có cái đó. ConnectableObservable đóng vai trò như 1 lớp trung gian giữa Observable nguồn và các Observers, các Observers lúc này sẽ subscribe ConnectableObservable thay cho Observable nguồn, và cái hay ở đây là các Observers share chung nhau 1 tiến trình của data source.

let obs = Rx.Observable
            .create(observer => observer.next(Date.now()))
            .publish();

obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));

obs.connect();

Kết quả như bạn đã đoán được trước, 2 Observers lúc này đều in ra những giá trị giống nhau

Subscription

Subscription là 1 đối tượng đại diện cho sự thực thi truyền - nhận data của Observable và Observer. Đối tượng này là kết quả trả về của Observable.subscribe(observer). Subscription chỉ có 1 hàm quan trọng là Subscription.unsubscribe() . Hàm này chấm dứt hoạt đông của Observable.

Ví dụ sau sử dụng operator Subscription.add() để đồng thời chấm dứt hoạt động của 2 Observable. Mặc định khi gọi unsubscribe() chỉ có 1 Observable đang gắn với Subscription được ngắt.

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
  // Unsubscribes cả subscription and childSubscription
  subscription.unsubscribe();
}, 1000);

Subject

Subject là 1 khái niệm khó hiểu nhưng rất quan trọng trong Rxjs cũng như Angular 2, Subject mang lại nhiều practice rất hữu dụng nên hãy hiểu nó thật kỹ. Subject cũng là tiền đề của Hot và Cold Observable

Subject là 1 chủ thể mà mang đầy đủ tính chất của cả Observable và Observer. Như đã nói ở trên thì mỗi khi Observer subscribe Observable thì Observable sẽ thực thi riêng rẽ giữa các instance Subscription, mỗi Observable push vào 1 Observer. Subject ưu việt hơn Observable ở điểm nó đồng thời push data đến nhiều Observer. Có thể thấy Subject giống với EventEmitter. Khi observer subscribe 1 Subject, đơn giản là ta đã add thêm 1 listener vào Subject. Đứng từ phía Observer không có cách nào để biết cái Observer đang subscribe là Observable hay Subject.

Ngoài ra, bởi vì Subject mang tính chất của Observer nên nó cũng có các hành vi onNext(v), onError(e) và onComplete(). Ví dụ sau, ta push data vào Observer thông qua hàm Subject.next()

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);
//Kết quả
//observerA: 1
//observerB: 1
//observerA: 2
//observerB: 2

2 observers trong ví dụ này đã share chung 1 datasource. Ví dụ sau cho thấy Subject mang tính chất của 1 observer, hay nói cách khác nó có thể subscribe() Observable. Thông qua tính chất này, ta đã biến 1 Observable có thể push data đến nhiều Observer sử dụng Subject như 1 proxy.

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // Pass Subject vào làm tham số như 1 observer

//observerA: 1
//observerB: 1
//observerA: 2
//observerB: 2
//observerA: 3
//observerB: 3

Trong Rxjs, có 4 loại Subject, mỗi lại có tính chất khác nhau và được sử dụng trong những practice khác nhau.

AsyncSubject : Chỉ push giá trị cuối cùng được emitted bởi Observable nguồn đến tất cả các observer đang subscribe() nó, và chỉ sau khi Observable complete().

Behavior Subject: Khi observer subscribe 1 Behavior Subject thì ngay lập tức Behavior Subject push giá trị gần nhất nó nhận được từ Observable source (hoặc giá trị khởi tạo Behavior Subject) và sau đó tiếp tục push data như bình thường (nhận được cái gì push sang cái đó)

Publish Subject (hay Subject) : Đây là loại thông thường, nó push mọi data nó nhận được source sang cho Observer theo thời gian thực. Khi sử dụng Publish Subject sẽ có khả năng, data bị thất lạc trong khoảng thời gian từ khi Subject được tạo ra (Observable bắt đầu sản sinh giá trị) tới lúc Subject được subscribe. Bạn sẽ cần lưu tâm tới điều này khi sử dụng loại Subject này.

Replay Subject: 1 trong số những cách ngăn chặn bad practice bị mất data khi sử dụng Publish Subject đó là sử dụng Replay Subject bởi vì nó sẽ push toàn bộ giá trị mình đã nhận được trong suốt vòng đời của mình vào Observer

Conclusion

Vậy là tôi đã liệt kê ra 4 loại chủ thể quan trọng nhất trong Reactive Programing (bởi vì bài này không có Functional). Đây cũng là những kiến thức nền tảng để xử lý ASYNC theo 1 cách hoàn toàn khác những gì bạn đã làm từ trước đến nay. Bài viết khá nặng về lý thuyết nhưng hãy cố gắng đọc và làm theo các ví dụ trong bài để hiểu ngọn ngành vấn đề.

Tương tác real-time trong Angular 2 cần dùng đến rất nhiều kiến thức về xử lý bất đồng bộ, nếu bạn chỉ cần học cách sử dụng Angular 2 thì có thể bỏ qua series này nhưng nếu muốn xây dựng 1 SPA đúng nghĩa thì hãy nâng cao kiến thức về Reactive Programing của mình vì đây là nền tảng của mọi front-end framework.

Trong bài sau, chúng ta sẽ cùng thực hành các kiến thức của bài này thông qua việc tối ưu http request trong angular 2.

References

http://reactivex.io/documentation/ https://github.com/ReactiveX/rxjs https://xgrommx.github.io/rx-book/index.html


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí