[Java] Tìm hiểu các Observable/Observer của RxJava
Bài đăng này đã không được cập nhật trong 5 năm
Như chúng ta đã biết thì một Observable
sẽ phát ra một sự kiện hoặc dữ liệu nào đó, còn một Observer
sẽ tiếp nhận sự kiện/dữ liệu đó bằng cách đăng ký lắng nghe Observable
. Trong RxJava
có rất nhiều loại Observable
và cũng có nhiều cách để tạo ra một Observable
. Ở bài viết lần này chúng ta sẽ cùng nhau tìm hiểu về các Observable
để nắm rõ được cách tạo và cách sử dụng của chúng cho từng trường hợp khác nhau.
Chúng ta sẽ có 5
loại Observable
sau
- Observable
- Single
- Maybe
- Flowable
- Completable
Tuy nhiên chúng ta chỉ có 4
loại Observer
mà thôi
- Observer
- SingleObserver
- MaybeObserver
- CompletableObserver
Bảng dưới đây sẽ mô tả sự tương ứng giữa Observable
và Observer
cũng như số emissions
của từng loại
Observable | Observer | Nums of emissions |
---|---|---|
Observable | Observer | Multiple or None |
Single | SingleObserver | One |
Maybe | MaybeObserver | One or None |
Flowable | Observer | Multiple or None |
Completable | CompletableObserver | None |
Để đơn giản và dễ hiểu, với mỗi loại Observable
, mình sẽ làm một ví dụ nho nhỏ
Observable & Observer
Observable là một loại được sử dụng nhiều. Nó có thể phát ra một hoặc nhiều items. Ở ví dụ này, chúng ta sẽ có một danh sách, mội item của danh sách sẽ có kiểu String. Trước hết, chúng ta tạo một danh sách String như sau:
List<String> list = new ArrayList<>();
list.add("Ahuhu");
list.add("Ahihi");
list.add("Ahoho");
Tiếp đến chúng ta sẽ tạo một Observable
như sau
Observable<String> observable = Observable.create(emitter -> {
// emit each item
for (String s : list) {
emitter.onNext(s);
}
// all items are emitted
emitter.onComplete();
});
Chúng ta sử dụng hàm onNext()
để phát ra mỗi item, để biết khi nào hoàn thành quá trình emission, chúng ta sẽ dùng hàm onComplete()
. Như vậy bước tiếp theo chúng ta định nghĩa Observer
để handle các item được phát ra.
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
Cuối cùng chúng ta cần đăng ký Observer với Observable là xong
observable.subscribe(observer);
Chúng ta sẽ thu được kết quả như sau:
onSubscribe
onNext: Ahuhu
onNext: Ahihi
onNext: Ahoho
onComplete
Single & SingleObserver
Single
luôn luôn emit một item duy nhất hoặc ném ra một ngoại lệ nào đó. Chúng ta sẽ sử dụng dữ liệu ở phần trên để làm ví dụ. Ở đây, SingleObserver
sẽ không có hàm onNext()
, mà thay vào đó là hàm onSuccess()
, đương nhiên khi đã có onSuccess
và onError
thì hàm onComplete()
sẽ được giản lược
Khởi tạo SingleObservable
String s = "Hello";
Single<String> single = Single.create(emitter -> {
emitter.onSuccess(s);
});
Định nghĩa Observer
SingleObserver<String> singleObserver = new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onSuccess(String s) {
System.out.println("onSuccess: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
};
Đăng ký observer với observable
single.subscribe(singleObserver);
Kết quả
onSubscribe
onSuccess: Hello
Maybe & MaybeObserver
Maybe
là loại Observable mà có thể phát 1 item hoặc ko phát item nào cả (có 1 hoặc ko có gì). Loại này chúng ta sẽ sử dụng cho trường hợp giá trị muốn nhận là tùy biến có thể có hoặc ko.
Khởi tạo Observable
Maybe<String> maybe = Maybe.create(emitter -> {
emitter.onSuccess("Hello");
// or emitter.onComplete();
});
Nếu muốn phát ra item, chúng ta sẽ sử dụng onSuccess
, còn nếu ko muốn phát ra item thì chúng ta sẽ sử dụng onComplete
. Đây chính là điểm khác nhau với Single
observable.
Định nghĩa Observer
MaybeObserver<String> maybeObserver = new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onSuccess(String s) {
System.out.println("onSuccess: " + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
Đăng ký observer với observable
maybe.subscribe(maybeObserver);
Kết quả
onSubscribe
onSuccess: Hello
Completable & CompletableObserver
Completable là loại Observable sẽ ko phát bất kỳ item nào mà nó chỉ thực thi một nhiệm vụ nào đó và thông báo nhiệm vụ hoàn thành xong chưa mà thôi.
Khởi tạo Observable
Completable completable = Completable.create(emitter -> {
// do something
emitter.onComplete();
});
Định nghĩa Observer
CompletableObserver completableObserver = new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
Đăng ký Observer với Observable
completable.subscribe(completableObserver);
Flowable & Observer
Flowable cũng sử dụng Observer như Observable. Tuy nhiên trên document của RxJava thì Flowable được sử dụng khi số lượng item là 10k+ items mà Observer ko thể handle được hết. Ở ví dụ này, chúng ta sẽ tính tổng từ 1 đến 100, và kết quả sẽ được thông báo cho một SingleObserver.
Khởi tạo Observable
Flowable<Integer> flowable = Flowable.range(1, 100);
Định nghĩa Observer
SingleObserver<Integer> singleObserver = new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onSuccess(Integer integer) {
System.out.println("onSuccess: " + integer);
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
};
Đăng ký Observer với Observable
flowable.reduce(0, (sum, item) -> sum += item).subscribe(singleObserver);
Kết quả
onSubscribe
onSuccess: 5050
- Chú ý: hàm
reduce()
sẽ xử lý từng item mà flowable phát ra và trả về một giá trị là tổng của tất cả items. Nó tương tự khi ta sử dụng vòng lặp để tính tổng vậy
Tổng kết
Như vậy chúng ta đã duyệt qua được hết 5 loại Observable. Mình hy vọng bài viết phần nào giúp mọi người hiểu và nắm được cách sử dụng cơ bản nhất về Observable trogn RxJava.
- Khởi tạo Observable
- Định nghĩa Observer
- Đăng ký Observer với Observable
Cảm ơn các bạn đã đọc bài viết. Happy coding!
All rights reserved