Cơ bản về RxAndroid

Bài viết sau đây trình bày về những kiến thức cơ bản để bắt đầu sử dụng RxJava trong các ứng dụng Android.

Một số khái niệm

Trước khi đi vào tìm hiểu theo những ví dụ, hãy bắt đầu với một số khái niệm cơ bản. Cốt lõi của RxJava là 2 khái niệm: Observable và Observer. Observable sẽ phát ra các giá trị. Đối tác của nó là Observer quan sát Observable bằng việc đăng ký lắng nghe.

Observer có thể thực hiện những hành động khi Observable phát ra một giá trị, khi Observable cho biết một lỗi đã xảy ra, hoặc khi Observable thông báo không còn giá trị nào để phát ra. Ba hành động này được đóng gói trong giao diện Observer với các phương thức tương ứng là onNext(), onError()onCompleted().

Với những khái niệm trên, hãy cùng đi vào tìm hiểu việc sử dụng RxAndroid.

Sử dụng cơ bản

Activity sẽ đơn giản thực hiện việc hiển thị danh sách color. Chúng ta sẽ tạo một Observable thực hiện phát ra một giá trị duy nhất là list string rồi hoàn tất. Giá trị phát ra này sẽ được thiết lập để hiển thị. Để làm được việc này, chúng ta sử dụng phương thức Observable.just(). Phương thức này tạo ra một Observable như vậy khi có một Observer thực hiện đăng ký, phương thức onNext() sẽ ngay lập tức được gọi với đối số được cung cấp trong Observable.just(). Phương thức onCompleted() sau đó sẽ được gọi khi Observable không còn giá trị nào để phát ra.

Observable<List<String>> listObservable = Observable.just(getColorList());

Chú ý rằng getColorList() là phương thức non-blocking. Tại sao lại như vậy, chúng ta sẽ trở lại giải thích ở mục sau.

Bước tiếp theo, chúng ta cài đặt một Observer để thực hiện quan sát Observable.

listObservable.subscribe(new Observer<List<String>>() {

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }

    @Override
    public void onNext(List<String> colors) {
        mSimpleStringAdapter.setStrings(colors);
    }
});

Như đã đề cập ở trên, khi đăng ký quan sát Observable với phương thức subcribe(), thứ tự sau sẽ xảy ra:

  1. Phương thức onNext() được gọi và danh sách color phát ra được thiết lập làm dữ liệu cho Adapter.
  2. Vì không có thêm dữ liệu (Chúng ta chỉ cho Observable phát ra một giá trị duy nhất trong phương thức Observable.just()), phương thức callback onCompleted() được gọi.

Qua đó, có thể đưa ra kết luận quan trọng, Observable được xác định hành vi khi đăng ký quan sát.

Trong trường hợp trên, chúng ta không quan tâm đến điều gì sẽ xảy ra sau khi Observable kết thúc, vì vậy, phương thức onCompleted() rỗng. Ngoài ra, cũng không có lỗi được ném ra, vì vậy, phương thức onError() cũng sẽ rỗng.

Tất cả những cài đặt này có vẻ như quá mức cần thiết. Chúng ta có thể đơn giản chỉ cần thiết lập trực tiếp danh sách color cho adapter. Tuy nhiên theo cách này, chúng ta sẽ đi vào tìm hiểu điều thú vị hơn.

Tải dữ liệu không đồng bộ

Lần này, chúng ta để Activity thực hiện tải không đồng bộ danh sách các chương trình truyền hình yêu thích. Tải dữ liệu không đồng bộ có thể nói là việc sử dụng phổ biến nhất của RxJava trong Android. Trước tiên, chúng ta tạo Observable:

Observable<List<String>> tvShowObservable = Observable.fromCallable(new Callable<List<String>>() {

    @Override
    public List<String> call() {
        return mRestClient.getFavoriteTvShows();
    }
});

Trong ví dụ trước, chúng ta sử dụng phương thức Observable.just() để tạo Observable. Các bạn có thể cũng muốn tạo Observable theo cách này như sau Observable.just(mRestClient.getFavoriteTvShows()). Nhưng chúng ta không thể làm vậy, bởi vì mRestClient.getFavoriteTvShows() là một lời gọi blocking network. Nếu chúng ta sử dụng nó với Observable.just(), mRestClient.getFavoriteTvShows() sẽ ngay lập tức block luồng chính (UI Thread).

Phương thức Observable.fromCallable() cung cấp cho chúng ta 2 điều quan trọng:

  1. Các giá trị sẽ không được phát ra cho đến khi có một Observer nào đó đăng ký quan sát.
  2. Phần phát ra các giá trị sẽ được chạy trên một thread khác UI thread.

Bây giờ, chúng ta hãy đăng ký quan sát Observable:

mTvShowSubscription = tvShowObservable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {

        @Override
        public void onCompleted() { }

        @Override
        public void onError(Throwable e) { }

        @Override
        public void onNext(List<String> tvShows){
            displayTvShows(tvShows);
        }
    });

