Reactive programing with Java [Part 1]

Dẫn nhập

Chào mọi người, hôm nay tôi sẽ giới thiệu một chủ đề mới, một phương pháp lập trình mới khá phổ biến gần đây mà bản thân tôi trong quá trình làm việc thấy nó cực kỳ hữu ích, hiện đại và clean - Reactive Programing. Khái niệm này chắc còn mới mẻ với khá nhiều người, và tôi cũng chỉ mới làm quen với nó chừng một năm nay, nhưng cực kỳ hứng thú với những thứ mà nó mang lại. Để hiểu rõ hơn những gì trong bài viết này giới thiệu, các bạn nên bỏ chút thời gian đọc lại về Observe Pattern mà tôi đã chia sẻ trong bài trước nhé.

Reactive Programing

Reactive Programing là gì ?

Reactive Programing mà một phương pháp lập trình tập trung vào các luồng dữ liệu và quan sát sự thay đổi của các luồng dữ liệu đó. Có nhiều cách định nghĩa khác nhau về nó và thực sự khó hiểu với những người mới bắt đầu, tuy nhiên mấu chốt cơ bản của Reactive Programing vẫn là tập trung vào sự thay đổi trên các luồng dữ liệu bất đồng bộ.

Các khái niệm quan trọng trong Reactive Programing:

  • Producer & Subscriber: Producer chính là nơi phát ra luồng dữ liệu cho nơi lắng nghe sự thay đổi của dữ liệu đó, gọi là Subscriber.
  • Luồng dữ liệu (Stream) : Khi chúng ta cần gọi một API để lấy dữ liều về, cái chúng ta cần quan tâm ở đây chính là luồng dữ liệu, bao gồm dữ liệu trả về, các lỗi có thể xảy ra và thời điểm nào kết thúc tác vụ. Và đương nhiên, luồng dữ liệu này phải là luồng bất đồng bộ để nâng cao hiệu năng cũng như trải nghiệm người dùng của ứng dụng. Reactive Programing sẽ tập trung vào việc quan sát sự thay đổi của luồng dữ liệu đó expose ra chính xác những thứ chúng ta cần ở một stream nêu trên. Reactive Programing còn cho phép chúng ta filter stream (ví như filter data theo một criteria, scan ...) trước khi expose cho Subscriber, chuyển đổi stream này thành stream khác (flatmap, map, ...) hoặc gộp nhiều stream thành một (zip, merge, ...). Tất cả những công cụ nêu trên vô cùng hữu ích trong lập trình với Reactive Programing mà tôi sẽ giới thiệu ở phần sau.

Các giá trị mang lại của Reactive Programing

  • Việc lập trình bất đồng bộ dễ dàng hơn.
  • Dễ dàng hơn trong việc xử lý lỗi.
  • Code tách biệt và tường mình hơn rất nhiều.
  • Dễ dàng trong việc triển khai các nguyên lý thiết kế hướng đối tượng.

Reactive Programing với Java

RxJava là một thư viện để triển khai Reactive Programing bằng ngôn ngữ Java, và được dùng rất nhiều trong lập trình Web, Client app... RxJava triển khai từ Observe Pattern, cung cấp đầy đủ các function giúp lập trình viên triển khai Reactive Programing trong ứng dụng của mình.

Import RxJava vào project của bạn thông qua Gradle build dependency:

compile 'io.reactivex.rxjava2:rxjava:x.y.z'

hoặc Maven build:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>x.y.z</version>
</dependency>

Các thành phần cơ bản trong RxJava.

  • Observable : Là model cơ bản trong RxJava, cho phép bạn xử lý các dòng dữ liệu bất đồng bộ, nôm na là đại diện cho một luồng dữ liệu, cung cấp đầy đủ các method để làm việc với Reactive Programing. Nó là nơi phát tín hiệu đến các Subscriber lắng nghe trên nó.
  • Subscriber : Là nơi lắng nghe sự kiện từ Observable, tập hợp các call back để xử lý luồng dữ liệu : onNext, onCompletedonError.
    • onNext : Callback được gọi khi emit data, nó có thể được gọi một hoặc nhiều lần.
    • onCompleted: Callback được gọi khi stream đã emit data hoàn tất, đánh dấu trạng thái kết thúc của luồng dữ liệu.
    • onError: Callback được gọi khi có bất kỳ lỗi nào xảy ra. Khi nó được gọi thì onComplete sẽ không được gọi nữa.
  • Subscription : Diễn tả một mối quan hệ giữa ObservableSubscriber. Khi có một Subscriber lắng nghe trên Observable thì nó tạo ra một Subscription.
  • Scheduler: Sử dụng trong multithreading với RxJava mà tôi sẽ nói rõ trong phần sau.

Implementation

  • Tạo một Observable đơn giản:

Observable.just("1", "2", "3", "4");

Luồng dữ liệu này sẽ emit lần lượt các phần tử trong mảng trên và kết thúc bằng hàm onCompleted.

  • Tạo một Observable tùy ý.
Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 0; i < 5; i++){
                    subscriber.onNext(String.valueOf(i));
                }
                subscriber.onCompleted();
            }
        });
  • Lắng nghe một luồng dữ liệu.
Observable.just("1", "2", "3", "4").subscribe(new Subscriber<String>() {
       @Override
       public void onCompleted() {
           // After emit all data
       }

       @Override
       public void onError(Throwable e) {
           // Error has occurred here
       }

       @Override
       public void onNext(String s) {
           // Data will be emitted here 4 times
       }
   });

Kết luận:

Với Reactive Programing, tuy chỉ mới tiếp xúc một thời gian không quá dài, nhưng tôi thấy nó thực sự mạnh mẽ và cần thiết trong lập trình bất đồng bộ hiện nay, đặc biệt là trong các ứng dụng điện thoại, cần thiết việc quản lý thread thật khoa học và đúng đắn để cải thiện hiệu năng ứng dụng. Trong khuôn khổ bài viết này, tôi chỉ giới thiệt sợ lược về Reactive Programing và vài ví dụ cơ bản khi làm việc với RxJava. Ở phần tiếp theo, tôi sẽ nói rõ hơn về các khái niệm như Scheduler, filter stream, chuyển đổi stream hay gộp nhiều stream. Cảm ơn các bạn.