+7

Zip() và Merge() trong RxJava

1. Zip()

Hàm zip() trong RxJava giúp bạn thực hiện đồng thời nhiều Observable và gộp các kết quả của các Observable lại cùng trong 1 kết quả trả về.

Trong RxJava cung cấp cho bạn 3 lựa chọn để thực hiện phương thức zip

  • <R>Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
  • <R>Observable<R> zip(Observable<? extends *Observable*<?>> ws, final FuncN<? extends R> zipFunction)
  • <T1,T2,R> Observable<R> zip(Observable<? extendsT1> o1, Observable<? extendsT2> o2, final Func2<? super T1, ? superT2, ? extendsR> zipFunction) "hàm zip() này cho phép bạn zip tối đa 9 Observable lại với nhau"

2. Merge()

Hàm merge() trong RxJava giúp bạn thực hiện đồng thời nhiều Observable và trả về riêng lẻ các kết quả của Observable sau khi thực hiện xong Observable đó.

Trong RxJava cung cấp cho bạn rất nhiều nhưng ở đây mình sẽ giới thiệu với các bạn 1 số hàm merge() cơ bản và hay dùng như sau

  • <T>Observable<T> merge(Observable<? extends Observable<? extends T>> source)
  • <T>Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) * tham số maxConcurrent cho biết số lượng Observable cùng được thực hiện đồng thời (chạy song song)*
  • <T>Observable<T> merge(Observable<? extends T> t1, Observable<? extends T> t2) "hàm merge() này cho phép bạn merge tối đa 9 Observable lại với nhau"

Ở trên mình đã giới thiệu qua về 2 hàm zip() và merge() trong RxJava, dưới đây mình sẽ làm 1 ví dụ nhỏ để bạn hiểu rõ hơn về cơ chế hoạt động và sự khác nhau cơ bản giữa 2 hàm này.

3.Example

Đầu tiên trong file activity_main.xml cá bạn khai báo 1 số View đơn giản như sau:

<LinearLayout
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    android:orientation="vertical"
    >

    <Button
        android:id="@+id/btn_zip"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="zip()"
        />
    <TextView
        android:id="@+id/txt_zip"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />
    <Button
        android:id="@+id/btn_zip_list"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="zipList()"
        />
    <TextView
        android:id="@+id/txt_zip_list"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />

    <Button
        android:id="@+id/btn_merge"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="merge()"
        />
    <TextView
        android:id="@+id/txt_merge"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />
    <Button
        android:id="@+id/btn_merge_list"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        android:text="mergeList()"
        />
    <TextView
        android:id="@+id/txt_merge_list"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />

</LinearLayout>

Trong onCreate() các bạn định nghĩa các TextViewButton và set sự kiện onClick cho các Button

    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn_zip:
                zip();
                break;
            case R.id.btn_merge:
                merge();
                break;
            case R.id.btn_zip_list:
                zipList();
                break;
            case R.id.btn_merge_list:
                mergeList();
                break;
        }
    }
    
    private void zip(){}
    private void zipList(){}
    private void merge(){}
    private void mergeList(){}

Trong file MainActivity.java các bạn khai báo các hàm định nghĩa Observable

    private Observable<Integer> createObservable(int data) {
            return Observable.just(data);
        }

    private List<Observable<?>> createListObservable() {
        List<Observable<?>> result = new ArrayList<>();
        result.add(createObservable(1));
        result.add(createObservable(2));
        result.add(createObservable(3));
        return result;
    }

Rồi tất cả các chuẩn bị đã xong bây giờ chúng ta sẽ bắt đầu xây dựng lần lượt hàm chức năng

zip()

    private void zip() {
        Observable.zip(createObservable(1), createObservable(2), createObservable(3),
                new Func3<Integer, Integer, Integer, DataZip>() {
                    @Override
                    public DataZip call(Integer integer, Integer integer2, Integer integer3) {
                        return new DataZip(integer, integer2, integer3);
                    }
                }).subscribe(new Subscriber<DataZip>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(DataZip zip) {
                txtZip.setText(zip.numberOne + " " + zip.numberTwo + " " + zip.numberThree);
            }
        });
    }
    
    private void zipList(){}
    private void merge(){}
    private void mergeList(){}
    
    class DataZip {
        int numberOne;
        int numberTwo;
        int numberThree;

        DataZip(int numberOne, int numberTwo, int numberThree) {
            this.numberOne = numberOne;
            this.numberTwo = numberTwo;
            this.numberThree = numberThree;
        }
    }

Như ở đây các bạn đã thấy mình thực hiện zip() 3 Observable<Integer> và zip 3 kết quả sau khi thực hiện xong vao class DataZip Thực chất ở đây hàm zip() thực hiện đồng thời (hoặc lần lượt) các Observable<Integer> truyền vào rồi sử dụng Func3<Integer, Integer, Integer, DataZip>() để zip các kết quả trả về thành 1 kiểu dữ liệu chung.

