Cùng học RxJava, phần 1: Giới thiệu

RxJava là 1 trong những từ khóa hot nhất trong cộng đồng lập trình viên Java/Android thời gian gần đây. RxJava đem lại cho Java/Android Reactive Programming, 1 mẫu hình lập trình tuy mạnh mẽ nhưng lại rất khó tiếp cận đối với những lập trình viên Android vốn đã quen với kiểu lập trình mệnh lệnh (Imperative Programming). Ở bài viết này chúng ta sẽ cùng tìm hiểu lí do chúng ta áp dụng Reactive Programming vào lập trình Android và các bước cơ bản để có thể bắt đầu sử dụng nó.

Why RxJava

Với một nền tảng mà Java vẫn đang thống trị như Android, lập trình đa luồng vốn không phải là 1 công việc dễ dàng. Chúng ta có Thread và Future để thực thi các tác vụ bất đồng bộ đơn giản, nhưng sử dụng chúng để xử lý các tiến trình phức tạp hơn 1 chút thì lại là 1 câu chuyện khác. Một trong những use case phổ biến nhất trong Android là thực thi 1 tác vụ nào đó ở background và sau đó update kết quả lên UI. Ví dụ chúng ta sẽ truyền lên server id của 1 user, server sẽ trả về kết quả dưới dạng JSON, chúng ta convert JSON sang object User và sau đó hiển thị thông tin của User đó lên UI. Với case này thì chúng ta thường sử dụng AsyncTask:

public class RequestUserData extends AsyncTask<String, Void, User> {

    protected User doInBackground(String... strings) {
        try {
            User user = requestAndConvert(strings[0]);
            return user;
        } catch (Exception e) {
        }
    }

    protected void onPostExecute(User user) {
        super.onPostExecute(user);
        updateUserInfo(user);
    }
}

Rất đơn giản đúng không nào? Chờ chút. Nếu chúng ta muốn thông báo cho user biết khi lỗi xảy ra thì phải làm thế nào? Dĩ nhiên chúng ta ko thể làm việc đó trong doInBackground() bởi vì nó ko chạy trên main thread của Android. Nhưng nếu muốn update ở onPostExecute() thì chúng ta lại cần context để có thể update lên UI. Vậy thì ta lại phải truyền reference của Activity vào cho AsyncTask. Nhưng nếu quá trình request này nó mất khoảng vài phút mới xong nhưng user lại xoay màn hình thì sao? Lúc đó Activity sẽ bị destroy và recreate, nhưng AsyncTask lại vẫn giữ reference đến Activity đó và gây ra memory leak.

Vâng, quá nhiều vấn đề cho 1 tác vụ tưởng chừng như rất đơn giản. Hãy cùng tưởng tượng ra 1 tình huống nâng cao hơn: sau khi request thông tin user, tôi muốn request thêm cả các cài đặt và tin nhắn của user đó 1 cách đồng thời, sau đó kết hợp 2 kết quả lại và hiển thị lên màn hình. Chúng ta sẽ cần đến vài AsyncTask để làm công việc đó, và còn cần sử dụng cả cơ chế lock như Semaphore (để chờ cho 2 tiến trình cùng kết thúc). Về cơ bản, bài toán này có thể được giải quyết nhưng nó mất quá nhiều công sức và tệ nhất là chúng ta phải nghĩ đến những vấn đề chẳng hề liên quan đến cái chúng ta muốn làm.

Các vấn đề này có thể được giải quyết bằng việc sử dụng RxJava đúng cách. Qua series này tôi hi vọng chúng ta sẽ có thêm được những kiến thức cần thiết về Reactive Programming cũng như dễ dàng giải quyết được những bài toán tương tự.

Setup

https://github.com/ReactiveX/RxJava

https://github.com/ReactiveX/RxAndroid

Cấu trúc cơ bản

Những thành phần cơ bản nhất tạo nên RP là ObservableSubscriber.

  • Observable phát ra item.
  • Subsriber sẽ sử dụng những item đó.

Một Observable có thể tạo ra 0 hoặc nhiều item, sau đó nó sẽ kết thúc vì đã hoàn thành hoặc do xảy ra lỗi. Với mỗi Subscriber mà nó có, Observable sẽ gọi đến onNext() tương ứng với số item mà nó có, sau đó sẽ gọi đến onCompleted() hoặc onError() dựa vào kết quả của việc kết thúc.

Cách tạo

RxJava cung cấp cho chúng ta 10 hàm dùng để tạo mới 1 Observable. Ở bài viết này tôi xin chỉ đề cập đến những hàm cơ bản, 1 phần là do kiến thức còn hạn chế, phần khác là để tránh việc bài viết trở nên quá dài gây nhàm chán.

  1. Observable.from()

from.c.png

Observable.from(1,2,3).subscribe(new Subscriber<Integer>() {
            public void onCompleted() {

            }

            public void onError(Throwable e) {

            }

            public void onNext(Integer integer) {
               Log.i("onNext", String.valueOf(integer));
            }
        });

Sẽ in ra khi chạy:

I/onNext: 1
I/onNext: 2
I/onNext: 3

Giải thích: Observable.from() nhận vào số parameter không hạn chế và có thể thuộc mọi type (ở đây ta truyền vào 3 số 1, 2, 3). Observable.subscribe() sẽ tạo ra 1 Subscriber với 3 hàm onCompleted(), onError()onNext() để sử dụng các item được truyền vào ở trên.

  1. Observable.just()

