Zip() và Merge() trong RxJava
Bài đăng này đã không được cập nhật trong 3 năm
1. Zip()
Hàm zip()
trong RxJava
giúp bạn thực hiện đồng thời nhiều Observable
và gộp các kết quả của các Observable
lại cùng trong 1 kết quả trả về.
Trong RxJava cung cấp cho bạn 3 lựa chọn để thực hiện phương thức zip
<R>
Observable<R>
zip(Iterable<? extends Observable<?>> ws, FuncN<? extendsR
> zipFunction)<R>
Observable<R>
zip(Observable<? extends *Observable*<?>> ws, final FuncN<? extendsR
> zipFunction)- <
T1
,T2
,R
> Observable<R>
zip(Observable<? extendsT1
> o1, Observable<? extendsT2
> o2, final Func2<? superT1
, ? superT2
, ? extendsR
> zipFunction) "hàm zip() này cho phép bạn zip tối đa 9Observable
lại với nhau"
2. Merge()
Hàm merge()
trong RxJava
giúp bạn thực hiện đồng thời nhiều Observable
và trả về riêng lẻ các kết quả của Observable
sau khi thực hiện xong Observable
đó.
Trong RxJava cung cấp cho bạn rất nhiều nhưng ở đây mình sẽ giới thiệu với các bạn 1 số hàm merge() cơ bản và hay dùng như sau
<T>
Observable<T>
merge(Observable<? extends Observable<? extendsT
>> source)<T>
Observable<T>
merge(Observable<? extends Observable<? extendsT
>> source, int maxConcurrent) * tham số maxConcurrent cho biết số lượngObservable
cùng được thực hiện đồng thời (chạy song song)*<T>
Observable<T>
merge(Observable<? extendsT
> t1, Observable<? extendsT
> t2) "hàm merge() này cho phép bạn merge tối đa 9Observable
lại với nhau"
Ở trên mình đã giới thiệu qua về 2 hàm zip(
) và merge()
trong RxJava, dưới đây mình sẽ làm 1 ví dụ nhỏ để bạn hiểu rõ hơn về cơ chế hoạt động và sự khác nhau cơ bản giữa 2 hàm này.
3.Example
Đầu tiên trong file activity_main.xml
cá bạn khai báo 1 số View đơn giản như sau:
<LinearLayout
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
>
<Button
android:id="@+id/btn_zip"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
android:text="zip()"
/>
<TextView
android:id="@+id/txt_zip"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
/>
<Button
android:id="@+id/btn_zip_list"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
android:text="zipList()"
/>
<TextView
android:id="@+id/txt_zip_list"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
/>
<Button
android:id="@+id/btn_merge"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
android:text="merge()"
/>
<TextView
android:id="@+id/txt_merge"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
/>
<Button
android:id="@+id/btn_merge_list"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
android:text="mergeList()"
/>
<TextView
android:id="@+id/txt_merge_list"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_gravity="center"
/>
</LinearLayout>
Trong onCreate()
các bạn định nghĩa các TextView
và Button
và set sự kiện onClick
cho các Button
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.btn_zip:
zip();
break;
case R.id.btn_merge:
merge();
break;
case R.id.btn_zip_list:
zipList();
break;
case R.id.btn_merge_list:
mergeList();
break;
}
}
private void zip(){}
private void zipList(){}
private void merge(){}
private void mergeList(){}
Trong file MainActivity.java
các bạn khai báo các hàm định nghĩa Observable
private Observable<Integer> createObservable(int data) {
return Observable.just(data);
}
private List<Observable<?>> createListObservable() {
List<Observable<?>> result = new ArrayList<>();
result.add(createObservable(1));
result.add(createObservable(2));
result.add(createObservable(3));
return result;
}
Rồi tất cả các chuẩn bị đã xong bây giờ chúng ta sẽ bắt đầu xây dựng lần lượt hàm chức năng
zip()
private void zip() {
Observable.zip(createObservable(1), createObservable(2), createObservable(3),
new Func3<Integer, Integer, Integer, DataZip>() {
@Override
public DataZip call(Integer integer, Integer integer2, Integer integer3) {
return new DataZip(integer, integer2, integer3);
}
}).subscribe(new Subscriber<DataZip>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(DataZip zip) {
txtZip.setText(zip.numberOne + " " + zip.numberTwo + " " + zip.numberThree);
}
});
}
private void zipList(){}
private void merge(){}
private void mergeList(){}
class DataZip {
int numberOne;
int numberTwo;
int numberThree;
DataZip(int numberOne, int numberTwo, int numberThree) {
this.numberOne = numberOne;
this.numberTwo = numberTwo;
this.numberThree = numberThree;
}
}
Như ở đây các bạn đã thấy mình thực hiện zip()
3 Observable<Integer>
và zip 3 kết quả sau khi thực hiện xong vao class DataZip
Thực chất ở đây hàm zip()
thực hiện đồng thời (hoặc lần lượt) các Observable<Integer>
truyền vào rồi sử dụng Func3<Integer, Integer, Integer, DataZip>()
để zip các kết quả trả về thành 1 kiểu dữ liệu chung.
zipList()
private void zipList() {
Observable.zip(createListObservable(), new FuncN<DataZip>() {
@Override
public DataZip call(Object... args) {
return new DataZip((int) args[0], (int) args[1], (int) args[2]);
}
}).subscribe(new Subscriber<DataZip>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(DataZip zip) {
txtZipList.setText(zip.numberOne + " " + zip.numberTwo + " " + zip.numberThree);
}
});
}
Như các thấy ở đây mình truyền vào 1 List<Observable<?>>
do không biết trước số lượng truyền vào nên chúng ta sẽ dùng hàm FuncN<DataZip>
để thực hiện zip và kết quả trả về và kết quả trả về sẽ là dạng mảng chứ không phải là từng Object riêng lẻ như hàm Func3
ở trên.
Do ở đây mình truyền vào là List<Observable<?>>
nên kết quả truyền vào trong hàm call là Object... args
còn nếu như bạn định nghĩa rõ kiểu trong List<Observable<?>>
như là List<Observable<Integer>>
thì kết quả truyền vào trong hàm call sẽ được định nghĩa rõ ràng public DataZip call(Integer... args)
.
Rồi thế là cơ bản chúng ta đã xong ví dụ về hàm zip()
giờ thì chạy để xem kết quả hiển thị sẽ như thế nào.
Như các bạn thấy thì 2 hàm zip()
và zipList()
đều thực hiện 3 Observable
và zip kết quả trả về vào class DataZip
.
Các bạn lưu ý khi thực hiện hàm Observable.zip()
mà có 1 Observable
trả về lỗi thì khi chúng ta subscribe()
thì Rx sẽ gọi hàm public void onError(Throwable e)
.
merge()
private void merge() {
Observable.merge(createObservable(1), createObservable(2), createObservable(3))
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
txtMerge.setText(txtMerge.getText() + " " + integer);
}
});
}
ở đây trong hàm onNext()
chúng ta thấy RxJava
trả về từng phần tử sau khi đã thực hiện xong các Observable
.
mergeList()
private void mergeList() {
Observable.merge(createListObservable()).subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
txtMergeList.setText(txtMergeList.getText() + " " + o);
}
});
}
ở đây tham số truyền vào cho hàm Observable.merge()
thay vì từng Observable
mà là List<Observable<?>>
và kết quả trả về cũng không khác gì so với mình truyền từng phần tử Observable
ở trên.
Run Demo để xem kết quả nào.
Như các bạn thấy trong hàm onNext()
của Observable.merge()
mình hiện lên màn hình 1 kết quả trả về nhưng mà chúng ta vẫn hiện đủ 3 giá trị 1 2 3
chứng tỏ là hàm này đã được gọi 3 lần tương đương với 3 Observable
truyền vào. Để rõ hơn các bạn có thể đặt Log
trong hàm onNext()
để xem số lần gọi và giá trị trả về.
@Override
public void onNext(Integer integer) {
Log.i("MainActivity", " -> onNext: ----------------: " + integer);
txtMerge.setText(txtMerge.getText() + " " + integer);
}
Cũng như hàm Observable.zip()
thì khi gặp 1 Observable
lỗi thì hàm Observable.merge()
sẽ dừng ngay tại thời điểm sau khi thực hiện hàm Observable
mà bị lỗi. Nhưng RxJava
cũng cấp cho chúng ta thêm 1 hàm Observable.mergeDelayError()
.
Khi sử dụng hàm này mà gặp 1 Observable
phát ra lỗi thì các Observable
tiếp theo vẫn được thực hiện và vẫn trả ra giá trị vào hàm onNext()
. Sau khi thực hiện hết các Observable
thì Rx sẽ gọi hàm onError(Throwable e)
và trả về lỗi của Observable
sảy ra lỗi.
Ở trên mình đã giới thiệu cơ bản về 2 hàm Observable.zip()
và Observable.merge()
trong RxJava. Mình cũng đã ứng dụng 2 hàm này kết hợp với Retrofit để thực hiện các thao tác đồng bộ dữ liệu từ server cũng như thực hiện upload, download dữ liệu.
- Hàm
Observable.zip()
thực hiện lấy dữ liệu đồng thời từ server để hiển thị lên View (Activity hay Fragment). - Hàm
Observable.merge()
dùng để thực hiện download hoặc upload dữ liệu lên server. Do các kết quả khi thực hiệnObservable
được trả về riêng lẻ nên trong hàmonNext()
mình sẽ đặt 1 biến để đếm số lượngObservable
đã được thực hiện xong và hiển thị phần trăm download hoặc upload dữ liệu cho người dùng.
Hi vọng sau bài viết này các bạn có thể hiểu cơ bản về 2 hàm Observable.zip()
và Observable.merge()
trong RxJava và sử dụng chúng trong các Project sắp tới của mình.
Source Demo: https://github.com/huyquyet/DemoZipMergeRxJava
Tài liệu tham khảo:
All rights reserved