+6

RFP with Rxjs - Hot vs Cold Observable

Intro

Chào các bạn, hôm nay chúng ta lại gặp lại nhau trong phần thứ 4 của series về lập trình FRP bằng Rxjs. Ở các bài lần trước thì tôi đã cung cấp cho các bạn những khái niệm căn bản nhất về FRP cũng như cách sử dụng Rxjs. Mặc dù vậy, thực tế việc ứng dụng FRP trong code dự án, hay cụ thể là Reactive Extension vẫn còn khá hạn chế, nó mang hơi hướm trendy nhiều hơn. Đa số các framework lớn nhất hiện giờ vẫn đang sử dụng OOP là chính, nhưng họ cũng khuyến khích code FRP ở 1 số những module và tình huống nhất định. Hôm nay tôi sẽ phân tích 1 trong những case thường gặp nhất trong Angular 2 mà fw này đã sử dụng Rxjs để làm việc. Qua việc làm rõ vấn đề này thì bạn cũng sẽ hiểu thế nào là Hot Observable và Cold Observable. Bài viết này cũng đòi hỏi bạn đã có những khái niệm về Observable, Observer, Subject nên nếu bạn chưa biết nó là gì thì hãy đọc lại những bài trước đã nhé.

Khái niệm

Ngắn gọn là Cold Observable được khởi tạo khi nó được subscribe. Còn Hot Observable thì được khởi tạo không phụ thuộc vào việc nó được subscribe. Để cho dễ hình dung thì những mouse event như click là Hot Observable, những sự kiện click chuột luôn xảy ra bất chấp việc bạn có subscribe hay không. Cold Observable thì trừu tượng hơn 1 chút ví dụ khi bạn khởi tạo 1 websocket bên trong hàm khởi tạo của Observable. Như thế này:

const source = Observable.create((observer) => {
  const socket = new WebSocket('ws://someurl');
  socket.addEventListener('message', (e) => observer.next(e));
  return () => socket.close();
});

Bây giờ chỉ khi nào bạn dùng 1 Observer subscribe nó thì function khởi tạo mới được chạy, nếu không subscribe cũng có nghĩa là không có socket nào được tạo ra.

Tại sao cần đến Hot observable

Vẫn với ví dụ về websocket, bạn sẽ không muốn mỗi lần add 1 observer vào thì lại tạo ra 1 socket mới phải không? Đúng vậy, đối với Cold Observable thì mỗi lần bạn subsribe nó bằng 1 Observer thì hàm khởi tạo lại được chạy và lại phát sinh ra 1 instance mới của Observable và cái mà bạn lắng nghe sự kiện là từ 2 socket khác nhau chứ không phải 1.

Multicasted Observables VS Unicasted Observable

Trong bài viết Các chủ thể trong Reactive Extension thì bạn đã biết thế nào là Observable và cách nó emit sự kiện lên Observer hoặc Subject. Thì Multicasted Observable cũng không có gì khác biệt ngoại trừ thay vì nó emit sự kiện lên Observer thì nó sẽ emit đến 1 Subject.

Điều này giúp cho các Observer đang subscribe Subject này nhận được cùng 1 giá trị mỗi lần Observable emit sự kiện.

Xem ví dụ sau:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // Kết quả hàm khởi tạo chỉ chạy 1 lần nhưng được in ra bởi 2 Observer, subject lúc này mang tính chất của 1 Multicasted Observable

Và Hot observable là 1 loại Multicasted Observable.

Vận dụng những kết thức đã biết ta hoàn toàn có thể viết ra 1 function để biến bất cứ Cold observable nào trở thành Hot Observable thông qua 1 multicasted Observable

function makeHot(cold) {
  const subject = new Subject();
  cold.subscribe(subject);
  return Observable.create((observer) => subject.subscribe(observer));
}

Hàm này trả về 1 Observable,mỗi khi có 1 Observer subsrcibe Observable diễn biến như sau:

  • Hàm khởi tạo được chạy
  • Dùng Observer subscribe Subject được tạo ra bên trong hàm
  • Subject này subscribe Cold Observable chính là đối số cold đầu vào của makeHot(cold)
  • Kết quả có được là luôn luôn các Observer mới chỉ subscribe Subject thôi, và bởi vì Subject có tính chất multicast nên nó sẽ không tạo ra instance mới mỗi lần được subscribe.

Convert Cold to Hot Observable in RXjs

Trong rxjs tất nhiên ta sẽ không cần viết 1 function makeHot(cold) như trên, thư viên này đã cung cấp sẵn các operator còn mạnh mẽ hơn thông qua các từ khoá publish(), connect(), share(),... Chúng ta điểm qua các từ khoá quan trọng nhất.

  • publish() biến 1 Cold Observable trở thành Hot Observable.
  • connect() khi được gọi sẽ thực thi hàm khởi tạo Observable.
let obs = Rx.Observable
            .create(observer => observer.next(Date.now()))
            .publish();

obs.subscribe(v => console.log("1st subscriber: " + v));
obs.subscribe(v => console.log("2nd subscriber: " + v));

obs.connect();
// Kết quả in ra màn hình là cùng 1 giá trị v ở cả 2 subscriber

Hơi khác 1 chút với hàm connect(), hàm refCount() khiến cho hàm khởi tạo Observable được gọi khi có 1 observer bắt đầu subscribe nó.

let obs = Rx.Observable
            .interval(1000)
            .publish()
            .refCount();

