+2

Rx trong Kotlin (P1)

Kotlin đã trở thành một ngôn ngữ được Google khuyên nên dùng khi lập trình ứng dụng Android, là ngôn ngữ tuyệt vời để thay thế cho Java, hoạt động tốt trên Android, không có ; và cuối cùng thì nó được tạo ra bởi JetBrains! :metal_tone2: Vậy với Kotlin, Rx được dùng thế nào? Bài viết này mình hy vọng sẽ giới thiệu được một phần nhỏ về cách dùng Rx trong Kotlin. Trước tiên ta sẽ tìm hiểu một chút về Rx Vậy Rx là gì? Nó là reactive programming. Reactive programming nói một cách dễ hiểu là một kiểu lập trình gắn liền với mô hình Observer, trong đó Subscribers phản ứng lại với các sự kiện được phát ra từ Observables Nhìn vào diagram này, Observables "phát" dữ liệu ra cho các Subscribers, và dù có bao nhiêu Subscribers cũng có thể lắng nghe dữ liệu từ Observables Rx cũng là Functional Programming, đo đó nó thường được gọi là Functional Reactive Programming. Khi những Subscribers nhận dữ liệu, chúng có thể áp dụng các transformations lên dữ liệu nhận được, tương tự như ta có thể làm vớt Streams trong Java 8. Bạn có thể nhìn vào diagram này: Rx cũng rất flexible khi bạn có thể merge các luồng với nhau, vì vậy điều rút ra được là chúng ta có thể làm rất nhiều thứ hay ho với dữ liệu mà Subscribers nhận được từ Observables, và tất nhiên là ngay lập tức. 👍 Trong Rx, Subscribers implement 3 method để reactive lại với các Observables:

  1. onNext(Data): Nhận dữ liệu từ các Observables
  2. onError(Exception): Được gọi nếu có trả về exception
  3. onCompleted(): Được gọi khi hoàn thành nhận dữ liệu Điều này gần giống với Iterables trong Java, điều khác biệt là Iterablespull-base còn Rx observablespush-base. Các Observables có thể push dữ liệu cho các Subscribers của nó. Đây là bảng so sánh giữa IterablesRx observables Một điều bạn cần lưu ý là Rx có tính đồng bộ (synchronous), có nghĩa là bạn cần phải chỉ định nếu bạn muốn nó không đồng bộ (asynchronous). Bạn có thể làm điều này khi gọi các phương thức observeOnsubscribeOn. Để ví dụ về luồng dữ liệu bạn có thể nhìn vào diagram này: Bạn có thể thấy các vòng tròn là các data object được hiển thị trong luồng, mũi tên hiển thị flow của data object đó. Ở bên trên ta có nói rằng có thể áp dụng các transformations trên các data (stream) nhận được từ Observables , các operators như là map, filter, hay zip như diagram này: Vừa rồi là các khái niệm cơ bản, giờ chúng ta sẽ đi vào việc implement nó.

Implementing Observables

Tạo một Observables là điều không khó, có một số cách như sau để tạo một Observables:

  1. Observable.from() tạo một observable từ một Iterable, Future, hoặc một Array.
//Kotlin
Observable.from(listOf(1, 2, 3, 4, 5))
//Java
Observable.from(Arrays.asList(1, 2, 3, 4, 5));
//It will emit these numbers in order : 1 - 2 - 3 - 4 - 5 

2.Observable.just() tạo observable từ một hoặc nhiều object

Observable.just("Hello World!") 
//this will emit "Hello World!" to all its subscribers

3.Observable.create() tạo observable từ một function. Chúng ta chỉ cần tạo OnSubscribe interface sau đó nói cho Observable thấy những gì có thể gửi tới Subscribers

//Kotlin
Observable.create(object : Observable.OnSubscribe<Int> {
    override fun call(subscriber: Subscriber<in Int>) {
        for(i in 1 .. 5)
            subscriber.onNext(i)

        subscriber.onCompleted()
    }
})

//Java
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(final Subscriber<? super Integer> 
subscriber) {
        for (int i = 1; i <= 5; i++)
            subscriber.onNext(i);

        subscriber.onCompleted();
    }
});

Implementing Subscribers

Trong Android, để subscriber một observable, trước tiên chúng ta phải cho biết những threads bạn muốn subscribers. RxAndroid tạo Schedulers để bạn chỉ định các threads. Với ví dụ sau, chúng ta sẽ subscription trên một worker thread và sẽ observation trên một main thread

//Kotlin
Observable.just("Hello World")
          .subscribeOn(Schedulers.newThread()) 
          //each subscription is going to be on a new thread.
          .observeOn(AndroidSchedulers.mainThread())) 
          //observation on the main thread
          //Now our subscriber!
          .subscribe(object:Subscriber<String>(){
            override fun onCompleted() {
             //Completed
            }

            override fun onError(e: Throwable?) {
             //TODO : Handle error here
            }

            override fun onNext(t: String?) {
             Log.e("Output",t);
            }
           })
//Java 
Observable.just("Hello World")
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                //Completion
            }

            @Override
            public void onError(final Throwable e) {
                //TODO : Handle error here
            }

            @Override
            public void onNext(final String s) {
                Log.e("Output",s);
            }
        });

Khi chạy đoạn code này sẽ trả về kết quả:

Output: Hello World!

Cảm ơn các bạn đã đọc đến đây, ở phần sau chúng ta sẽ đi vào thực hành một ví dụ Rx với Kotlin. Hẹn gặp lại! :call_me:

Nguồn


All rights reserved

Viblo
Hãy đăng ký một tài khoản Viblo để nhận được nhiều bài viết thú vị hơn.
Đăng kí