Some combining operators of RxJava

Combining operators kết hợp nhiều luồng dữ liệu để tạo ra một luồng dữ liệu.

1. concat()

Như tên của method, bạn có thể sử dụng concat() để kết hợp hai hay nhiều luồng dữ liệu để phát ra dữ liệu đồng thời nhưng không xen kẽ nhau.

Observable<Integer> observer1 = Observable.from(new Integer[]{1, 2, 3, 4, 5}); // Emit 1 to 5
Observable<Integer> observer2 = Observable.from(new Integer[]{6, 7, 8, 9, 10}); // Emit 6 to 10

Observable.concat(observer1, observer2) // Concat the output of both the operators.
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.io())
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("Output", integer + "");
            }
        });

Có 2 luồng dữ liệu như trên, luồng 1 sẽ phát ra số nguyên từ 1 đến 5 và luồng 2 phát ra từ 6 đến 10. Nếu chúng ta kết hợp chúng, luồng 1 sẽ phát ra từ 1 đến 5 và sau đó có thể quan sát thấy luồng 2 phát ra dữ liệu từ 6 đến 10. concat() hợp nhất tất cả luồng dữ liệu đồng thời và phát ra dữ liệu theo tuần tự chứ không xen kẽ

Kết quả: 1 2 3 4 5 6 7 8 9 10

2. merge()

merge hoạt động giống như toán tử concat. Sự khác biệt duy nhất giữa merge và concat là merge có thể phát xen kẽ các kết quả đầu ra, trong khi concat sẽ đợi các luồng trước đó kết thúc trước khi xử lý các luồng sau đó. Vì vậy, không giống như toán tử concat, merge không chờ đợi dữ liệu được phát ra từ 1 luồng kết thúc. Nó phát ra dữ liệu một cách xen kẽ từ nhiều nguồn dữ liệu một khi nó sẵn sàng. Trong trường hợp một trong các luồng dữ liệu có lỗi xảy ra thì việc merge sẽ kết thúc. Để bỏ qua luồng dữ liệu phát về lỗi và tiếp tục việc merge thì có thể sử dụng toán tử mergeDelayError().

3. zip()

zip kết hợp nhiều luồng dữ liệu và tạo ra một luồng dữ liệu chung được kết hợp từ nhiều luồng dữ liệu trước đó.

//Class that combines both data streams
class ZipObject {
    int number; 
    String alphabet;
}

Observable<Integer> observable1 = Observable.from(new Integer[]{1, 2, 3, 4, 5});  //Emits integers
Observable<String> observable2 = Observable.from(new String[]{"A", "B", "C", "D", "F"});  //Emits alphabets
Observable<ZipObject> observable = Observable.zip(observable1, observable2,   
    //Function that define how to zip outputs of both the stream into single object.
    new Func2<Integer, String, ZipObject>() { 
        @Override
        public ZipObject call(Integer integer, String s) {
            ZipObject zipObject = new ZipObject();
            zipObject.alphabet = s;
            zipObject.number = integer;
            return zipObject;
        }
    });

observable
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io())
    .subscribe(new Action1<ZipObject>() {
        @Override
        public void call(ZipObject zipObject) {
            Log.d("Observer", "Output:" + zipObject.number + " " + zipObject.alphabet);
        }
    });
}