Some important points should clear when using RxJava 's operator
Bài đăng này đã không được cập nhật trong 6 năm
Context
Reactive Programing đang dần trở thành một xu hướng trong lập trình hiện đại bởi khả năng linh hoạt, hiệu quả mang lại và áp dụng triệt để Functional Programing trong xử lý dữ liệu. Ở Phần 1 và Phần 2, tôi đã giới thiệu về các khái niệm về Reactive Programing cũng như cách làm việc với nó thông qua RxJava
- một implementation của nó. RxJava
là một công cụ tuyệt vời để apply Reactive Programing bằng ngôn ngữ Java với hàng trăm ngàn thứ hay ho giúp cho việc lập trình của bạn trở nên hiệu quả hơn, tuy nhiên để hiểu và áp dụng đúng không phải là điều dễ dàng. Hôm nay tôi sẽ giới thiệu về một số điểm quan trọng và cần thiết khi bạn sử dụng các Operator trong RxJava
.
Flatmap after a Merge ?
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
Flatmap
sẽ trả về một Observable
mà nó sẽ emit các item dựa trên một Function mà bạn cung cấp cho mỗi item được emit từ Source Observable
và sau nó sẽ gộp thành một stream thống nhất và dĩ nhiên "phẳng".
Đến với một ví dụ:
Observable.just(1,2,3).flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(4,5);
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
Sau khi 1 được emit, thông qua Function được provide bởi flatMap
sẽ emit lần lượt 4,5
. Tương tự như vậy khi emit 2, 3
thì kết quả cuối cùng trả về sẽ là {4,5,4,5,4,5}
Thoạt nhìn không có vấn đề gì xảy ra ở đây. Tuy nhiên, khi chúng ta làm việc với ứng dụng thực tế đòi hỏi việc mix data giữa local và remote, gộp nhiều stream lại với nhau, chúng ta thường quên mất việc flatMap
dễ gây ra một side-effect lớn rằng API sẽ bị gọi rất nhiều lần mỗi lần nhận được một item được emit từ Source Observable
. Ví dụ bên dưới.
Observable.merge(Observable.just("Save Data 1"), Observable.just("Save Data 2"),
Observable.just("Save Data 3")).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String data) {
return Observable.just("Call API");
}
});
Mục đích của tôi ban đầu là merge 3 stream để save lại dữ liệu dưới local và sau đó mới gọi API. Tuy nhiên, cách implement trên của tôi lại dẫn đến 1 side-effect rằng việc call API sẽ được gọi đến 3 lần.
Trong các dự án thực tế, nhất là với các app đồng bộ, rất khó để nhận ra điều này nhưng nó ảnh hưởng không hề nhỏ.
Chúng ta sẽ sửa lại một chút:
Observable.merge(Observable.just("Save Data 1"), Observable.just("Save Data 2"),
Observable.just("Save Data 3")).last().flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String data) {
return Observable.just("Call API");
}
});
Operator last()
sẽ pending việc emit các items mà đợi đến khi các items đã được emit xong thì sẽ emit một item cuối cùng. Dựa vào việc này, chúng ta sẽ đảm bảo việc API sẽ chỉ được gọi 1 lần duy nhất. Tuy nhiên, cần lưu ý khi sử dụng last()
operator vì nó sẽ throw một Exception
nếu như Source Observable không emit item nào cả.
Tương tự có thể dùng với operator first()
Using merge or concat ?
Merge means Flattens three Observables into a single Observable, without any transformation.
Concat returns an Observable that emits the items emitted by three Observables, one after the other, without interleaving them.
Merge
hỗ trợ việc parallel hóa việc emit item mà không với bất kỳ transformation nào, merge
thì đồng thời. Ngược lại, concat
lại tuần tự hóa, nghĩa là Observable
này completed thì Observable
kia mới bắt đầu việc emit item.
Dựa vào đặc tính này, chúng ta sẽ dễ dàng lựa chọn cho mình operator thích hợp cho các tác vụ khác nhau. Ví dụ:
Khi muốn gọi nhiều API cùng một lúc mà không quan tâm đến dữ liệu trả về, khi nào các API này được gọi xong, ta tiến hành lưu lại gì đó duới local DB, ta sẽ làm như sau:
Observable.merge(Observable.just("Get server time"), Observable.just("Put device token"),
Observable.just("Mark login day"))
.last()
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String data) {
return Observable.just("Save flag that determine already using");
}
});
Merge
sẽ tăng tốc độ của tác vụ vì các API được gọi một cách đồng thời, tuy nhiên nó sẽ emit các item lần lượt và dễ gây ra side-effect đã được nêu ở trên.
Concat
lại có một lợi thế riêng bởi tính tuần tự của nó. Ví dụ bên dưới:
Observable.concat(Observable.just("Get Access Token"), Observable.just("Sync Device Token"),
Observable.just("Mark login day"))
.last()
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String data) {
return Observable.just("Done");
}
});
Ví dụ trên cần mọi thứ thực hiện tuần tự.
Observable.just("Get Access Token")
sẽ get access token từ server và save vào local DB chẳng hạn. Sau đó việc Sync Device Token sẽ cần access token nên sẽ được thực thi sau khi việc get access token completed, tương tự là việc Mark login day sẽ được thực thi sau cùng trước khi finish công việc.
Other Utils
- Một cách cực kỳ đơn giản khi bạn một switch từ một thread khác sang main thread mà không cần Context hoặc Main Thread Handler.
public void switchMainThread(final Action0 action) {
Observable.empty().subscribeOn(AndroidSchedulers.mainThread()).doOnCompleted(new Action0() {
@Override
public void call() {
action.call();
}
}).subscribe();
}
- Khi bạn muốn DO một action ở background thread
Observable.empty().subscribeOn(Schedulers.io()).doOnCompleted(new Action0() {
@Override
public void call() {
action.call();
}
}).subscribe();
}
Conclusion
Qua baì viết này hy vọng các bạn sẽ nắm được một số điểm cần lưu ý khi làm việc với RxJava và một số lợi ích do nó mang lại. Hẹn gặp lại ở những bài sau.
All rights reserved