Cùng học RxJava, phần 2: Threading concept

Chào mừng các bạn đã trở lại với series Cùng học RxJava. Ở bài trước chúng ta đã tìm hiểu về 1 số cách cơ bản để tạo ra Observable - 1 trong những thành phần chủ yếu cấu tạo nên RxJava nói chung và thư viện ReactiveX nói riêng. Tuy nhiên để hiểu được cách áp dụng những Observable này vào thực tế thì chúng ta cần phải tìm hiểu về threading concept của ReactiveX trước, và đó là khái niệm mà chúng ta sẽ học ở bài post này.

A free-threaded model

Trái với những gì mà mọi người thường nghĩ khi nhắc đến ReactiveX, ở trạng thái mặc định nó hoàn toàn không chạy đa luồng.

Cấu trúc của Rx bao gồm: 1 Observable theo sau bởi 0 hay nhiều Operator theo sau bởi 1 Subscriber.

*Operator là 1 tập hợp những hàm có thể dùng giữa Observable gốc và Subscriber với nhiều chức năng như tính toán, lọc hay biến đổi dữ liệu. Chúng ta sẽ học thêm về Operator ở những bài post sau.

Với cấu trúc này thì concept threading mặc định là:

  • Các phần tính toán để tạo nên Observable gốc được chạy trên thread mà hàm subscribe() được gọi đến.
  • Các phần tính toán của 1 Operator được chạy trên thread mà Operator ở trước nó được chạy. Nếu trước nó không có 1 Operator nào khác, nó sẽ được chạy trên thread tạo ra Observable gốc.
  • Các phần tính toán của 1 Subscriber được chạy trên thread mà Operator ở trước nó được chạy. Nếu trước nó không có 1 Operator nào khác, nó sẽ được chạy trên thread tạo ra Observable gốc.

Code sample:

private Observable<Integer> getANumberObservable() {
        return Observable.defer(new Func0<Observable<Integer>>() {
            @Override public Observable<Integer> call() {
                Log.i("Observable thread", Thread.currentThread().getName());
                return Observable.just(1);
            }
        });
}

//Run this inside onCreate() of an Activity
getANumberObservable()
               .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });

Sẽ log ra output:

Observable thread: main
Operator thread: main
Subscriber thread: main

Giải thích:

  • getANumberObservable() là hàm để tạo ra Observable gốc và được chạy trên main thread vì hàm subscribe() được gọi trên main thread..
  • map() là 1 trong các Operator và cũng được chạy trên main thread bởi vì trước nó không có 1 Operator nào khác nên mặc định nó sẽ chạy trên thread đã tạo ra Observable.
  • Hàm call() (hay tất cả các hàm nằm trong subscribe() như onNext(), onCompleted(), onError()) là hàm tính toán của Subscriber, được chạy trên main thread bởi vì Operator trước nó là map() cũng chạy trên main thread.

Dễ thấy rằng mặc định Rx chỉ chạy trên thread mà hàm subscribe() được gọi đến. Tuy có bản chất là mô hình luồng tự do nhưng không có nghĩa rằng Rx sẽ tự động sử dụng đa luồng cho bạn, nó chỉ mang ý nghĩa là bạn có thể chọn bất cứ thread nào để thực thi công việc trên đó.

Nói như vậy cũng không có nghĩa rằng bạn không thể lập trình đa luồng với Rx. Rx cung cấp cho chúng ta 1 cơ chế xử lý đa luồng rất tiện dụng và hữu ích, đó chính là Scheduling.

Scheduler

Về cơ bản thì 1 Scheduler sẽ định nghĩa ra thread để chạy 1 khối lượng công việc. RxJava cung cấp những lựa chọn Scheduler như sau:

  1. immediate(): Tạo ra và trả về 1 Scheduler để thực thi công việc trên thread hiện tại.
  2. trampoline(): Tạo ra và trả về 1 Scheduler để sắp xếp 1 hàng chờ cho công việc trên thread hiện tại để thực thi khi công việc hiện tại kết thúc.
  3. newThread(): Tạo ra và trả về 1 Scheduler để tạo ra 1 thread mới cho mỗi đơn vị công việc.
  4. computation(): Tạo ra và trả về 1 Scheduler với mục đích xử lý các công việc tính toán, được hỗ trợ bởi 1 thread pool giới hạn với size bằng với số CPU hiện có.
  5. io(): Tạo ra và trả về 1 Scheduler với mục đích xử lý các công việc không mang nặng tính chất tính toán, được hỗ trợ bởi 1 thread pool không giới hạn có thể mở rộng khi cần. Có thể được dùng để thực thi các tiến trình bất đồng bộ không gây ảnh hưởng lớn tới CPU.

