Introduce RxJava and Android

ReactiveX là gì?

ReactiveX API tập trung vào đồng bộ dữ liệu, là kết hợp tốt nhất từ các pattern Observer, Iterator và ngôn ngữ lập trình hàm. Lấy dữ liệu theo thời gian thực là vấn đề thông dụng đòi hỏi giải pháp rạch ròi, tối ưu, và có khả năng mở rộng. Sử dụng Observables và các toán tử, ReactiveX cung cấp một tổ hợp các API linh hoạt để tạo và thao tác trên dòng dữ liệu giải quyết vấn đề đồng bộ.

Giới thiệu RxJava

RxJava là thư viện mã nguồn mở implement ReactiveX trên Java. Có 2 lớp chính là Observable và Subscriber.

  • Observable là một lớp đưa ra dòng dữ liệu hoặc sự kiện (event). Flow của Observable là đưa ra một hoặc nhiều các items, sau đó gọi kết thúc thành công hoặc lỗi.
  • Subscriber lắng nghe flow, thực thi các hành động trên dòng dữ liệu hoặc sự kiện được đưa ra bởi Observable

Một Observable có thể có một hoặc nhiều Subcriber

  • mỗi item được đưa ra bởi Observable, nó sẽ được bắt bởi phương thức Subcriber.onNext().
  • Mỗi Observable kết thúc sẽ gọi phương thức Subscriber.onCompleted() hoặc Subcriber.onError().

Khởi tạo một Observable và đăng kí event từ Observable đó như sau:

Observable integerObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(1);
       subscriber.onNext(2);
       subscriber.onNext(3);
       subscriber.onCompleted();
   }
});

Observable này đưa ra các message 1,2,3 rồi kết thúc. Tiếp theo chúng ta tạo một Subcriber để nhận lấy dữ liệu từ Observable

Subscriber integerSubscriber = new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }

   @Override
   public void onError(Throwable e) {

   }

   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
};

Đăng kí subcriber với Observable và kiểm tra kết quả của chương trình:

integerObservable.subscribe(integerSubscriber);
// Outputs:
// onNext: 1
// onNext: 2
// onNext: 3
// Complete!

Viết gọn quá trình trên bằng block sau:

Observable.just(1, 2 ,3).subscribe(new Subscriber() {
   @Override
   public void onCompleted() {
       System.out.println("Complete!");
   }

   @Override
   public void onError(Throwable e) {}

   @Override
   public void onNext(Integer value) {
       System.out.println("onNext: " + value);
   }
});

Hàm Obserable.just() tạo một Observable cho phép đưa ra các giá trị 1, 2, 3 và nhận lớp inner subcriber đăng kí.

Operator

