[Reactive Functional Programing] Xây dựng ứng dụng Suggestion Follower với Rxjs

Overview

Trong bài lần trước tôi đã giới thiệu cho các bạn những khái niệm cơ bản nhất về RFP, lần này hãy đi sâu vào 1 ứng dụng thực tế theo phong cách RFP. Ứng dụng thực tế luôn là cách nhanh nhất để học về 1 ngôn ngữ mới, bên cạnh đó bạn sẽ trả lời được câu hỏi “Tại sao?” từng bước một. Nếu bạn đã từng sử dụng Twitter hẳn sẽ không lạ gì giao diện sau:

Chúng ta sẽ giả lập lại chức năng này, nó bao gồm những công việc sau:

  • Khi khởi động, load dữ liệu ra và hiển thị 3 suggestion
  • Khi click refresh, load ra 3 account khác
  • Khi click nút “x" , sẽ load ra account khác chỉ ở dòng đó
  • Mỗi dòng sẽ hiển thị avatar và link tới trang cá nhân của account

Thay vì sử dụng Twitter, chúng ta sẽ sử dụng API của Github, cũng khá tương tự với Twitter thôi, nhưng chúng ta sẽ không cần phải authorized như Twitter. Click để xem chi tiết hơn Github API Getting Users https://developer.github.com/v3/users/#get-all-users . Dành cho những ai muốn xem luôn kết quả thì Live Example ở phần cuối bài.

Request và Response

Chức năng đầu tiên: Khi khởi động, load dữ liệu ra và hiển thị 3 suggestion. Chức năng này sẽ được thực hiện như sau:

  • Gửi đi 1 request
  • Nhận về response
  • Dựa vào response trả về để render ra view

Đừng quên rằng “Everything can be a stream", điều này có nghĩa là mọi requests chúng ta gửi đi là 1 stream. Khi ứng dụng chạy lần đầu, chúng ta chỉ cần gửi 1 request nên stream cũng sẽ chỉ có 1 giá trị. --a------|-> Đây là stream các URLs mà ta sẽ request trong toàn bộ ứng dụng. Bất cứ khi nào 1 request gửi đi, cho chúng ta biết 2 điều: Khi nào 1 request được thực hiện Request đó là request cái gì? Thuật ngữ “chính thức" của stream là Observable, và trong rxjs chúng ta dùng đoạn code sau để tạo ra Observable:

var requestStream = Rx.Observable.just('https://api.github.com/users');

Việc chúng ta vừa làm mới chỉ là tạo ra 1 string stream, và để thực hiện 1 công việc khi stream có thêm 1 giá trị mới, ta cần Subscribe nó