zipList()

    private void zipList() {
        Observable.zip(createListObservable(), new FuncN<DataZip>() {
            @Override
            public DataZip call(Object... args) {
                return new DataZip((int) args[0], (int) args[1], (int) args[2]);
            }
        }).subscribe(new Subscriber<DataZip>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onNext(DataZip zip) {
                txtZipList.setText(zip.numberOne + " " + zip.numberTwo + " " + zip.numberThree);
            }
        });
    }

Như các thấy ở đây mình truyền vào 1 List<Observable<?>> do không biết trước số lượng truyền vào nên chúng ta sẽ dùng hàm FuncN<DataZip> để thực hiện zip và kết quả trả về và kết quả trả về sẽ là dạng mảng chứ không phải là từng Object riêng lẻ như hàm Func3 ở trên.

Do ở đây mình truyền vào là List<Observable<?>> nên kết quả truyền vào trong hàm call là Object... args còn nếu như bạn định nghĩa rõ kiểu trong List<Observable<?>> như là List<Observable<Integer>> thì kết quả truyền vào trong hàm call sẽ được định nghĩa rõ ràng public DataZip call(Integer... args).

Rồi thế là cơ bản chúng ta đã xong ví dụ về hàm zip() giờ thì chạy để xem kết quả hiển thị sẽ như thế nào. Như các bạn thấy thì 2 hàm zip()zipList() đều thực hiện 3 Observable và zip kết quả trả về vào class DataZip.

Các bạn lưu ý khi thực hiện hàm Observable.zip() mà có 1 Observable trả về lỗi thì khi chúng ta subscribe() thì Rx sẽ gọi hàm public void onError(Throwable e).

merge()

    private void merge() {
        Observable.merge(createObservable(1), createObservable(2), createObservable(3))
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        txtMerge.setText(txtMerge.getText() + " " + integer);
                    }
                });
    }

ở đây trong hàm onNext() chúng ta thấy RxJava trả về từng phần tử sau khi đã thực hiện xong các Observable.

mergeList()

    private void mergeList() {
        Observable.merge(createListObservable()).subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                txtMergeList.setText(txtMergeList.getText() + " " +  o);
            }
        });
    }

ở đây tham số truyền vào cho hàm Observable.merge() thay vì từng Observable mà là List<Observable<?>> và kết quả trả về cũng không khác gì so với mình truyền từng phần tử Observable ở trên.

Run Demo để xem kết quả nào.

Như các bạn thấy trong hàm onNext() của Observable.merge() mình hiện lên màn hình 1 kết quả trả về nhưng mà chúng ta vẫn hiện đủ 3 giá trị 1 2 3 chứng tỏ là hàm này đã được gọi 3 lần tương đương với 3 Observable truyền vào. Để rõ hơn các bạn có thể đặt Log trong hàm onNext() để xem số lần gọi và giá trị trả về.

    @Override
    public void onNext(Integer integer) {
        Log.i("MainActivity", " -> onNext: ----------------: " + integer);
        txtMerge.setText(txtMerge.getText() + " " + integer);
   }                    

Cũng như hàm Observable.zip() thì khi gặp 1 Observable lỗi thì hàm Observable.merge() sẽ dừng ngay tại thời điểm sau khi thực hiện hàm Observablemà bị lỗi. Nhưng RxJava cũng cấp cho chúng ta thêm 1 hàm Observable.mergeDelayError().

Khi sử dụng hàm này mà gặp 1 Observable phát ra lỗi thì các Observable tiếp theo vẫn được thực hiện và vẫn trả ra giá trị vào hàm onNext(). Sau khi thực hiện hết các Observable thì Rx sẽ gọi hàm onError(Throwable e) và trả về lỗi của Observable sảy ra lỗi.

Ở trên mình đã giới thiệu cơ bản về 2 hàm Observable.zip()Observable.merge() trong RxJava. Mình cũng đã ứng dụng 2 hàm này kết hợp với Retrofit để thực hiện các thao tác đồng bộ dữ liệu từ server cũng như thực hiện upload, download dữ liệu.

  • Hàm Observable.zip() thực hiện lấy dữ liệu đồng thời từ server để hiển thị lên View (Activity hay Fragment).
  • Hàm Observable.merge() dùng để thực hiện download hoặc upload dữ liệu lên server. Do các kết quả khi thực hiện Observable được trả về riêng lẻ nên trong hàm onNext() mình sẽ đặt 1 biến để đếm số lượng Observable đã được thực hiện xong và hiển thị phần trăm download hoặc upload dữ liệu cho người dùng.

Hi vọng sau bài viết này các bạn có thể hiểu cơ bản về 2 hàm Observable.zip()Observable.merge() trong RxJava và sử dụng chúng trong các Project sắp tới của mình. Source Demo: https://github.com/huyquyet/DemoZipMergeRxJava Tài liệu tham khảo:


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí