Reactive Extensions - một vài thứ quan trọng trước khi bắt đầu

Reactive Extensions được đông đảo developer biết đến với cái tên ngắn gọn là Rx và việc áp dụng những library như :

  • Java: RxJava
  • JavaScript: RxJS
  • JRuby: RxJRuby
  • Kotlin: RxKotlin
  • Swift: RxSwift
  • PHP: RxPHP
  • ....

Dựa trên những gì tôi tìm hiểu và áp dụng vào dự án của mình được một thời gian, có 1 vài điểm cốt lõi từ góc nhìn cá nhân mình để cung cấp cho các bạn mới bắt đầu làm quen sẽ dễ dàng tìm hiểu cũng như áp dụng Rx vào dự án của mình được nhanh chóng hơn!

1. Xuất phát từ một vấn đề

Câu chuyện khởi đầu từ phương pháp bất đồng bộ, chúng ta mong muốn sẽ giải phóng công việc của main thread cần xử lý làm cho Application chạy mượt mà hơn. Mong muốn cải thiện trải nghiệm người dùng và nâng cao tốc độ xử lý chức năng của ứng dụng. Đó là những gì chúng ta đã tổng kết lại với mục đích sử dụng Asynchronous phải không nào. Từ đó nếu như công việc nặng nề của main thread phải xử lý đều nằm trong background thì UI sẽ được update nhẹ nhàng hơn, từ cái nhìn của người dùng có nhiều thiện cảm hơn . Mong muốn cao hơn khi một developer cần một library có thể đảm nhận được những value sau:

  • Explicit execution:

Trước khi thực hiện 1 thread mới chúng ta có thể chuẩn bị những nền tảng cần thiết và sau đó chúng ta chỉ việc kick-off công việc đó ở tầng background.

  • Easy thread management:

Công việc này luôn được ưu tiên hàng đầu khi thực hiện Asynchronous , nếu như chúng ta chỉ có start 1 thread mới mà không giải phóng nó ngay sau khi công việc đấy đã hoàn thành sẽ dẫn đến việc rò rỉ bộ nhớ. Càng khó khăn hơn khi quá nhiều thread đang chạy mà bạn không biết sẽ phải kill thread nào trước. Mindset của chúng ta trong việc xử lý này rất đơn giản, thực hiện công việc logic ở background thread sau khi xong sẽ update vào UI (trong main thread) và control tốt hơn là có thể switch các thread khác nhau 1 cách dễ dàng khi cần.

  • Easily composable:

Đôi lúc chúng ta sẽ cần nhiều task sẽ phải hoạt động trong thread background mà chúng không hề phụ thuộc đến nhau. Mỗi thread khi thực hiện xong chỉ việc update vào UI tại main thread mà thôi, ngoài ra việc xử lý thông báo lỗi cũng được định nghĩa sẵn nếu 1 trong các thread gặp vấn đề.

  • Minimum the side effects:

Hạn chế tối thiểu việc xung đột từ những thread khác khi chạy cùng lúc. Code của bạn khi viết ra nó phải clear và dễ đọc cho những người khác thừa kế lại.

=> Trên đây là 4 values của một Library chúng ta đang tìm kiếm, chắc hẳn các bạn cũng biết tôi đang muốn nhắc đến ReactiveX phải không nào? Khi xuất hiện vấn đề thì chúng ta sẽ bắt tay đi tìm giải pháp, nào bây giờ chúng ta đã muốn xem ReactiveX là gì và quan trọng hơn cốt lõi bên trong của nó ra sao?? Bạn hãy đọc tiếp để bóc tách vấn đề nhé!

2. ReactiveX là gì ?

Định nghĩa bởi Wiki:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s), and that an inferred dependency within the associated execution model exists, which facilitates the automatic propagation of the change involved with data flow.

Ngắn gọn : Trong ReactiveX những data stream sẽ được phát đi từ 1 component nằm dưới tầng cấu trúc của library, và data sẽ được chuyển tới 1 component khác đã đăng kí khi nhận được sự thay đổi của data stream.

Thành phần chính tạo lên Rx library:

RX = OBSERVABLE + OBSERVER + SCHEDULERS

  • Observable:

Observable không có gì khác ngoài luồng dữ liệu, các gói dữ liệu của Observable có thể truyền từ thread này sang thread khác. Nó làm nhiệm vụ phát đi data có thể theo chu kì lặp lại hoặc chỉ 1 lần theo cách cài đặt của bạn.

  • Observers:

Observers nhận data stream từ Observable thông qua method đăng kí nhận subscribeOn() Sau khi Observable phát đi data stream tất cả dữ liệu sẽ truyền tới những Observer đã đăng kí thông qua call back onNext() , ở đây bạn có thể phân tích data ( chuỗi Json nhận được) hoặc cập nhật UI. Trong tình trạng xảy ra lỗi bạn sẽ xử lý tại call back onError().

  • Schedulers:

Như bạn thấy được 1 trong 4 values ở trên thì việc quan trọng nhất chính là quản lý bất đồng bộ ** Easy thread management** , Schedulers sẽ làm nhiệm vụ quy định

  • Observable chạy trên thread nào thông qua method : scheduleOn()
  • Observers chạy trên thread do method observeOn()
  • Schedulers.newThread() : tạo mới 1 background thread
  • Schedulers.io() : thực thi code của bạn trên IO thread (nếu cần)

3. Ví dụ

Từ hình vẽ trên tôi giải thích 1 chút, data stream từ Thread 1 - Observable sẽ phát đi dựa trên cấu hình phân định thread nào Observable sẽ được chạy. Sau đấy Observers đăng kí nhận data khi có sự thay đổi luồng dữ liệu và hoạt động ở Thread 2.

Bước thực hiện:

Step 1: Khởi tạo Observable chứa data stream.

Observable<String> database = Observable      //Observable. This will emit the data
                .just(new String[]{"1", "2", "3", "4"});    //Operator

Step 2: Khởi tạo Observers nhận data stream

Observer<String> observer = new Observer<String>() {
           @Override
            public void onCompleted() {
                //Notify when finish all data in Observable
            }

            @Override
            public void onError(Throwable e) {
                //Push error message when fail
            }

            @Override
            public void onNext(String s) {
                //Handle receiving any data from Observable
            }
        };

Step 3: Cài đặt việc quản lý thread cho Observable và Observers

database.subscribeOn(Schedulers.newThread())          //Observable runs on new background thread.
        .observeOn(AndroidSchedulers.mainThread())    //Observer will run on main UI thread.
        .subscribe(observer);                         //Subscribe the observer

RxDemo.java

Observable<String> database = Observable      //Observable. This will emit the data
                .just(new String[]{"1", "2", "3", "4"});    //Operator

 Observer<String> observer = new Observer<String>() {
           @Override
            public void onCompleted() {
                //...
            }

            @Override
            public void onError(Throwable e) {
                //...
            }

            @Override
            public void onNext(String s) {
                //...
            }
        };

database.subscribeOn(Schedulers.newThread())          //Observable runs on new background thread.
        .observeOn(AndroidSchedulers.mainThread())    //Observer will run on main UI thread.
        .subscribe(observer);                         //Subscribe the observer

4. Tổng kết

Bài viết tổng hợp những kiến thức cốt lõi về Reactive Extensions của tôi tạm khép lại ở đây, phần trình bày dựa trên khá ngắn gọn với những phần chính hy vọng sẽ làm cho các bạn mới tìm hiểu sẽ có 1 góc nhìn đơn giản hơn. Cũng như các bạn đã có kinh nghiệm làm việc rồi sẽ có thể áp dụng 1 chút nào đó để tiếp tục phát triển với những góc cạnh khác của ReactiveX vì thực tế nó còn rất nhiều những phần sâu hơn mà mình cũng đang tìm hiểu. Thời gian đầu tôi hay bị nhầm lẫn 1 số thành phần trong Rx ngay cả khi đã áp dụng vào dự án, bài viết sắp tới sẽ là một góc cạnh mới và rất thú vị trong Rx đó ! 😄 Happy coding...