Hãy cùng phân tích từng lời gọi trên. subcribeOn về cơ bản làm thay đổi Observable chúng ta tạo ở trên. Tất cả code mà Observable thường sẽ chạy, bao gồm cả các code sẽ chạy khi đăng ký, bây giờ sẽ chạy trên một thread khác. Điều này có nghĩa logic bên trong đối tượng Callable, bao gồm cả lời gọi getFavoriteTvShows() sẽ được chạy trong một thread khác. Nhưng đó là thread nào?

Trong trường hợp này chúng ta xác định code sẽ được chạy trong "IO Scheduler" (Schedulers.io()).

Bởi vì Observable được thiết lập để chạy trên IO Scheduler, có nghĩa là nó sẽ tương tác với Observer trên IO Scheduler. Đây là một vấn đề vì điều này đồng nghĩa với việc onNext() cũng sẽ được chạy trên IO Scheduler. Tuy nhiên, code bên trong onNext() có gọi đến việc thiết lập các View. Mà việc thiết lập cho View chỉ có thể được gọi trong UI Thread.

Có một cách đơn giản để giải quyết vấn đề này. Chúng ta nói với RxJava rằng chúng ta muốn quan sát Observable này trên UI Thread, nghĩa là lời gọi onNext() được gọi trên UI Thread. Điều này được cài đặt bằng phương thức observeOn() với đối số là AndroidSchedules.mainThread().

Lời gọi cuối cùng subcirbe là quan trọng, bởi vì code trong Callable sẽ không được chạy cho đến khi Observable được đăng ký quan sát.

Một điều cuối, đối tượng mTvShowSubscription là gì? Khi một Observer đăng ký quan sát một Observable, đối tượng Subscription sẽ được tạo ra. Subscription đại diện cho kết nối giữa ObserverObservable. Một vài trường hợp, chúng ta cần ngắt kết nối này. Đó là khi Destroy Activity:

if (mTvShowSubscription != null && !mTvShowSubscription.isUnsubscribed()) {
    mTvShowSubscription.unsubscribe();
}

Nếu bạn từng làm việc với thread trong Android, bạn sẽ thường gặp một vấn đề lớn: điều gì sẽ xảy ra nếu các thread kết thúc (hoặc không bao giờ kết thúc) sau khi Activity bị huỷ. Điều này có thể gây ra một loạt các vấn đề về rò rỉ bộ nhớ và NullPointerExceptions.

Subcription cho phép chúng ta đối phó với vấn đề này. Giống như chúng ta sẽ nói "Này Observable, Observer này không muốn quan sát thêm nữa, hãy ngắt kết nối". Chúng ta làm được việc này bằng lời gọi unsubscribe().

Tóm tắt lại:

  • Observable.fromCallable() cho phép chúng ta trì hoãn việc tạo ra các giá trị được phát đi bởi Observable. Điều này là tiện dụng khi giá trị mà bạn muốn Observable phát đi cần phải được tạo ngoài UI Thread.
  • subscribeOn() cho phép chúng ta chạy code tạo ra các giá trị phát đi trên một thread đặc biệt mà không phải là UI Thread.
  • observeOn() cho phép chúng ta quan sát các giá trị phát đi từ Observable trên một thread thích hợp, cụ thể đây là UI Thread.
  • Chúng ta nên luôn luôn bỏ đăng ký quan sát (unsubcribe) khi sử dụng Observable để tải dữ liệu không đồng bộ.

Sử dụng Single

Chúng ta tiếp theo tải về một danh sách chương trình yêu thích nhưng theo cách đơn giản hơn. Observable là tuyệt vời, nhưng trong một số trường hợp nó là quá mức cần thiết. Chẳng hạn trong 2 ví dụ trên, chúng ta chỉ phát ra một giá trị duy nhất và không bao giờ sử dụng đến lời gọi onCompleted().

Bởi vậy, có một phiên bản đơn giản hơn của Observable được gọi là Single. Single là việc chính xác như những gì Obsevable làm. Nhưng thay vì có 3 lời gọi onNext(), onError()onCompleted(), nó chỉ có 2 lời gọi là onSuccess()onError().

Quay trở lại ví dụ trên và sử dụng với Single. Đầu tiên, chúng ta tạo đối tượng Single:

Single<List<String>> tvShowSingle = Single.fromCallable(new Callable<List<String>>() {

    @Override
    public List<String> call() throws Exception {
        mRestClient.getFavoriteTvShows();
    }
});

Sau đó, chúng ta đăng ký quan sát nó:

mTvShowSubscription = tvShowSingle
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new SingleSubscriber<List<String>>() {

        @Override
        public void onSuccess(List<String> tvShows) {
            displayTvShows(tvShows);
        }

        @Override
        public void onError(Throwable error) {
            displayErrorMessage();
        }
    });

Thay vì sử dụng Obsever, chúng ta sử dụng class gọi là SingleSubscriber. Nó là hoàn toàn tương tự Observer, ngoại trừ chỉ có 2 phương thức đã nhắc tới ở trên là onSuccess()onError().

Đăng ký quan sát với một Single kết quả cũng tạo ra đối tương Subscription. Đối tượng này hoàn toàn tương tự như ví dụ trên và cũng nên được huỷ đăng ký quan sát khi Destroy Activity.

Kết luận

Trên đây là 3 cách cơ bản hy vọng giúp các bạn hiểu rõ hơn về việc sử dụng RxJava trong Android.