Cùng học RxJava, phần 3: Core Operators

Xin chào các bạn. Ở 2 bài trước chúng ta đã học những khái niệm và concept cơ bản nhất của RxJava, tuy nhiên có thể nhiều bạn sẽ nghĩ rằng nếu RxJava chỉ có thế thì việc thêm nó vào dependencies có vẻ hơi bị "overkill". Ở bài này tôi sẽ bắt đầu chứng minh cho bạn thấy rằng RxJava không chỉ là 1 phiên bản tốt hơn của AsyncTask hay ListenableFuture, sức mạnh thật sự của nó nằm ở 1 tập hợp khổng lồ các toán tử (operators) có thể sẽ thay đổi cách mà bạn đã code từ trước đến giờ.

Operator là gì?

Có thể nhiều bạn không để ý nhưng chúng ta đã học được 1 số Operator từ phần 1 của series, đó là các Operator dùng để tạo ra Observable. Ngoài ra thì chúng ta còn có rất nhiều Operator khác và được ReactiveX cho vào các category như Transformation, Combination, Filtering, Side-effect,... nhằm phục vụ cho các use case khác nhau.

getANumberObservable()
               .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })

                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });

Trong đoạn code trên thì hàm map chính là 1 Operator. Hầu hết các operator đều hoạt động trên 1 Observable và sẽ trả về 1 Observable khác nên nó rất phù hợp để bạn nối (chain) các Operator với nhau để ra được Observable mong muốn trước khi gửi nó cho Subscriber. Cần lưu ý rằng đây là 1 serial execution; Ví dụ nếu bạn nối 3 operator map với nhau thì nó sẽ đc execute theo trình tự - map thứ 2 sẽ chỉ chạy khi map thứ nhất chạy xong, tương tự với map 3.

Vậy Operator để làm gì?

Câu trả lời là: Operator có thể làm được mọi thứ - tất nhiên là nếu bạn biết cách sử dụng chúng. Vì số lượng Operator là quá lớn nên sẽ là nhiệm vụ bất khả thi để liệt kê ra cách sử dụng của tất cả, do vậy trong bài này tôi sẽ chỉ tập trung giới thiệu 1 số Operator mà tôi coi nó là "core" - phù hợp với những use case phổ biến.

Chúng ta sẽ bắt đầu với việc setup để sử dụng Retrolambda. Do Android chưa support hoàn toàn lambda expression của Java 8 (bạn có thể sử dụng lambda bằng Jack nhưng hiện giờ Jack chưa production-ready) nên Retrolambda là lib không thể thiếu nếu bạn muốn dùng RxJava. OK, tôi đùa thôi, thật ra bạn không bắt buộc phải dùng Retrolambda nhưng như thế sẽ làm RxJava trông rất dài dòng vì nó có khá nhiều boilerplate code. Bạn chưa biết lambda expression là gì? Hãy xem trên wiki, Bạn không biết sử dụng lambda? Đừng lo, lúc đầu bạn cứ code như bình thường, Android Studio sẽ hỗ trợ bạn convert code sang lambda nên dùng nhiều sẽ quen. Ở bài này tôi cũng sẽ chỉ dùng lambda nên cố gắng xem nhé 😃

https://github.com/evant/gradle-retrolambda

Bài toán

Giả sử tôi có interface 2 hàm sau

//Truyền vào 1 chuỗi String và trả về 1 Observable phát ra danh sách phim dưới định dạng JSON
Observable<String> searchMovie(String query);

//Truyền vào 1 chuỗi JSON và trả về 1 List<Movie>
List<Movie> parse(String json);

Yêu cầu: convert chuỗi JSON trả về sang List<Movie> sau đó truyền vào Adapter.

Vậy thì làm sao để áp dụng 2 hàm này với RxJava? Với những gì chúng ta đã học thì ta sẽ làm như sau:

searchMovie("Doctor Strange")
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<String>() {
      @Override
      public void call(String json) {
           list = parse(json);
           adapter.notifyDataSetChanged();
  });

