Lấy dữ liệu từ nhiều nguồn tài nguyên với Concat() trong RxJava

Bài toán đặt ra là dữ liệu được hiển thị tới người dùng được tổng hợp từ 2 nguồn cơ bản là database local và dữ liệu từ server trả về. Cách thông thường bạn có thể gọi tuần tự 2 hàm lấy dữ liệu từ 2 nơi rồi gộp dữ liệu của 2 hàm đấy lại -> hiển thị cho người dùng. Nhưng có 1 vấn đề đặt ra là nếu như vậy sau này bạn muốn thêm 1 nguồn dữ liệu nữa thì code của bạn tương đối dài và trong không clear code cho lắm. Sau đây mình sẽ giới thiệu với các bạn phương thức trong RxJava hỗ trợ gộp 2 hàm đó lại với nhau và cùng trả về chung 1 kết quả duy nhất và chúng ta chỉ cần sử dụng kết quả đó để hiển thị tới người dùng.

1. Concat là gì

Nhìn qua về cơ chế hoạt động như này thì các bạn cũng đã hình dung ra là hàm concat() thực hiện nhiệm vụ gì rồi chứ. Nó sẽ ghép 2 hay nhiều Observable trong RxJava 1.x hoặc Flowable trong RxJava 2.x lại với nhau rồi thực hiện tuần tự từ Observable đầu tiên đến hết Observable cuối cùng và trả về chung 1 kết quả trong 1 danh sách mảng.

Trên đây mình sẽ giới thiệu cho các bạn về concat() trong RxJava 2.x còn với 1.x thì cũng tương tự như vậy.

public static <T> Flowable<T> concat(Iterable<? extends Publisher<? extends T>> sources)

public static <T> Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources) 

public static <T> Flowable<T> concat(Publisher<? extends T> source1, Publisher<? extends T> source2)

trên đây là 3 hàm concat() cơ bản.

Nói về lý thuyết là có vẻ hơi khó hiểu. Trong phần tiếp theo mình sẽ làm 1 số demo để các bạn có thể hình dung ra cơ chế hoạt động của hàm concat() trong RxJava nhé.

2. Ví dụ

Đầu tiên các bạn tạo ra 1 project cơ bản với 1 class MainActivity để mình có thể viết code và xem luồng hoạt động

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    demoConcat();
}

private void demoConcat() {
}

tiếp theo chúng ta sẽ tạo ra 2 hàm giả định để lấy dữ liệu từ 2 nguồn khác nhau.

private Flowable<List<Integer>> getDataLocal() {
    List<Integer> data = new ArrayList<>();
    data.add(1);
    data.add(2);
    data.add(3);
    return Flowable.just(data);
}

private Flowable<List<Integer>> getDataServer() {
    List<Integer> data = new ArrayList<>();
    data.add(3);
    data.add(4);
    data.add(5);
    return Flowable.just(data);
}

và sửa lại hàm demoConcat()

private void demoConcat() {
    List<Integer> data = new ArrayList<>();
    Flowable.concat(Flowable.just(getDataLocal(), getDataServer()))
            .subscribe(new DisposableSubscriber<List<Integer>>() {
                @Override
                public void onNext(List<Integer> o) {
                    data.addAll(o)
                }

                @Override
                public void onError(Throwable t) {
                    Log.i("MainActivity", "onError --------------------: ");
                }

                @Override
                public void onComplete() {
                    for (Integer o : data) {
                        Log.i("MainActivity", "onComplete --------------------: " + o);
                    }
                }
            });
}

chạy app để xem kết quả nào.

Như các bạn thấy. ở 2 hàm getDataLocal()getDataServer() kết quả trả về của nó là dữ liệu từ 2 nguồn khác nhau, nhưng kết quả chúng ta nhận được là 1 List dữ liệu được hợp từ 2 nguồn khác nhau.

Nhìn kỹ lại xem nào, ở đây chúng ta có vấn đề là nếu dữ liệu ở 2 nguồn bị trùng ở 1 số phần tử thì liệu dữ liệu chung trả về có bị trùng không. Câu trả lời là có và chúng ta sẽ xử lý vấn đề dữ liệu trùng lặp này ở hàm onNext khi subscribe

@Override
public void onNext(List<Integer> o) {
    for (Integer i : o) {
        if (!data.contains(i)) data.add(i);
    }
}

nếu bạn sử dụng Java8 với cách viết lambda thì code sẽ được ngắn gọn hơn

private void demoConcat() {
List<Integer> data = new ArrayList<>();
Flowable.concat(Flowable.just(getDataLocal(), getDataServer()))
        .subscribe(
                o -> {
                    for (Integer i : o) {
                        if (!data.contains(i)) data.add(i);
                    }
                },

                throwable -> {
                    Log.i("MainActivity", "onError --------------------: ");
                },

                () -> {
                    for (Integer o : data) {
                        Log.i("MainActivity", "onComplete --------------------: " + o);
                    }
                }
        );
}

Còn 1 vấn đề nữa là mình muốn khi lấy dữ liệu từ server về thì mình cập nhật luôn dữ liệu đó xuống local thì làm như thế nào. Để làm như vậy thì chúng ta sẽ quay lại hàm getDataServer() và viết thêm hàm cập nhật.

private Flowable<List<Integer>> getDataServer() {
    List<Integer> data = new ArrayList<>();
    data.add(4);
    data.add(5);
    data.add(6);
    return Flowable.just(data).doOnNext(
            integers -> insertDataToLocal(integers)
    );
}

private void insertDataToLocal(List<Integer> data) {
    // TODO: insert data to database
}

trên đây mình có sử dụng hàm doOnNext() các bạn có thể tìm hiểu thêm hàm [ở đây] http://reactivex.io/documentation/operators/do.html)

Ở trên mình đã giới thiệu sơ qua về hàm concat() trong RxJava và cách sử dụng cơ bản hàm này. Ngoài ra khi 2 nguồn dữ liệu có thể sảy ra lỗi khi lấy dữ liệu thì các bạn có thể tham khảo thêm hàm concatDelayError() để có thể xử lý lỗi ở từng trường hợp.

Source code demo: https://github.com/huyquyet/DemoConcatRx