just.c.png

Observable.just(1,2,3).subscribe(new Subscriber<Integer>() {
            public void onCompleted() {

            }

            public void onError(Throwable e) {

            }

            public void onNext(Integer integer) {
               Log.i("onNext", String.valueOf(integer));
            }
        });

Đoạn code này sẽ cho ra output tương tự như khi dùng Observable.from(). Vậy thì just() có điểm gì khác so với from()?


Integer[] integers = {1,2,3};

Observable.just(integers).subscribe(new Subscriber<Integer[]>() {
   public void onNext(Integer[] integers) {
       Log.i("onNext", Arrays.toString(integers));
   }
}

Observable.from(integers).subscribe(new Subscriber<Integer>() {
   public void onNext(Integer integer) {
       Log.i("onNext", String.valueOf(integer));
   }
}


just()

I/onNext: [1, 2, 3]

from()

I/onNext: 1
I/onNext: 2
I/onNext: 3

Với just(), khi chúng ta truyền vào 1 array hoặc list item, nó sẽ phát ra array và list item đó và Subscriber cũng sẽ nhận vào parameter là 1 array hoặc list tương ứng. Còn đối với from(), nó sẽ phát ra từng item trong list (sẽ gọi đến onNext() số lần bằng với size của list trong điều kiện ko có lỗi xảy ra).

  1. Observable.defer()

defer.c.png

Cùng xem đoạn code sau:

Movie movie = new Movie("Captain America: Civil War");
Observable<Movie> movieObservable = Observable.just(movie);
movie = new Movie("Batman v Superman: Dawn of Justice");
movieObservable.subscribe(new Subscriber<Movie>() {
      public void onNext(Movie movie) {
          Log.i("onNext", movie.name);
      }
});

Theo bạn thì onNext sẽ in ra kết quả gì? Nếu bạn trả lời là "Batman v Superman: Dawn of Justice" thì bạn đã nhầm rồi, nó phải là "Captain America: Civil War" bởi vì đó là giá trị của movie.name khi just() được gọi đến. just(), from() và các hàm khác sẽ lưu giá trị của item khi Observable được tạo ra chứ ko phải khi nó được subscribe bởi 1 Subscriber. Trong nhiều trường hợp, chúng ta sẽ muốn data của mình là mới nhất ở thời điểm request, vậy nên nếu chúng ta không subscribe ngay khi khởi tạo thì rất có khả năng data sẽ không phải mới nhất.

Đáp án cho bài toán này là sử dụng hàm Observable.defer(). defer() sẽ chỉ khởi tạo Observable khi nó có ít nhất 1 Subscriber, và nó sẽ tạo mới 1 Observable cho mỗi Subscriber mà nó có.


movie = new Movie("Captain America: Civil War");
        Observable<Movie> movieObservable = Observable.defer(new Func0<Observable<Movie>>() {
            public Observable<Movie> call() {
                return Observable.just(movie);
            }
        });
        movie = new Movie("Batman v Superman: Dawn of Justice");
        movieObservable.subscribe(new Subscriber<Movie>() {
            public void onNext(Movie movie) {
                Log.i("onNext", movie.name);
            }
        });
I/onNext: Batman v Superman: Dawn of Justice
  1. Observable.interval()

interval.c.png

interval() sẽ phát ra 1 item thuộc kiểu Long sau mỗi khoảng delay.

Observable.interval(2, TimeUnit.SECONDS).subscribe(new Subscriber<Long>() {
            public void onNext(Long aLong) {
                if(aLong == 5)
                    unsubscribe();
                Log.i("onNext", "" + aLong);
            }
        });

Với đoạn code này, onNext() sẽ được gọi đến sau mỗi 2 giây và sau 6 lần nó sẽ kết thúc (bằng hàm unsubscribe). interval() khá hiệu quả khi chúng ta muốn schedule các lần update data (ví dụ update data mỗi 5 phút).

  1. Observable.create()

create.c.png

Với create(), chúng ta sẽ tạo Observable một cách thủ công. Với đoạn code dưới đây:

 Observable.create(new Observable.OnSubscribe<Integer>() {
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<Integer>() {
            public void onNext(Integer integer) {
                Log.i("onNext", "" + integer);
        }
});

sẽ cho ra output tương tự như sử dụng Observable.just(1,2,3). create() còn đem lại behavior như defer() - lưu giá trị của item khi đc subscribe chứ ko phải khi khởi tạo. Tuy nhiên trong hầu hết các trường hợp bạn không nên dùng đến hàm này vì nó có 1 số quy tắc mà chúng ta phải tuân thủ như chỉ được gọi Subscriber.onComplete() hoặc Subscriber.onError() duy nhất 1 lần và không được gọi thêm hàm nào sau đó.

Kết luận

Vậy là chúng ta đã học qua 1 số cách cơ bản và phổ biến nhất để tạo ra Observable. (Các bạn có thể tham khảo 5 hàm còn lại tại http://reactivex.io/documentation/operators.html#creating)

Ở phần sau chúng ta sẽ cùng tìm hiểu về concept threading của RxJava.

Cảm ơn các bạn đã theo dõi!