requestStream.subscribe(function(requestUrl) {
    // Thực thi request
    jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

Ở đây, tôi sử dụng hàm getJSON để thực hiện AJAX request mỗi khi có 1 string URL được emit ra từ url stream. Nếu chỉ dừng lại ở đây, bạn cũng có thể sử dụng callback đi kèm với hàm getJSON để thao tác với response trả về, nhưng với RFP, hãy biến những responses trong tương lai trở thành 1 stream như cách đã làm với các requests.

requestStream.subscribe(function(requestUrl) {) z{
  // Thực thi request
  var responseStream = Rx.Observable.create(function (observer) {
    $.getJSON(requestUrl)
    .done(function(response) { observer.next(response); })
    .fail(function(jqXHR, status, error) { observer.error(error); })
    .always(function() { observer.complete(); });
  });

  responseStream.subscribe(function(response) {
    // Thao tác với response
    console.log(response);
  });
});

Hàm Observable.create() tạo ra 1 custom observable bằng cách báo cho các Observers đang lắng nghe biết khi nào có 1 value được emit từ source, lỗi hay complete bằng cách gọi đến các method tương ứng của Observer. Bản thân Observable là 1 Promise++, thế nên bạn có thể convert 1 Promise trở thành 1 Observable bằng cách:

var stream = Rx.Observable.fromPromise(promise)

1 Promise là trường hợp đặc biệt của Observable với chỉ 1 giá trị được emit.

Sau khi 1 Promise đã được resolve hoặc reject thì nó sẽ rơi vào trạng thái đó luôn chỉ 1 lần duy nhất, còn Observable thì khác, nó sản sinh ra đa giá trị trong stream của mình. Nói cách khác Promise sẽ chỉ push data tới consumer 1 lần trong vòng đời của mình, còn khi Observer đã subscribe 1 Observable, nó có thể được gọi tới nhiều lần.

Nhìn lại đoạn code trên, ta thấy rằng đang có 1 Subscription ( là hành động subscribe 1 observable) bên trong 1 Subscription khác, điều này cũng tương tự như việc tạo ra 1 callback hell với nhiều đoạn code lồng vào nhau. Rxjs có thể cung cấp cơ chế transform stream dựa trên 1 stream gốc và ở đây ta sử dụng hàm map() để tạo ra response stream dựa trên request stream. Done, Subscription hell đã được triệt tiêu.

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

Response stream lúc này được gọi là 1 metastream, có nghĩa là với mỗi giá trị được emit từ requests stream thì ta thu được 1 response stream. Ở ví dụ của chúng ta, mỗi url được map tới 1 promise ( mà đã được convert thành Observable ) chứa response ứng với nó.

Bây giờ kết cấu cũng như flow của code đã ngon lành hơn rất nhiều so với việc sử dụng callback. Nhưng ta còn có thể làm tốt hơn nữa, thay vì tạo ra mỗi response stream khi có 1 request những gì tôi nghĩ tới là 1 response stream duy nhất, mà mỗi giá trị emit của nó là 1 JSON object, không phải là 1 Promise. Để làm được việc này ta sẽ sử dụng function flatMap().

Nhiệm vụ của hàm này là tạo ra “trunk stream" và khi các stream con (được tạo ra từ url value ở requests stream) emit 1 value => giá trị đó cũng sẽ được emit trên “trunk stream". Lúc này mỗi khi có 1 request mới được tạo ra, ta thu được 1 response tương ứng trên responseStream. Code của chúng ta đã thực sự "reactive", mọi hành động đều đang dựa trên data stream.

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

Việc còn lại chỉ là dựa vào response trả về để render view.

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // render ra view dựa vào response nhận được
});

The refresh button

Chức năng tiếp theo bây giờ được tóm tắt lại là: Khi click vào nut Refresh => đẩy 1 request mới vào requests stream => nhận được response mới. Việc đầu tiên cần làm là tạo ra 1 stream sự kiện click chuột. Rxjs cung cấp hàm fromEvent() để tạo ra 1 Observable chứa các data DOM events.

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

refreshClickStream, bản thân nó không mang 1 giá trị URL nào => Ta cần map mỗi event click với 1 URL để sau đó có thể đẩy URL này vào requestStream. Ngoài ra, hãy thêm 1 random parameter để danh sách users là ngẫu nhiên.

 var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Với những thay đổi như vậy, ta vô tình làm mất request khi start up ứng dụng, bây giờ chỉ khi nào click Refresh thì danh sách users mới được trả về. Chúng ta sẽ phải tạo lại Observable đầu tiên với chỉ 1 giá trị được emit.

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

Sau đó dùng function merge(), để gộp 2 Observable này lại với nhau.

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

Code tổng thể như sau :

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

Bạn cũng có thể viết như sau, ngắn gọn và dễ đọc hơn, về mặt logic không có gì khác biệt.

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');

Bất kể data source như thế nào, function startWith(argument) đều đẩy ra output 1 value chính là argument của nó ở thời điểm bắt đầu của stream. Vận dụng điều này, giả lập 1 refresh click vào lúc bắt đầu thay vì đẩy trực tiếp 1 URL string vào.

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

Bây giờ thì code đã gọn gàng và mang tính Reactive hơn nhiều rồi.

Modelling the 3 suggestions with stream

Cho đến giờ, mọi thứ đã hoạt động khá ổn, tuy vậy ở bước refresh, phát sinh 1 vấn đề. Khi ta click nút Refresh, nên làm thế nào để clear list user cũ đi? Hẳn nhiên việc này có thể thực hiện đơn giản bằng jQuery khi nhận được response trả ta có reset DOM đi và append nội dung mới vào. Hoặc nghĩ theo hướng RFP, quan sát stream click events và clear list user cũ đi như sau:

refreshClickStream.subscribe(function() {
    $('.users').html('');
});

Lúc này, ta đang subscribe 2 observer cho 1 events stream, điều này là không tốt, nó đi ngược lại nguyên tắc mỗi phân đoạn code giải quyết 1 vấn đề cũng như rất khó giải quyết được vấn đề khi cần thay thế 1 trong 3 user suggestion. Để giải quyết điều này, 1 lần nữa hãy hình tượng hoá các user suggestion thành 1 stream, mà mỗi giá trị được emit chính là 1 JSON object chứa user data. Và đây là stream của user ở dòng thứ 1:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // trả về 1 user random trong list users
    return listUsers[Math.floor(Math.random()*listUsers[length])];
  });