OK, đọc qua chắc các bạn cũng có thể hiểu được 1 chút về việc nên dùng Scheduler nào cho công việc nào. Đến đây chúng ta sẽ làm quen với 2 hàm mới để dùng các Scheduler trên, đó là subscribeOn()observeOn(). Đây là 2 khái niệm thường bị nhầm lẫn rất nhiều bởi các lập trình viên mới làm quen với Rx, nên sau đây tôi sẽ giải thích cụ thể cách sử dụng cũng như sự khác nhau giữa 2 hàm này.

subscribeOn()

Hàm subscribeOn() nhận vào tham số là 1 Scheduler, sẽ quyết định việc xử lý các phần tính toán để tạo nên 1 Observable trên thread cung cấp bởi Scheduler đó.

getANumberObservable()
               .subscribeOn(Schedulers.newThread())
               .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });
Observable thread: RxNewThreadScheduler-1
Operator thread: RxNewThreadScheduler-1
Subscriber thread: RxNewThreadScheduler-1

Với đoạn code trên, hàm gọi subscribeOn(Schedulers.newThread()) sẽ làm cho tất cả các operation được thực thi trên 1 thread mới. Như đã nhắc đến ở trên về biểu hiện luồng mặc định thì các phần tính toán của Operator sẽ chạy trên thread mà Observable được tạo và phần code của Subscriber được chạy trên thread của Operator.

Vị trí gọi subscribeOn() không quan trọng

Bạn có thể gọi hàm này ở bất cứ chỗ nào giữa Observable và Subscriber bởi vì nó chỉ có tác dụng khi hàm subscribe() được gọi đến. Đoạn code dưới đây...

getANumberObservable()
               .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });

...cũng sẽ cho ra output như ở trên, mặc dù vị trí gọi subscribeOn() đã thay đổi.

Tương tác giữa các hàm khởi tạo Observable đối với subscribeOn()

Bạn cần lưu ý điều này khi sử dụng các hàm như Observable.just(), Observable.from() hay Observable.range(): Những hàm này sẽ nhận vào giá trị ngay khi chúng được khởi tạo nên subscribeOn() sẽ không có tác dụng; Nguyên nhân là do subscribeOn() chỉ có tác dụng khi hàm subscribe() được gọi đến, mà những hàm khởi tạo nói trên lại khởi tạo Observable trước khi gọi subscriber() nên các bạn cần tránh đưa vào các giá trị mà cần tính toán trong 1 khoảng thời gian dài (blocking) vào các hàm khởi tạo đó.

Observable.just(someLongBlockingOperation()) // never do this
                .subscribeOn(Schedulers.newThread()) // wont have any effect on the previous call
                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });

Thay vào đó đối với các hàm blocking thì bạn có thể sử dụng Observable.create() hoặc Observable.defer(). 2 hàm này về cơ bản sẽ đảm bảo là Observable sẽ chỉ được khởi tạo khi hàm subscribe() được gọi đến.

Observable.defer(new Func0<Observable<String>>() {
            @Override public Observable<String> call() {
                return Observable.just(someLongBlockingOperation()); // this will run on a new thread
            }
         })
         .subscribeOn(Schedulers.newThread())
         .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
         });

Gọi nhiều subscribeOn()

Nếu bạn gọi nhiều lần hàm subscribeOn() với các Scheduler khác nhau thì cũng chỉ có hàm gọi đầu tiên từ trên xuống là có tác dụng thôi.