Nhìn cũng ổn đúng không? Nhưng giả sử hàm searchMovie sẽ trả về 1 chuỗi JSON khá lớn và hàm parse của tôi lại rất chậm (do tôi code kém) thì sao? Như đã học ở bài trước thì hàm observeOn sẽ làm cho các hàm trong Subscriber chạy trên thread mà nó định nghĩa. Theo như yêu cầu thì chúng ta phải update adapter sau khi lấy được List<Movie> nên sẽ phải dùng hàm notifyDatasetChanged, nhưng hàm này lại chỉ có thể chạy trên main thread của Android => Nếu hàm parse chạy quá chậm thì nó sẽ block UI cho đến khi xong, có trường hợp còn làm cho app bị ANR => Người dùng sẽ không vui => Người dùng sẽ uninstall app của bạn => Bạn sẽ mất đi hàng trăm triệu đồng tiền thu nhập hàng tháng => Startup của bạn phá sản => Bạn phải trốn nợ và ngủ gầm cầu...

Đừng lo, đã có thuốc viêm họng Đ..nhầm, đã có Operator ở đây để giải cứu bạn. Với trường hợp này, chúng ta có thể sử dụng toán tử map để convert JSON sang List<Movie> như sau:

searchMovie("Doctor Strange")
  .map(new Func1<String, List<Movie>() {
        @Override
        public List<Movie> call(String s) {
            return parse(s);
        }
   })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1<List<Movie>>() {
      @Override
      public void call(List<Movie> movies) {
           list.clear();
           list.addAll(movies);
           adapter.notifyDataSetChanged();
  });

Nhìn dài dòng thế thôi chứ với lambda thì (ngon)

searchMovie("Doctor Strange")
  .map(json -> parse(json))
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(movies -> {
      list.clear();
      list.addAll(movies);
      adapter.notifyDataSetChanged();
  });

Về cơ bản thì map là 1 hàm thuộc nhánh Transformation, nó có tác dụng biến đổi data phát ra từ Observable. Điểm tuyệt vời nhất của map là data trả về từ nó không nhất thiết phải cùng type với data nhận vào: Trong case trên, Observable gốc sẽ phát ra chuỗi JSON thuộc type String và hàm map sẽ sử dụng hàm parse để convert String sang List<Movie> => Subscriber sẽ nhận được List<Movie> chứ không phải String. Với vấn đề ở trên ta nói đến là hàm parse có thể sẽ mất nhiều thời gian để chạy xong thì câu trả lời cũng đã có từ bài trước: Nó sẽ chạy trên thread định nghĩa bởi hàm subscribeOn. Bạn có thể nối nhiều hàm map lại với nhau để ra được kết quả như ý trước khi mang đến cho Subscriber.

searchMovie("Doctor Strange")
  .map(json -> parse(json))
  .map(movies -> unparse(movies)) //parse ngược lại vì tôi thích thế
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(json -> // ?? wheres my List<Movie>);

chucknorrisapproves.gif

Đến đây lại phát sinh ra thêm 1 vấn đề khác: parse có thể throw JSONException. Đáng tiếc là map lại không giỏi trong việc handle exception lắm và nó bắt buộc chúng ta phải return 1 thứ gì đó vì nó có return type. Nếu tôi mới học thì có thể tôi sẽ làm như thế này:

searchMovie("Doctor Strange")
  .map(json -> {
    try {
      return parse(json);
    } catch(JSONException e) {
      return null;
    }
  })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(movies -> {
     if(movies != null) {
        list.clear();
        list.addAll(movies);
        adapter.notifyDataSetChanged();
     }
  });

OK, trước khi vào bài này tôi muốn thuyết phục các bạn dùng RxJava thay cho AsyncTask mà giờ nhìn nó chả khác gì AsyncTask cả. Và nếu chúng ta muốn handle 1 case riêng khi xảy ra lỗi trong onError của Subscriber thì cách này cũng không được. Chúng ta có thể tip trick như sau:

searchMovie("Doctor Strange")
  .map(json -> {
    try {
      return parse(json);
    } catch(JSONException e) {
      throw new RuntimeException();
    }
  })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<List<Movie>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        showError();
                    }

                    @Override
                    public void onNext(List<Movie> movies) {
                        list.clear();
                        list.addAll(movies);
                        adapter.notifyDataSetChanged();
                    }
  });