Chuyển công việc render ra view cho mỗi suggestion stream:

suggestion1Stream.subscribe((suggestion) => {
  // render user đầu tiên ra DOM
  var html = '';
        html += 
            "<tr>
                <td>${suggestion.id}</td>
                <td><a href="${suggestion.url}">${suggestion.login}</a></td>
                <td><img width="50px" height="50px" src="${suggestion.avatar_url}" alt="" /></td>
            </tr>";
    $('.users').append(html);
});

Khi Refresh lại, ta đơn giản map click event thành 1 null data suggestion, sau đó merge vào stream suggestion => Mỗi khi click refresh sẽ có 1 null data suggestion được đẩy vào suggestion stream.

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // trả về 1 user random trong list users
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // Xóa user đầu tiên trong DOM đi
  }
  else {
    // render user đầu tiên ra DOM
  }
});

Mô phỏng bằng diagram: ( N = Null )

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t→

Thêm nữa, ta cũng có thể render 1 suggestion rỗng lúc khởi động bằng function startWith(null). Kết quả cuối cùng :

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // trả về 1 user random trong list users
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t→

Close a suggestion và sử dụng response đã được cached

Đây là tính năng cuối cùng trong ứng dụng. Mỗi suggestion có 1 nút “x" để đóng nó lại cũng như load ra 1 user khác. Response chúng ta nhận được từ những phần trước chứa 100 users, không có lý do gì để gửi lại 1 request nữa cho mỗi user chúng ta cần refresh. Một lần nữa, hãy nghĩ tới stream. Khi nút close 1 được bấm, ta sẽ dùng value mới nhất trên response stream để lấy 1 user khác.

   requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

RX cung cấp function combineLastest() giúp ta làm điều đó, hàm này nhận 2 stream A và B làm đầu vào, và khi một trong số 2 stream này có value được emit, hàm này sẽ kết nối 2 giá trị (x, y) được emit gần nhất của mỗi stream để xuất ra giá trị c = f(x,y) như sau:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

Ta sẽ apply hàm này vào close1ClickStream và responseStream, và kết quả là mỗi khi close 1 button được bấm, chúng ta nhận được response mới nhất từ responseStream và ngược lại mỗi khi response mới được trả về, nó kết hợp với event close gần nhất để sản sinh ra suggestion mới.

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

Nhìn vào diagram phía trên, bạn có thể thấy là combineLastest() sử dụng các giá trị gần nhất từ 2 stream, nhưng nếu 1 trong 2 stream chưa sản sinh giá trị thì combineLastest() cũng không thể xuất ra output được. Giải quyết điều này bằng cách giả lập 1 click khi khởi động ứng dụng.

var suggestion1Stream = close1ClickStream.startWith('startup click') // stimulate a click
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

Wrapping up

Có thể thấy, xây dựng ứng dụng theo hướng Reactive Functional thực sự chất, nó cung cấp đầy đủ khả năng quản trị multievents, cũng như khả năng phân chia bài toán. Một khía cạnh khác thì lập trình hướng chức năng làm cho code trở nên sáng sủa hơn, không mang nặng tính "ép buộc" (imperative). Thay vì đưa ra các chỉ thị, ta định nghĩa các mối quan hệ và dựa vào đó nói cho máy tính biết ta cần làm gì hơn là ra lệnh cho nó. Thâm chí, với Functional Programing bạn hoàn toàn có thể loại bỏ những câu lệnh if() else() bằng cách sử dụng rx operator mà chúng ta sẽ đi tiếp ở những bài sau. Functional Programing đem tới nhiều sức mạnh hơn với số lượng dòng code ngắn hơn, dễ hiểu hơn, trong sáng hơn.

Live Example (http://jsbin.com/voxegisuxo/edit?html,js,console,output) Các bạn có thể tham khảo ứng dụng đã xây dựng, với 1 row suggestion tại đây, 2 row còn lại có thể làm tương tự, bước đầu hãy copy và paste để làm 2 row còn lại. Để đạt được mục đích DRY your code chúng ta sẽ tiếp tục đi sâu hơn nữa ở những bài viết tiếp theo.


All Rights Reserved