setTimeout(() => {
  // delay cả 2 Subscriber 2 giây
  obs.subscribe(v => console.log("1st subscriber:" + v));
  setTimeout(
    // delay sau khi Subscriber 1 subscribe Observable hơn 1 giây
    () => obs.subscribe(
          v => console.log("2nd subscriber:" + v)), 1100);

},2000);

//Kết quả nhận được là Subsriber 1 nhận được đầy đủ giá trị của v, trong khi Subscriber 2 chỉ nhận được bắt đầu từ v = 1, đó là do obs đã trở thành Hot sau khi được Subscriber 1 subscribe và nó không quan tâm Subcriber 2 có đang subcribe nó hay không.

HTTP request in Angular 2

OK, nếu đã sử dụng Angular 2(hay version mới nhất là 4 - gọi tắt là Angular vì kể ra Angular 2 chúng ta sẽ không phân biệt nó dựa vào version nữa, đơn giản là Angular thôi - Angular 1 sẽ là angular.js) thì bạn sẽ biết Http service dùng để gửi request trong Framework này trả về kết quả là 1 Observable. Và bây giờ bạn sẽ được biết thêm đây chính xác là 1 Cold Observable. Chỉ khi nào bạn subscribe() nó thì request mới được gửi đi. Điều này khá hiển nhiên vì không phair lúc nào bạn cũng cần gửi request phải không. Đây là Example hướng dẫn sử dụng HTTP service trên trang chủ documentation của Angular http://plnkr.co/edit/Vq9fHqfTIo6OU746LW1c?p=preview Quan sát app.component

...
@Component({
  ...
  template: `
    <ul>
      <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
    </ul>
    `
  ...
})
export class AppComponent {
  contacts: Observable<Array<any>>;
  constructor (http: Http) {
    this.contacts = http.get('contacts.json')
                        .map(response => response.json().items);
  }
}

Nhiệm vụ của nó là get nội dung từ file contact.json và in ra màn hình. contacts.json

{
  "items": [
    { "name": "John Conner" },
    { "name": "Arnold Schwarzenegger" }
  ]
}

Sẽ ra sao nếu tôi sửa template như sau

1st List
<ul>
  <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
</ul>
2st List
<ul>
  <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
</ul>

Bật F12 lên bạn sẽ thấy là đang có 2 request được gửi đi. Mặc dù tôi chỉ sử dụng 1 property contact để gọi http service, như đang nói ở trên thì đây chính là tính chất của Cold Observable, nó đã tạo ra 2 instance của http request.Hãy sử dụng hàm publish() để biến nó thành Hot Observable và vấn đề sẽ được giải quyết.

this.contacts = http.get('contacts.json')
                        .map(response => response.json().items)
                        .publish()
                        .refCount();

Những vẫn chưa triệt để, nếu bạn muốn sử dụng contacts list thứ 2 trong 1 điều kiện khác, có thể là sau vài giây hoặc khi có 1 sự kiện từ người dùng

@Component({
  ...
  template: `
    1st List
    <ul>
      <li *ngFor="let contact of contacts | async">{{contact.name}}</li>
    </ul>
    2st List
    <ul>
      <li *ngFor="let contact of contacts2 | async">{{contact.name}}</li>
    </ul>
    `,
  ...
})
export class AppComponent {
  contacts: Observable<Array<any>>;
  contacts2: Observable<Array<any>>;
  constructor (http: Http) {
    this.contacts = http.get('contacts.json')
                        .map(response => response.json().items)
                        .publish()
                        .refCount();

    setTimeout(() => this.contacts2 = this.contacts, 500);
  }
}

Contacts2 không xuất hiện, lý do vì http request lúc này là 1 Hot Observable và nó không quan tâm đến những gì subscribe nó sau lần đầu tiên. Để giải quyết hãy dùng operator publishLast() Operator này giúp tất cả các Observer đang subsribe có thể nhìn thấy giá trị cuối cùng mà Observable đã emit, bất kể thời điểm nào.

this.contacts = http.get('contacts.json')
                    .map(response => response.json().items)
                    .publishLast()
                    .refCount();

Vậy là vấn đề đã được giải quyết. Như các bạn đã thấy 1 vấn đề nhỏ nhưng Angular đã cung cấp cho chúng ta thứ vũ khí tuyệt vời để giải quyết hãy tưởng tượng mớ code loằng ngoằng cần thiết để lưu, cache, logic cho những công việc tương tự ví dụ này xem, chúng ta hoàn toàn có thể tập trung hơn vào xây dựng bussiness logic hơn là đi sau vào implementation phải không?

Ending

Thực ra sau bài này, những gì căn bản nhất cần truyền tải về Reactive Extension và FRP cũng đã hết. Nếu bạn có thể đọc hết 4 bài từ đầu đến giờ tôi chắc là bạn cũng đã hình dung ra FRP và Rxjs là như thế nào và nó hoạt động ra sao. Các operator thì bạn hoàn toàn có thể tham khảo ở đây, có các ví dụ bằng hình rất trực quan. Và một số khái niệm nâng cao cũng như các API thì tham khảo ở đây. Nhưng như tôi đã nói ngay từ những bài đầu, FRP quan trọng nhất là cách suy nghĩ giải quyết vấn đề chứ không phải thao tác code, việc này chỉ đạt được sau qua trình rèn luyện mà tôi cũng đang trong quá trình đó. Rất cảm ơn mọi người đã quan tâm series này !

References

http://reactivex.io/ https://blog.thoughtram.io/angular/2016/06/16/cold-vs-hot-observables.html#caveat-http-with-observables https://medium.com/@benlesh/hot-vs-cold-observables-f8094ed53339


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.