Tạo và subcrie một Observable khá đơn giản, tuy nhiên subcriber không cần lắng nghe hết các event của Obervable tạo ra. Ở ví dụ dưới đây, một Observable sẽ sử dụng filter để lọc dữ liệu là số lẻ trả về cho subcriber:

 Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onNext(Integer value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1
// onNext: 3
// onNext: 5
// Complete!

Hàm filter() sẽ lọc qua dữ liệu mỗi lần Observable bắn data ra, filter sẽ trả về true nếu dữ liệu là số lẻ, khi đó các subcriber sẽ nhận được data này ở onNext().

RxJava còn cung cấp một toán tử khác cho phép custom lại dữ liệu bắn ra từ Observable là map(), xem ví dụ sau:

Observable.just(1, 2, 3, 4, 5, 6) // add more numbers
       .filter(new Func1() {
           @Override
           public Boolean call(Integer value) {
               return value % 2 == 1;
           }
       })
       .map(new Func1() {
           @Override
           public Double call(Integer value) {
               return Math.sqrt(value);
           }
       })
       .subscribe(new Subscriber() { // notice Subscriber type changed to
           @Override
           public void onCompleted() {
               System.out.println("Complete!");
           }

           @Override
           public void onError(Throwable e) {
           }

           @Override
           public void onNext(Double value) {
               System.out.println("onNext: " + value);
           }
       });
// Outputs:
// onNext: 1.0
// onNext: 1.7320508075688772
// onNext: 2.23606797749979
// Complete!

Dữ liệu sau khi filter gồm các số lẻ, được tính ra căn bậc 2 để trả về cho Subcriber. Operator này cho phép trả về các dữ liệu một cách linh hoạt.

** ###Integrating RxJava with Android **

Implement

Để implement RxAndroid, trong maven dependence, ta thêm line sau

compile 'io.reactivex:rxandroid:1.0.1'.

Simple Threading in Android

Một concept thường gặp trên Android là thực hiện dữ liệu ở tầng background, khi task hoàn thành, đưa kết quả hiển thị lên main thread. Với Android có nhiều cách để làm điều này như sử dụng AsynTasks, Loaders, Services, etc. Tuy nhiên những phương pháp này chưa phải tốt nhất, như AsynTasks có thể dễ dẫn đến lỗi Memory Leak, CursorLoader và ContentProvider thì đòi hỏi nhiều setup và config trong code, Services lại đòi hỏi quá nhiều tài nguyên trong background khi nó cứ chạy suốt. Hãy xem cách RxJava có thể giúp giải quyết vấn đề như thé nào.

  • Layout dưới đây gồm 1 button để start, hiển thị trên progress circle, mọi xử lý tính toán được chạy dưới background chứ không phải trên main thread.
 <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
   xmlns:app="http://schemas.android.com/apk/res-auto"
   android:id="@+id/root_view"
   android:layout_width="match_parent"
   android:layout_height="match_parent"
   android:fitsSystemWindows="true"
   android:orientation="vertical">

   <android.support.v7.widget.Toolbar
       android:id="@+id/toolbar"
       android:layout_width="match_parent"
       android:layout_height="?attr/actionBarSize"
       android:background="?attr/colorPrimary"
       app:popupTheme="@style/AppTheme.PopupOverlay"
       app:theme="@style/ThemeOverlay.AppCompat.Dark.ActionBar" />

   <Button
       android:id="@+id/start_btn"
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:text="@string/start_operation_text" />

   <ProgressBar
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       android:layout_gravity="center_horizontal"
       android:indeterminate="true" />

</LinearLayout>

Và task để tính toán, chạy dưới background:

public String longRunningOperation() {
   try {
       Thread.sleep(2000);
   } catch (InterruptedException e) {
       // error
   }
   return "Complete!";
}

private class SampleAsyncTask extends AsyncTask {

   @Override
   protected String doInBackground(Void... params) {
       return longRunningOperation();
   }

   @Override
   protected void onPostExecute(String result) {
       Snackbar.make(rootView, result, Snackbar.LENGTH_LONG).show();
       startAsyncTaskButton.setEnabled(true);
   }
}

Và giờ convert Asyntask trên sang RxJava

final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
});

Tạo Subcriber đăng kí nhận event

startRxOperationButton.setOnClickListener(new View.OnClickListener() {
   @Override
   public void onClick(final View v) {
       v.setEnabled(false);
       operationObservable.subscribe(new Subscriber() {
           @Override
           public void onCompleted() {
               v.setEnabled(true);
           }

           @Override
           public void onError(Throwable e) {}

           @Override
           public void onNext(String value) {
               Snackbar.make(rootView, value, Snackbar.LENGTH_LONG).show();
           }
       });
   }
});

Định nghĩa thread cho Observable

final Observable operationObservable = Observable.create(new Observable.OnSubscribe() {
   @Override
   public void call(Subscriber subscriber) {
       subscriber.onNext(longRunningOperation());
       subscriber.onCompleted();
   }
})
       .subscribeOn(Schedulers.io()) // subscribeOn the I/O thread
       .observeOn(AndroidSchedulers.mainThread()); // observeOn the UI Thread

Done!

RxAndroid là một extension nhẹ cung cấp đồng bộ trên Main Thread , cũng như cho phép đồng bộ trên bất kì Handler nào.

RxJava là giải pháp tốt cho multithread trên Android. Ví dụ về convert một Asyntask sang RxJava có thể tìm thấy ở github repo sau: https://github.com/alex-townsend/GettingStartedRxAndroid

Reference https://www.captechconsulting.com/blogs/getting-started-with-rxjava-and-android