Rx trong Kotlin (P1)
Bài đăng này đã không được cập nhật trong 3 năm
Kotlin đã trở thành một ngôn ngữ được Google khuyên nên dùng khi lập trình ứng dụng Android, là ngôn ngữ tuyệt vời để thay thế cho Java, hoạt động tốt trên Android, không có ;
và cuối cùng thì nó được tạo ra bởi JetBrains! :metal_tone2:
Vậy với Kotlin, Rx được dùng thế nào?
Bài viết này mình hy vọng sẽ giới thiệu được một phần nhỏ về cách dùng Rx trong Kotlin.
Trước tiên ta sẽ tìm hiểu một chút về Rx
Vậy Rx là gì? Nó là reactive programming
. Reactive programming nói một cách dễ hiểu là một kiểu lập trình gắn liền với mô hình Observer
, trong đó Subscribers
phản ứng lại với các sự kiện được phát ra từ Observables
Nhìn vào diagram này, Observables
"phát" dữ liệu ra cho các Subscribers
, và dù có bao nhiêu Subscribers
cũng có thể lắng nghe dữ liệu từ Observables
Rx cũng là Functional Programming
, đo đó nó thường được gọi là Functional Reactive Programming
. Khi những Subscribers
nhận dữ liệu, chúng có thể áp dụng các transformations
lên dữ liệu nhận được, tương tự như ta có thể làm vớt Streams trong Java 8. Bạn có thể nhìn vào diagram này:
Rx cũng rất flexible khi bạn có thể merge các luồng với nhau, vì vậy điều rút ra được là chúng ta có thể làm rất nhiều thứ hay ho với dữ liệu mà Subscribers
nhận được từ Observables
, và tất nhiên là ngay lập tức.
Trong Rx, Subscribers
implement 3 method để reactive lại với các Observables
:
- onNext(Data): Nhận dữ liệu từ các
Observables
- onError(Exception): Được gọi nếu có trả về exception
- onCompleted(): Được gọi khi hoàn thành nhận dữ liệu
Điều này gần giống với
Iterables
trong Java, điều khác biệt làIterables
làpull-base
cònRx observables
làpush-base
. CácObservables
có thểpush
dữ liệu cho cácSubscribers
của nó. Đây là bảng so sánh giữaIterables
vàRx observables
Một điều bạn cần lưu ý là Rx có tính đồng bộ (synchronous), có nghĩa là bạn cần phải chỉ định nếu bạn muốn nó không đồng bộ (asynchronous). Bạn có thể làm điều này khi gọi các phương thứcobserveOn
vàsubscribeOn
. Để ví dụ về luồng dữ liệu bạn có thể nhìn vào diagram này: Bạn có thể thấy các vòng tròn là các data object được hiển thị trong luồng, mũi tên hiển thị flow của data object đó. Ở bên trên ta có nói rằng có thể áp dụng cáctransformations
trên các data (stream) nhận được từObservables
, cácoperators
như làmap
,filter
, hayzip
như diagram này: Vừa rồi là các khái niệm cơ bản, giờ chúng ta sẽ đi vào việc implement nó.
Implementing Observables
Tạo một Observables
là điều không khó, có một số cách như sau để tạo một Observables
:
- Observable.from() tạo một observable từ một Iterable, Future, hoặc một Array.
//Kotlin
Observable.from(listOf(1, 2, 3, 4, 5))
//Java
Observable.from(Arrays.asList(1, 2, 3, 4, 5));
//It will emit these numbers in order : 1 - 2 - 3 - 4 - 5
2.Observable.just() tạo observable từ một hoặc nhiều object
Observable.just("Hello World!")
//this will emit "Hello World!" to all its subscribers
3.Observable.create() tạo observable từ một function. Chúng ta chỉ cần tạo OnSubscribe
interface sau đó nói cho Observable
thấy những gì có thể gửi tới Subscribers
//Kotlin
Observable.create(object : Observable.OnSubscribe<Int> {
override fun call(subscriber: Subscriber<in Int>) {
for(i in 1 .. 5)
subscriber.onNext(i)
subscriber.onCompleted()
}
})
//Java
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer>
subscriber) {
for (int i = 1; i <= 5; i++)
subscriber.onNext(i);
subscriber.onCompleted();
}
});
Implementing Subscribers
Trong Android, để subscriber một observable, trước tiên chúng ta phải cho biết những threads bạn muốn subscribers. RxAndroid tạo Schedulers
để bạn chỉ định các threads.
Với ví dụ sau, chúng ta sẽ subscription
trên một worker thread
và sẽ observation
trên một main thread
//Kotlin
Observable.just("Hello World")
.subscribeOn(Schedulers.newThread())
//each subscription is going to be on a new thread.
.observeOn(AndroidSchedulers.mainThread()))
//observation on the main thread
//Now our subscriber!
.subscribe(object:Subscriber<String>(){
override fun onCompleted() {
//Completed
}
override fun onError(e: Throwable?) {
//TODO : Handle error here
}
override fun onNext(t: String?) {
Log.e("Output",t);
}
})
//Java
Observable.just("Hello World")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
//Completion
}
@Override
public void onError(final Throwable e) {
//TODO : Handle error here
}
@Override
public void onNext(final String s) {
Log.e("Output",s);
}
});
Khi chạy đoạn code này sẽ trả về kết quả:
Output: Hello World!
Cảm ơn các bạn đã đọc đến đây, ở phần sau chúng ta sẽ đi vào thực hành một ví dụ Rx với Kotlin. Hẹn gặp lại! :call_me:
All rights reserved