RxJava sẽ chỉ gửi unchecked exception tới onError vì vậy chúng ta không thể throw JSONException (checked exception) trong catch (và cũng không được phép bởi vì vẫn cần return). Với cách này thì chúng ta có thể handle đc case lỗi trong onError tuy nhiên nó sẽ làm cho code rất khó để debug (nếu chúng ta có nhiều map và đều throw RuntimeException thì sẽ không thể biết code lỗi ở đâu).

Bad smell with text.jpg

Introducing flatMap

flatMap hoàn toàn có thể được dùng để thay thế map. Điểm khác biệt giữa 2 hàm này là:

  • map trả về 1 Object thuộc type T

  • flatMap trả về Observable<T>

Ví dụ với đoạn code dùng map có thể được viết lại như sau:

searchMovie("Doctor Strange")
  .flatMap(json -> {
     try {
      return Observable.defer(() -> Observable.just(parse(json)));
    } catch(JSONException e) {
      return Observable.error(e);
    }
  })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<List<Movie>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        showError();
                    }

                    @Override
                    public void onNext(List<Movie> movies) {
                        list.clear();
                        list.addAll(movies);
                        adapter.notifyDataSetChanged();
                    }
  });

Trong trường hợp xảy ra lỗi, chúng ta chỉ đơn giản là gọi hàm error và truyền vào lỗi. Hàm này sẽ gửi lỗi đến onError của Subscriber và chúng ta sẽ dễ dàng xử lý ở đó.

obamacodereview.jpg

Việc có return type là Observable<T> làm cho flatMap linh hoạt hơn rất nhiều so với map. flatMap có thể thay đổi số lượng item được phát ra. Khi bạn truyền 1 item vào map, nó sẽ trả về 1 item có cùng hoặc khác type. Đối với flatMap thì bạn có thể dùng nó để không phát ra item nào, phát ra chính xác 1 item, phát ra nhiều item hay phát ra 1 lỗi.

Đến đây thì bài cũng khá dài rồi nhưng khách hàng lại vừa gọi điện cho tôi và họ yêu cầu app phải có thêm 1 chức năng mới, đó là hàm searchMovie chỉ trả về những phim có rating lớn hơn 5.0. Để làm được điều này thực chất thì khá đơn giản, nhưng tôi đã nói với họ đây là 1 chức năng khó và yêu cầu thêm tiền + estimate để trang trải chi phí nhân lực.

Introducing filter

Có lẽ bạn khi đọc tên của Operator này cũng hiểu nó có tác dụng gì rồi đúng không ạ? Vâng, nó dùng để lọc các item phát ra bởi 1 Observable dựa trên những điều kiện mà chúng ta định nghĩa. Tuy nhiên với đoạn code mà chúng ta đang có thì sau khi flatMap được gọi, nó sẽ trả về 1 Observable phát ra 1 List<Movie>. Như đã nói ở trên thì Operator sẽ hoạt động trên Observable trả về từ Operator đứng trước nó, vậy nên nếu không có gì thay đổi thì chúng ta sẽ truyền vào hàm filter 1 List<Movie>. Điều này là không ổn bởi vì chúng ta muốn lọc từng Movie nằm trong List chứ không phải là cả List.

Ở đây tôi sẽ chứng minh tại sao flatMap lại là 1 trong những Operator linh hoạt và được sử dụng nhiều nhất trong thế giới Rx:

searchMovie("Doctor Strange")
  .flatMap(json -> {
     try {
      return Observable.defer(() -> Observable.just(parse(json)));
    } catch(JSONException e) {
      return Observable.error(e);
    }
  })
  .flatMap(movies -> Observable.from(movies))
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<Movie>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        showError();
                    }

                    @Override
                    public void onNext(Movie movie) {
                    }
  });

Tôi đã nối 1 flatMap khác vào flatMap mà chúng ta đang có*. Như đã nói ở phần 1 thì hàm from khi nhận vào 1 List hay 1 Array thì nó sẽ phát ra từng item trong List/Array đó. Điểm mấu chốt ở đây là bây giờ Subscriber sẽ nhận vào những Movie riêng lẻ chứ không phải là List<Movie> nữa (onNext sẽ được gọi với số lần bằng với số item trong List/Array). Rất tuyệt đúng không? Với flatMap chúng ta có thể nhận vào 1 item (List<Movie>) và trả ra nhiều item (nhiều Movie).

*Thực chất thì bạn không cần nối 2 flatMap lại như trên. Bạn có thể dùng from thay cho just ngay từ flatMap đầu tiên cũng sẽ cho ra output tương tự. Mục đích ở đây là tôi muốn demonstrate rằng bạn có thể return các Operator trực tiếp từ trong flatMap nên nó rất tiện trong việc setup các logic phức tạp. Giới hạn duy nhất là ở bản thân chúng ta mà thôi 😄

Giờ thì chúng ta đã có đủ điều kiện để sử dụng hàm filter:

searchMovie("Doctor Strange")
  .flatMap(json -> {
     try {
      return Observable.defer(() -> Observable.just(parse(json)));
    } catch(JSONException e) {
      return Observable.error(e);
    }
  })
  .flatMap(movies -> Observable.from(movies))
  .filter(movie -> movie.rating > 5.0)
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<Movie>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        showError();
                    }

                    @Override
                    public void onNext(Movie movie) {
                    }
  });

Ở đây thì filter sẽ nhận vào từng Movie và dựa trên rule chúng ta định nghĩa (movie.rating > 5.0) nó sẽ trả về 1 boolean là true hoặc false. Nếu là true, Movie đó sẽ được đi tiếp xuống dưới đến tay Subscriber. Nếu false, Movie đó sẽ đi vào thùng rác.

Như vậy là chúng ta đã hoàn thành được requirement mới của khách hàng :^).

5YovKN3.gif

Chờ đã, hình như có gì đó không ổn. Subscriber của chúng ta hiện giờ vẫn đang nhận vào từng Movie riêng lẻ chứ không phải là 1 List<Movie> để chúng ta có thể gọi notify của Adapter.

Introducing toList

searchMovie("Doctor Strange")
  .flatMap(json -> {
     try {
      return Observable.defer(() -> Observable.just(parse(json)));
    } catch(JSONException e) {
      return Observable.error(e);
    }
  })
  .flatMap(movies -> Observable.from(movies))
  .filter(movie -> movie.rating > 5.0)
  .toList()
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<List<Movie>>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {
                        showError();
                    }

                    @Override
                    public void onNext(List<Movie> movies) {
                        list.clear();
                        list.addAll(movies);
                        adapter.notifyDataSetChanged();
                    }
  });

Bởi vì operator trong RxJava là serial execution, sau khi filter chạy xong chúng ta sẽ có rất nhiều Movie riêng lẻ. Ở đây hàm toList sẽ đơn giản là lấy tất cả chúng và cho vào trong 1 List. Rất tiện đúng không?

wow.gif

Kết

RxJava còn có rất nhiều Operator khác mà tôi đảm bảo rằng nó đáp ứng 96% nhu cầu của bạn. ReactiveX cung cấp cho chúng ta decision tree để có thể chọn đúng Operator mà bạn cần, xem ở đây http://reactivex.io/documentation/operators.html .

Có thể đọc xong bài này bạn vẫn chưa hoàn toàn bị thuyết phục và có thể còn cảm thấy khó khăn nhưng một khi đã vượt qua được giai đoạn này và bắt đầu hiểu hơn 1 chút về concept của RxJava, mọi thứ sẽ trở nên rất tự nhiên và bạn sẽ code nhanh và ít lỗi hơn rất nhiều. Bạn chỉ cần nhớ là "Practice makes perfect".

Ở phần sau, chúng ta sẽ học thêm về side-effect và error handling với RxJava. Cảm ơn các bạn đã theo dõi.