getANumberObservable()
               .map(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .subscribeOn(Schedulers.io()) // only this takes effect
                .subscribeOn(Schedulers.newThread()) // this wont produce any effect
                .subscribe(new Action1<String>() {
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });

observeOn()

Hàm observeOn() nhận vào tham số là 1 Scheduler sẽ làm cho các Operator hay Subscriber được gọi đằng sau nó chạy trên thread được cung cấp bởi Scheduler đó.

getANumberObservable() //this will run on main thread
                .observeOn(Schedulers.io())
                .map(new Func1<Integer, String>() { // this will run on a thread intended for I/O bound
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .subscribe(new Action1<String>() {  // this will run on a thread intended for I/O bound
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });
Observable thread: main
Operator thread: RxIoScheduler-2 // io
Subscriber thread: RxIoScheduler-2 // io

Sau khi observeOn(Schedulers.io() được gọi đến, tất cả các hàm đằng sau nó đều sẽ chạy trên thread cung cấp bởi Schedulers.io().

Gọi nhiều observeOn()

getANumberObservable() //this will run on main thread
                .observeOn(Schedulers.newThread()) // Changing the thread
                .map(new Func1<Integer, String>() { // this will run on new thread
                    @Override
                    public String call(Integer integer) {
                        Log.i("Operator thread", Thread.currentThread().getName());
                        return String.valueOf(integer);
                    }
                })
                .observeOn(Schedulers.newThread()) // Changing the thread
                .subscribe(new Action1<String>() {  // this will run on a new thread different from the previous one
                    @Override
                        public void call(String s) {
                        Log.i("Subscriber thread", Thread.currentThread().getName());
                    }
                });
Observable thread: main
Operator thread: RxNewThreadScheduler-2
Subscriber thread: RxNewThreadScheduler-1

Khi chúng ta có nhiều hàm gọi đến observeOn(), các Scheduler truyền vào sẽ có tác dụng giữa hàm gọi này cho đến hàm gọi tiếp theo. Nói cách khác, observeOn() sẽ thay đổi thread của tất cả các hàm chạy đằng sau nó cho đến khi có 1 hàm observeOn() khác được gọi đến. Trong trường hợp chúng ta truyền vào cùng 1 Scheduler vào nhiều hàm observeOn(), 1 thread mới cũng sẽ được tạo ra chứ ko phải dùng lại thread cũ.

observeOn() rất hữu ích khi chúng ta muốn thực thi các tác vụ trên thread mà chúng ta mong muốn. 1 trong những tác vụ cơ bản trong lập trình mobile chính là việc execute task dưới background thread (như là network request) sau đó update kết quả lên UI. Đối với Android, để đạt được điều này chúng ta cần phải sử dụng 2 thread khác nhau, 1 thread dùng để request và main thread để update UI. Tuy nhiên RxJava không cung cấp cho chúng ta 1 Scheduler nào để sắp xếp công việc trên main thread của Android. Vậy thì khi muốn update UI chúng ta phải làm thế nào?

RxAndroid

Có lẽ nhiều bạn sẽ nghĩ rằng RxAndroid là 1 phiên bản Rx dành cho Android. Điều này cũng có phần đúng và có phần sai. Chính xác hơn thì RxAndroid là 1 phiên bản mở rộng của RxJava, nó không thể hoạt động độc lập mà không có RxJava. RxAndroid cung cấp cho RxJava thêm 1 Scheduler là AndroidSchedulers. Với hàm mainThread() từ Scheduler này, chúng ta có thể thực thi các tác vụ liên quan đến UI vì nó sẽ đảm bảo là code được execute trên main thread của Android.

Kết

Ở bài viết này chúng ta đã học thêm được về concept multi threading trong RxJava, tuy nhiên để sử dụng đúng nó đối với Android thì bạn cần chú ý những điểm sau:

  • Chọn đúng Schedulers ở subscribeOn(): Hàm này sẽ có ảnh hưởng đến việc tạo Observable gốc nên bạn cần chọn đúng dựa theo use case. Trong hầu hết các trường hợp như network request hay các công việc liên quan đến database thì chúng ta sử dụng Schedulers.io()
  • Sử dụng observeOn() đúng cách: 1 trong những điểm mạnh của hàm này là nó cho phép chúng ta thực thi tác vụ trên nhiều thread khác nhau. Tuy vậy thì đối với use case phổ biến nhất trong lập trình Android, đó là update kết quả lên UI ở trong hàm subscribe() thì bạn cần phải sử dụng RxAndroid và đảm bảo là hàm observeOn(AndroidSchedulers.mainThread()) là hàm gọi cuối cùng trước khi gọi đến subscribe()
fetchUserObservable() // This will run on thread intended for i/o
                .subscribeOn(Schedulers.io())
                .map(new Func1<String, User>() { // This will run on thread intended for i/o
                    @Override
                    public User call(String json) {
                        return parseUser(json);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) // Changing the current thread to main thread
                .subscribe(new Subscriber<User>() { // This will run on main thread
                            @Override
                            public void onCompleted() {
                            }

                            @Override
                            public void onError(Throwable e) {
                            }

                            @Override
                            public void onNext(User user) {
                                updateUI(user);
                            }
                        })

  • Không nhầm lẫn giữa subscribeOn()observeOn(): subscribeOn() có tác dụng với Observable. observeOn() có tác dụng với các Operator và Subscriber được gọi sau nó.

Nội dung thảo luận hôm nay đến đây là hết. Hi vọng qua bài viết các bạn sẽ hiểu thêm về concept threading của RxJava và sẽ có thể áp dụng vào trong bài học lần sau.

Cảm ơn các bạn đã theo dõi!