Reactive programing with Java [Part 2]

Dẫn nhập

Chào mọi người, tiếp tục về chủ đề xoay quanh Reactive Programing, hôm nay tôi sẽ tiếp tục đi sâu vào hơn các khái niệm sử dụng trong phương thức lập trình mới mẻ, hiện đại và mạnh mẽ này. Ở phần trước, tôi đã giới thiệu về các khái niệm cơ bản về Reactive Programing, bản chất, ưu điểm và cách khởi tạo một Observable đơn giản trong RxJava. Hôm nay, tôi sẽ đi sâu hơn các khái niệm khác trong RxJava như Scheduler, Operator, Subscription và các ví dụ cụ thể làm việc với RxJava.

Scheduler

Chúng ta đã biết cách khởi tạo một Observable đơn giản và lắng nghe trên chúng, thế nhưng, một trong những thế mạnh của ngôn ngữ lập trình hiện đại là khả năng xử lý đa luồng. Nếu một ứng dụng chỉ sử dụng một thread duy nhất thì ứng dụng của bạn chắc chắn sẽ gặp nhiều vấn đề bộ nhớ, về performance và bạn đang vô tình giết chết xu hướng lập trình bất đồng bộ để nâng cao hiệu suất ứng dụng. Và thực tế, chẳng ai làm điều đó, đặc biệt là với một úng dụng đòi hỏi việc xử lý phức tạp và tốn nhiều hiệu năng, sao bạn không sử dụng hết thế mạnh của CPU, của ngôn ngữ lập trình hiện đại?

Có một vài vấn đề khi bạn sử dụng multi threading trong lập trình, đó là việc quản lý thread, quản lý các tác vụ trong từng thread, khi nào nhận callback, khi nào update lên UI là điều không hề đơn giản , RxJava sẽ giúp các bạn làm việc đa luồng một cách vô cùng hiệu quả và tối thiểu hóa việc sinh code của bạn bằng một khái niệm gọi là Scheduler.

Scheduler bao gồm 2 phương thức cơ bản:

  • subscribeOn() : Mặc định khi bạn tạo một Observable, thread thực hiện việc emit data chính là thread bạn tạo Observable đó. Tuy nhiên, đôi khi chúng ta cần xử lý chúng ở một thread riêng biệt, chẳng hạn như network request, transfer data from disk hay tính toán một biểu thức phức tạp, việc làm chúng ở main thread là giải pháp rất tệ. Do đó, để xử lý chúng ở một thread riêng biệt, subsribeOn sẽ giúp bạn switch việc xử lý và emit data sang một thread khác được định nghĩa trong Scheduler.

    Lưu ý rằng, subcribeOn chỉ có tác dụng một lần, tức là bạn chỉ cần khai báo một lần duy nhất.

    Observable.just(1,2,3,4).subscribeOn(Schedulers.computation());

    Ở ví dụ trên, việc emit data trong mảng gồm 4 phần tử đó sẽ được thực hiện ở Computation Thread, một thread chạy ở background của ứng dụng, và dĩ nhiên khi bạn thực hiện subscribe trên Observable đó, các callback cũng chạy trên Computation thread. Trong Android, việc update UI trên background thread là điều không cho phép, do đó bạn cần switch thread này về main thread để thực hiện update UI. observeOn sẽ giúp bạn thực hiện điều này.

  • observeOn() : là phương thức interceptor, cho phép bạn gọi nhiều lần để switch worker thread giữa các Observable khi bạn muốn ghép chúng lại thành một data stream duy nhất. Đơn giản hơn, ở ví dụ trên, khi bạn muốn subscibe trên Main thread, bạn sẽ intercept vào để chuyển thread sang Main thread bằng lệnh observeOn(AndroidSchedulers.mainThread())

Bên dưới là ví dụ cụ thể để chúng ta nắm rõ hơn về cơ chế hoạt động của Scheduler.

Chúng ta nhận thấy method subscribeOn trên thread 'tam giác xanh' sẽ quyết định thread emit data ở data stream thứ nhất. Method observeOn trên thead 'tam giác vàng' sẽ switch worker thread sang thread 'tam giác vàng' và operator map sau đó cũng được thực hiện trên chính thread này. Method observeOn trên thread 'tam giác hồng' cuối cùng sẽ switch worker thread sang thread tưong ứng và dĩ nhiên, chúng sẽ được subscribe trên chính thread đó.

Operator

RxJava cung cấp cho chúng ta rất nhiều operator hữu hiệu để thực hiện việc filter stream, compute, gộp 2 stream thành một, chuyển đổi stream này thành stream khác … Chúng ta sẽ đi sâu vào từng operator cơ bản.

Filter

Operator này sẽ filter dữ liệu trước khi expose cho subscriber.

Chúng ta sẽ đến với ví dụ sau

Observable.just(1,2,3,4).filter(new Func1<Integer, Boolean>() {
    @Override
    public Boolean call(Integer integer) {
        return integer % 2 == 0;
    }
});

Một mảng dữ liệu gồm 4 phần tử [1,2,3,4] sẽ được emit lần lượt, tuy nhiên trước khi được expose cho subscriber, nó sẽ được filter các phần tử chỉ chia hết cho 2 và chỉ emit các phần tử đó. Operator này thực sự hữu hiệu khi bạn muốn filter một data stream theo một criteria nào đó mà không cần phải xử lý ở subscriber.

Scan

Operator hữu dụng khi bạn muốn thực hiện một phép tính toán dựa trên kết quả trả về trước đó, nói cách khác, operator này sẽ thực hiện emit data là giá trị của phép toán có 2 tham số đầu vào là item sẽ được emit tiếp theo và kết quả của phép toán trước đó. Đến với biểu đồ sau để hiểu rõ hơn nhé.

Ta có một ví dụ bên dưới:

Observable.just(1,2,3,4, 5).scan(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer num1, Integer num2) {
        return num1 + num2;
    }
});

Khi ta thực hiện subscribe trên Observable trên, kết quả trả về ở hàm onNext lần lượt sẽ là : [1, 3, 6, 10, 15]

Map

Operator được sử dụng khá nhiều trong trường hợp bạn muốn convert dữ liệu trước khi expose cho subscriber. Operator này sẽ thực hiện việc convert dữ liệu thông qua một function chứa các logic để convert. Nói cách khác, operator này hỗ trợ việc chuyển đổi data stream này thành một data stream khác, thay đổi kiểu dữ liệu trả về, hữu dụng trong rất nhiều trường hợp, ví như muốn convert dữ liệu từ API trả về sang một dữ liệu phù hợp để hiển thị trên view.

Đến với ví dụ bên dưới:

Observable.just("Hieu", "Dep", "Trai", "Nhung", "Ao", "Tuong")
    .map(new Func1<String, Integer>() {
        @Override
        public Integer call(String s) {
            return s.length();
        }
    });

Operator map ở ví dụ trên sẽ thực hiện việc chuyển đổi việc emit lần lượt các String thành độ dài của nó.

Flat Map

Operator này tương tự như map, thực hiện việc chuyển đổi data stream này thành một data stream khác, synchonous, thường đường dùng để thực hiện nhiều công việc một cách đồng bộ, chuyển đổi kiểu dữ trả về của data stream.

Observable.just("Hieu", "Dep", "Trai", "Nhung", "Ao", "Tuong")
        .flatMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable.just("Va", "Tu", "Tin");
            }
        });

Merge

Operator được dùng để gộp nhiều dòng dữ liệu lại với nhau, thực hiện một cách bất đồng bộ. Trong nhiều trường hợp nó tỏ ra thực sự hữu hiệu, ví như gọi nhiều API cùng một lúc, thực hiện nhiều task song song và bất đồng bộ. Merge sẽ gọi onNext trên subscriber nhiều lần, tương ưng với các lần emit của các data stream.

Đến với ví dụ bên dưới:

Observable.merge(Observable.just(1, 2), Observable.just(3, 4), Observable.just(5, 6)); merge operator trong ví dụ trên sẽ gộp 3 data stream thành một và emit theo thứ tự: [1,2,3,4,5,6]

Merge delay error

Operator này tương tự như merge, chỉ khác ở chỗ nó sẽ emit hết data trước khi bắn ra exception, khác với merge sẽ bắn ra exception ở bất kỳ lúc nào xảy ra lỗi và dừng dòng dữ liệu ngay tại điểm đó.

Observable.merge(Observable.just(1, 2),
        Observable.just(3, 4).map(new Func1<Integer, Integer>() {
            @Override
            public Integer call(Integer integer) {
                throw new RuntimeException("Test error");
            }
        }), Observable.just(5, 6))

Ở ví dụ trên, data stream sẽ emit [1,2] trước khi bắn ra lỗi và dừng tại đó. Ta sẽ thay thế bằng mergeDelayError, kết quả sẽ emit lần lượt [1,2,5,6]exception được bắn sau đó.

Subscription

Là khái niệm mối liên kết giữa ObservableSubscriber khi tạo một kết nối giữa chúng. Nói cách khác, khi một subscriber tiến hành subsribe trên một Observable, nó sẽ tạo ra một Subscription. Trong lập trình với RxJava, bạn phải nhớ luôn tiến hành unSubscribe trên Subscription này trước khi thoát khỏi Activity hay Fragment để tránh tình trạng leak memory xảy ra và quản lý life cycle của ứng dụng một cách khoa học.

Kết luận

Ở bài viết này, tôi đã giới thiệu đến các bạn một số khái niệm trong RxJava sẽ giúp ích rất nhiều cho các bạn trong việc tiếp cận với xu hướng lập trình mới này. Ở bài viết sau, tôi sẽ đi sâu vào các ví dụ cụ thể hơn để làm rõ đặc tính của từng operator, kết hợp việc sử dụng operator với nhau cũng như multi threading với Scheduler. Cảm ơn đã theo dõi bài viết.