Observable/ Observer trong RxJava (Rx in Android Part 2)
Chào các bạn. Mình xin tiếp tục với chuỗi bài tìm hiểu Rx trong lập trình Android, cụ thể ở đây là RxJava. Hôm nay mình xin giới thiệu chi tiết hơn về 2 thành phần quan trọng, gần như là cốt lõi trong RxJava đó là Observable và Observer.
1. Observable
Observable trong RxJava là một thành phần quan trọng cho việc xử lý luồng dữ liệu trong phát triển ứng dụng Android. Observable đại diện cho một luồng dữ liệu có thể phát ra các sự kiện hoặc giá trị dữ liệu theo thời gian.
Trong RxJava, bạn có thể tạo một Observable từ các nguồn dữ liệu khác nhau như danh sách, tập hợp, sự kiện giao diện người dùng, kết quả truy vấn cơ sở dữ liệu, gọi API mạng, và nhiều nguồn dữ liệu khác.
Chúng ta sẽ có 5 loại Observable sau:
- Observable
- Single
- Maybe
- Flowable
- Completable
Để tạo Observable trong RxJava, bạn có thể sử dụng các phương thức như:
-
Observable.create(): Tạo một Observable từ mã logic tùy chỉnh. Bạn có thể sử dụng các phương thức của Observer để phát ra các sự kiện hoặc giá trị dữ liệu.
-
Observable.just(): Tạo một Observable từ một hoặc nhiều giá trị cụ thể. Observable này sẽ phát ra các giá trị này và hoàn thành sau đó.
-
Observable.interval(): Tạo một Observable phát ra các số nguyên liên tục sau một khoảng thời gian nhất định.
-
Observable.fromIterable(): Tạo một Observable từ một danh sách, một tập hợp hoặc một iterable.
-
Observable.fromCallable(): Tạo một Observable từ một Callable, nơi bạn có thể thực hiện các tác vụ bất đồng bộ và trả về một giá trị.
Khi bạn đã tạo Observable, bạn có thể sử dụng các toán tử để biến đổi, lọc và xử lý dữ liệu trong Observable theo nhu cầu của bạn. Sau đó, bạn có thể đăng ký (Subscribe) một Observer với Observable để nhận và xử lý các sự kiện và giá trị được phát ra từ Observable.
Các Observable trong RxJava cho phép bạn xử lý dữ liệu một cách linh hoạt, thực hiện các tác vụ bất đồng bộ và tương tác với các thành phần Android khác trong việc phát triển ứng dụng Android.
2. Observer
Observer trong RxJava là một thành phần quan trọng để nhận và xử lý các sự kiện hoặc giá trị từ một Observable trong phát triển ứng dụng Android. Observer đăng ký (Subscribe) với một Observable để nhận thông báo về các sự kiện và giá trị được phát ra từ Observable đó.
Chúng ta sẽ có 5 loại Observer sau:
- Observer
- SingleObserver
- MaybeObserver
- CompletableObserver
Trong RxJava, bạn có thể tạo một Observer bằng cách triển khai đối tượng Observer<T>. Đối tượng này định nghĩa các phương thức mà bạn cần triển khai để xử lý các sự kiện và giá trị từ Observable.
Các phương thức chính trong giao diện Observer bao gồm:
-
onNext(T value): Phương thức này được gọi khi một giá trị mới được phát ra từ Observable. Bạn có thể định nghĩa các hành động xử lý khi nhận được giá trị này.
-
onError(Throwable throwable): Phương thức này được gọi khi có một lỗi xảy ra trong quá trình phát ra giá trị từ Observable. Bạn có thể xử lý và báo cáo lỗi trong phương thức này.
-
onComplete(): Phương thức này được gọi khi Observable hoàn thành việc phát ra các giá trị. Bạn có thể thực hiện các hành động dọn dẹp hoặc xử lý cuối cùng trong phương thức này.
Khi bạn đã triển khai giao diện Observer, bạn có thể đăng ký Observer với một Observable bằng cách sử dụng phương thức subscribe() trên Observable. Khi đăng ký thành công, Observer sẽ nhận các sự kiện và giá trị từ Observable và thực hiện các hành động xử lý tương ứng.
Ví dụ:
val observable: Observable<String> = Observable.just("Android", "RxJava", "RxAndroid");
val observer: Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable) {}
override fun onError(e: Throwable) {
// Handle when an error occurs during value generation
}
override fun onComplete() {
// Handle when Observable finishes emitting value
}
override fun onNext(t: String) {
// Handle value which receive from Observable
}
};
observable.subscribe(observer)
Trên đây là cách sử dụng Observer trong RxJava trong phát triển ứng dụng Android. Observer giúp bạn nhận và xử lý các sự kiện và giá trị từ Observable một cách linh hoạt và dễ dàng.
Sau đây mình sẽ nói về sự kết hợp và lấy ví dụ cho từng loại Observable và Observer với nhau.
3. Các loại và triển khai của Observable/ Observer
Như chúng ta đã đề cập ở trên có 5 loại Observable và 4 loại Observer. Bảng dưới đây sẽ mô tả sự tương ứng giữa Observable và Observer cũng như số emissions của từng loại
Observable | Observer | Nums of emissions |
---|---|---|
Observable | Observer | Multiple or None |
Single | SingleObserver | One |
Maybe | SingleObserver | One or None |
Flowable | Observer | Multiple or None |
Completable | CompletableObserver | None |
3.1. Observable & Observer
Observable là một loại được sử dụng khá phổ biến. Nó có thể phát ra một hoặc nhiều items. Mình sẽ triển khai 1 ví dụ minh hoạ sau:
Đầu tiên, chúng ta sẽ tạo một Observable:
val observableList = arrayListOf("RxJava", "RxAndroid", "Coroutine")
val observable: Observable<String> = Observable.create { emitter ->
// emit each item
for (item in observableList) {
Log.i("PhongPN3", "emitter: $item - ${Thread.currentThread().name}")
emitter.onNext(item)
}
// all items are emitted
emitter.onComplete()
}
Chúng ta sử dụng hàm onNext() để phát ra mỗi item. Khi nào hoàn thành quá trình emission, chúng ta sẽ dùng hàm onComplete(). Bước tiếp theo chúng ta định nghĩa Observer để handle các item được phát ra.
val observer: Observer<String> = object : Observer<String> {
override fun onSubscribe(d: Disposable) {
Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
}
override fun onNext(t: String) {
Log.i("PhongPN3", "onNext: $t - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
}
override fun onComplete() {
Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
}
}
Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.
observable.subscribe(observer)
Kết quả sẽ là:
onSubscribe - main
emitter: RxJava - main
onNext: RxJava - main
emitter: RxAndroid - main
onNext: RxAndroid - main
emitter: Coroutine - main
onNext: Coroutine - main
onComplete - main
3.2. Single & SingleObserver
Single luôn luôn emit một item duy nhất hoặc ném ra một ngoại lệ nào đó.
val s = "RxJava"
val singleObservable: Single<String> = Single.create { emitter ->
emitter.onSuccess(s)
}
SingleObserver cũng sẽ khác với Observer bình thường, cụ thể nó sẽ không có hàm onNext() và onComple(), thay đó sẽ làm hàm onSuccess().
val singleObserver: SingleObserver<String> = object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
}
override fun onSuccess(t: String) {
Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
}
}
Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.
singleObservable.subscribe(singleObserver)
Kết quả sẽ là:
onSubscribe - main
onSuccess: RxJava - main
3.3. Maybe & MaybeObserver
Maybe là loại Observable mà có thể phát 1 item hoặc ko phát item nào cả (có 1 hoặc ko có gì). Với Maybe chúng ta sẽ sử dụng cho trường hợp giá trị muốn nhận là tùy biến có thể có hoặc ko. Ví dụ chúng ta query note by Id trong database nó có thể có hoặc cũng có thể không.
val s = "RxJava"
val maybeObservable = Maybe.create { emitter: MaybeEmitter<String> ->
emitter.onSuccess(s)
}
Nếu muốn phát ra item, chúng ta sẽ sử dụng onSuccess, còn nếu ko muốn phát ra item thì chúng ta sẽ sử dụng onComplete. Đây chính là điểm khác nhau với Single observable.
val maybeObserver: MaybeObserver<String> = object : MaybeObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
}
override fun onSuccess(t: String) {
Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
}
override fun onComplete() {
Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
}
}
Cuối cùng là subscribe việc lắng nghe dữ liệu từ 1 Observable.
maybeObservable.subscribe(maybeObserver)
Kết quả sẽ là:
onSubscribe - main
onSuccess: RxJava - main
3.4. Completable & CompletableObserver
Completable là loại Observable sẽ ko phát bất kỳ item nào mà nó chỉ thực thi một nhiệm vụ nào đó và thông báo nhiệm vụ hoàn thành hoặc chưa hoàn thành.
Khởi tạo Observable:
val completableObservable = Completable.create { emitter: CompletableEmitter ->
// do something
emitter.onComplete()
}
Định nghĩa Observer:
val completeObserver: CompletableObserver = object : CompletableObserver {
override fun onSubscribe(d: Disposable) {
Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
}
override fun onComplete() {
Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
}
}
Cuối cùng là subscribe việc lắng nghe dữ liệu từ Observable.
completableObservable.subscribe(completeObserver)
Kết quả sẽ là:
onSubscribe - main
onComplete - main
3.5. Flowable & SingleObsever
Được sử dụng khi một Observable tạo ra số lượng lớn các sự kiện / dữ liệu mà Observer có thể xử lý. Flowable có thể được sử dụng khi nguồn tạo ra rất nhiều sự kiện (theo nhiều tài liệu là khoảng 10k+ sự kiện) và Onserver không thể tiêu thụ tất cả. Flowable sử dụng phương pháp Backpressure để xử lý dữ liệu tránh lỗi MissingBackpressureException và OutOfMemoryError.
Ở ví dụ này, chúng ta sẽ tính tổng từ 1 đến 10, và kết quả sẽ được thông báo cho một SingleObserver.
val flowable = Flowable.range(1, 10)
val singleObserver: SingleObserver<Int> = object : SingleObserver<Int> {
override fun onSubscribe(d: Disposable) {
Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
}
override fun onError(e: Throwable) {
Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
}
override fun onSuccess(t: Int) {
Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
}
}
flowable.reduce(0) { sum: Int, item: Int ->
sum + item
}.subscribe(singleObserver)
Hàm reduce có tác dụng xử lý từng item mà flowable phát ra và trả về một giá trị là tổng của tất cả items.
Kết quả sẽ là:
onSubscribe - main
onSuccess: 55 - main
Lưu ý : Ở các ví dụ source code tham khảo, mình hay để lại Log để các bạn có thể tiện thử chạy và ra output giống kết quả mà mình trình bày.
Tổng kết
Trên đây là các loại và cách triển khai của các loại Observable và Observer tương ứng. Mình hy vọng bài viết phần nào giúp mọi người hiểu và nắm được cách sử dụng cơ bản nhất về 2 thành phần này RxJava.
Bài viết sắp tới mình sẽ tiếp tục với các Operator trong RxJava. Hẹn mọi người ở bài viết sắp tới.
All Rights Reserved