+3

RXKotlin - Giới thiệu về RxKotlin và các operator thường dùng

I. Giới thiệu ngắn gọn về Reactive Programming

Thuật ngữ reactive hiện nay đã không còn xa lạ với chúng ta. Hiện nay nó đang là xu thế và gần như là rule trong lĩnh vực phát triển phần mềm. Mỗi ngày đều có rất nhiều các blogs, presentations, emerging frameworks and libraries đang nói về và phát triển xoay quanh reactive programing .

Với những bạn programer mới có thể sẽ thắc mắc về reactive programing. Tại sao mọi người lại hứng thú với nó đến vậy? Reactive programing chính xác là cái gì? Lợi ích của việc áp dụng reactive programing là gì? Chúng ta có nên học nó không? Nếu có thì sau đó như nào? Vậy chúng ta sẽ cùng nhau tìm hiểu xem nó là gì nhé 😃

Như các bạn đã biết, Kotlin được sinh ra giúp cải tiến và giải quyết nhiều vấn đề quan trọng của Java. Với Kotlin chúng ta có thể tạo ra 1 application tuyệt vời, và nếu bạn kết hợp với reactive programming style với Kotlin thì lại càng dễ dàng hơn trong việc tạo ra 1 application tốt hơn thế nữa. Vậy reactive programming là cái gì mà nói lắm thế?

What is reactive programming?

Reactive programming là một mô hình lập trình không đồng bộ xoay quanh các luồng dữ liệu và streams của sự thay đổi. Tức là sao, bạn hãy xem ví dụ bên dưới để hiểu rõ hơn nhé.

fun main(args: Array<String>) {
 var number = 4
 var isEven = isEven(number)
 println("The number is " + (if (isEven) "Even" else "Odd"))
 number = 9
 println("The number is " + (if (isEven) "Even" else "Odd"))
 }
 fun isEven(n:Int):Boolean = ((n % 2) == 0)

Bạn sẽ thấy rằng output của chương trình trên return true cho dù number = 9. Nếu thực hiện tracking isEven = isEven(number) ngay sau number = 9 thì value sẽ return false. Reactive programing cũng hoạt động tương tự. Nó sẽ thực hiện tracking khi có 1 sự thay đổi nào đó.

Ở bài viết này, mình không thể chia sẻ hết về RxKotlin được, nhưng mình sẽ chia sẻ những gì cơ bản nhất và bạn sẽ dễ gặp phải nhất thôi nhé.

II. RxKotlin

1. RxKotlin & Coroutines?

RxKotlin là một lightweight library, là extension functions cho RxJava. RxKotlin được sinh ra với mục đích là tối ưu hóa việc sử dụng và huẩn hóa các quy ước sử dụng RxJava với Kotlin.

The most exciting feature trong Kotlincoroutines. Với việc sử dụng coroutines chúng ta có thể xử lý bất đồng bộ, non-blocking code giống như threads 1 cách đơn giản hơn thread. Coroutine còn được gọi là 1 lightweight thread.

Lưu ý rằng RxKotlin không sử dụng coroutines. Lý do khá đơn giản là cả coroutines và Schedulers trong RxKotlin đều hoạt động khá giống nhau. Trong khi coroutines mới được sinh ra, Schedulers đã có từ trước đó rất lâu cùng với RxJava, RxJs, RxSwift, ...

Coroutines phù hợp với việc phát triển áp dụng concurrency khi mà chúng ta không thể sử dụng RxKotlin Schedulers.

2. Observables, Observers

Observables and subscribers are at the base of reactive programming

Chúng ta có thể nói rằng chúng là building blocks của reactive programming.

3 thành phần trụ cột của reactive programming—Observables, Observers, and subjects:

  • Chúng ta sẽ đi vào chi tiết của việc biến đổi các loại data sources thành observable instances
  • Bạn sẽ tìm hiểu về các types của Observables
  • Cách sử dụng Observer instances và subscriptions và cuối cùng

2.1. Observables

Trong reactive programming, Observable là một tính toán cơ bản mà produces các value sẽ được consumed bởi 1 comsumer (Observer).

Điều quan trọng và cần chú ý ở đây và consumer (Observer) không pull values, mà Observable pushes the value tới consumer.

Hoặc chúng ta có thể nói rằng Observable là 1 pushbased, emit các value thông qua các các operators và tới Observer:

  • Observer subscribes với Observable
  • Observable bắt đầu emit các values nó đang chứa
  • Observer phản hồi với bất kì item nào được Observable emit

How Observable works?

Observable có 3 events/methods quan trọng nhất:

  • onNext: Observable passes all items one by one tới onNext.
  • onComplete: Khi tất cả items đã đi qua onNext method, Observable gọi the onComplete method.
  • onError: Khi Observable gặp bất kì error nào, it gọi the onError method.

Chú ý là cả onError and onComplete đều là terminal events, nếu onError được gọi, thì onComplete sẽ không bao giờ được gọi, và ngược lại.

fun main(args: Array<String>) {

    val observer: Observer<Any> = object : Observer<Any> {
        override fun onSubscribe(d: Disposable?) {
            println("Subscribed to $d")
        }

        override fun onNext(item: Any?) {
            println("Next $item")
        }

        override fun onError(e: Throwable?) {
            println("Error Occured $e")
        }

        override fun onComplete() {
            println("All Completed")
        }

    }

    val observable: Observable<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f).toObservable() //Như các bạn thấy `.toObservable()` là 1 extension giúp chúng ta biến đổi 1 list -> observable 1 cách nhanh chóng :v

    observable.subscribe(observer)

    val observableOnList: Observable<List<Any>> =
        Observable.just(  //Converts three items into an Observable that emits those items.
            listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f),
            listOf("List with Single Item"),
            listOf(1,2,3,4,5,6))//8
    observableOnList.subscribe(observer)
}

Output:

Subscribed to io.reactivex.rxjava3.internal.operators.observable.ObservableFromIterable$FromIterableDisposable@4361bd48
Next One
Next 2
Next Three
Next Four
Next 4.5
Next Five
Next 6.0
All Completed
Subscribed to io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray$FromArrayDisposable@7637f22
Next [One, 2, Three, Four, 4.5, Five, 6.0]
Next [List with Single Item]
Next [1, 2, 3, 4, 5, 6]
All Completed

Process finished with exit code 0

Understanding the Observable.create method

Bạn có thể tạo 1 Observable với Observable.create method bất kì lúc nào.

Observable.create method khá là hữu dụng trong trường hợp bạn làm việc với custom data structure và muốn control over values được emitted. Bạn có thể emit values tới Observer từ một thread khác.

val observable:Observable<String> = Observable.create<String> {//1
 it.onNext("Emit 1")
 it.onNext("Emit 2")
 it.onNext("Emit 3")
 it.onNext("Emit 4")
 it.onComplete()
 }
 observable.subscribe(observer)
 val observable2:Observable<String> = Observable.create<String> {//2
 it.onNext("Emit 1")
 it.onNext("Emit 2")
 it.onNext("Emit 3")
 it.onNext("Emit 4")
 it.onError(Exception("My Custom Exception"))
 }
 observable2.subscribe(observer)
 }

Output:

Subscribed to CreateEmitter{null}
Next Emit 1
Next Emit 2
Next Emit 3
Next Emit 4
All Completed
Subscribed to CreateEmitter{null}
Next Emit 1
Next Emit 2
Next Emit 3
Next Emit 4
Error Occured java.lang.Exception: My Custom Exception

Understanding the Observable.from methods

Observable.from methods đơn giản hơn so với the Observable.create method. Bạn có thể create Observable instances từ hầu hết Kotlin structure bằng from methods.

Chú ý rằng ở RxKotlin 1, bạn sử dụng Observale.from; tuy nhiên, từ RxKotlin 2.0 (cũng như RxJava2.0), operator được đổi tên với 1 postfix, như là fromArray, fromIterable, fromFuture, ...

Understanding the Observable.just method

Observable.just cũng được sử dụng để create Observable. Tuy nhiên có hơi khác Observable.from 1 chút là: ví dụ bạn pass 1 Iterable instance, Observable.just sẽ coi đó là single parameter, nó sẽ emit tất cả item trong list như là 1 single item, trong khi đó Observable.from sẽ pass từng item trong list.

2.2. Subscribing and disposing

Subscribe operato được sử dụng để connect 1 Observable tới Observer. Chúng tá có thể pass 1 trong 3 methods (onNext, onComplete, onError) tới subscribe operator, hoặc pass 1 instance của Observer interface tới subscribe operator để Observable connect với Observer.

Khi bạn subscribe, nếu bạn pass methods thay vì Observer instance, subscribe operator sẽ trả về 1 instance của Disposable. Nếu bạn pass instance của Observer, bạn sẽ nhận được instance của Disposable ở trong onSubscribe method.

Bạn có thể sử dụng instance của Disposable interface to để dừng các emissions bất ký lúc nào

fun main(args: Array<String>) {
 runBlocking {
 val observale:Observable<Long> =
 Observable.interval(100,TimeUnit.MILLISECONDS) //1 we created an Observable with the Observable.interval factory method that will emit an integer after each 100 millisecond interval.
 val observer:Observer<Long> = object : Observer<Long> {
 lateinit var disposable:Disposable//2
 override fun onSubscribe(d: Disposable) {
 disposable = d//3
 }
 override fun onNext(item: Long) {
 println("Received $item")
 if(item>=10 && !disposable.isDisposed) {//4 stop emission
 disposable.dispose()//5
 println("Disposed")
 }
 }
 override fun onError(e: Throwable) {
 println("Error ${e.message}")
 }
 override fun onComplete() {
 println("Complete")
 }
 }
 observale.subscribe(observer)
 delay(1500)//6
 }
 }

Output:

Received 0
Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
Disposed

Trên kia là những gì cơ bản nhất khi mới tiếp cận RxJava hay RxKotlin. Đến đây chúng ta cần hiểu các khái niệm Observables, Observers, Subscribingdisposing là gì đã.

3. Những operator hữu ích và hay gặp

Trước tiên, operator ở đây là gì?

Khi bắt đầu học lập trình, chúng ta học về các operators là chuỗi các ký tự được viết ra để thực hiện 1 nhiệm vụ dựa trên toán hạng và trả về kết quả.

Đối với reactive, khái niệm về operator cũng tương tự, các toán hạng ở đây như là Observable/Flowable, chúng ta biến đổi chúng và trả về kết quả là Observable/Flowable.

Có 5 loại operators:

  • Filtering/suppressing operators
  • Transforming operators
  • Reducing operators
  • Error handling operators
  • Utility operators

Tuy nhiên để đi hết thì khá là dài nên mình chỉ liệt kê một số operator mà hay gặp thôi nhé. Có thể sau này có thời gian thì mình sẽ tách chi tiết ra sau.

3.1. Filtering/suppressing operators

  • The debounce operator ()

Trì hoãn 1 khoảng thời gian sau mỗi emission, và loại bỏ emission trước đó nếu có 1 emission khác đc bắn ra trong khoảng thời gian trì hoãn và restart value trì hoãn

fun main(args: Array<String>) {
    createObservable() //(1) Tạo Observable
        .debounce(200, TimeUnit.MILLISECONDS)//(2) debounce 200 MILLISECONDS
        .subscribe {
            println(it)//(3)
        }
}

fun createObservable(): Observable<String> =
    Observable.create<String> {
        it.onNext("R")//(4)
        runBlocking { delay(100) }//(5) block thread current 100 MILLISECONDS
        it.onNext("Re")
        it.onNext("Reac")
        runBlocking { delay(130) }
        it.onNext("Reactiv")
        runBlocking { delay(140) }
        it.onNext("Reactive")
        runBlocking { delay(250) }//(6)
        it.onNext("Reactive P")
        runBlocking { delay(130) }
        it.onNext("Reactive Pro")
        runBlocking { delay(100) }
        it.onNext("Reactive Progra")
        runBlocking { delay(100) }
        it.onNext("Reactive Programming")
        runBlocking { delay(300) }
        it.onNext("Reactive Programming in")
        runBlocking { delay(100) }
        it.onNext("Reactive Programming in Ko")
        runBlocking { delay(150) }
        it.onNext("Reactive Programming in Kotlin")
        runBlocking { delay(250) }
        it.onComplete()
    }

Output

Reactive
Reactive Programming
Reactive Programming in Kotlin

Process finished with exit code 0
  • The distinct operators

Operator này khá đơn giản, nó giúp bạn lọc những emission bị duplicated từ upstream

fun main(args: Array<String>) {
 listOf(1,2,2,3,4,5,5,5,6,7,8,9,3,10)//(1)
 .toObservable()//(2)
 .distinct()//(3)
 .subscribe { println("Received $it") }//(4)
 }

Output:

Received 1
Received 2
Received 3
Received 4
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10

Process finished with exit code 0
  • Filtering emissions - filter operator

Filter operator được cho là được sử dụng nhiều nhất trong nhóm filtering/suppressing operator. Nó cho phép bạn implement custom logic để lọc emissions.

fun main(args: Array<String>) {
    Observable.range(1,20)//(1)
        .filter{//(2) lọc chỉ cho output ra là số chẵn
            it%2==0
        }
        .subscribe {
            println("Received $it")
        }
}

Output:

Received 2
Received 4
Received 6
Received 8
Received 10
Received 12
Received 14
Received 16
Received 18
Received 20

Process finished with exit code 0

Ngoài ra các operators trên bạn có thể tìm hiểu thêm distinctUntilChanged, elementAt, first, last, ignoreElements operator

3.2. Transforming operators

  • The map operator

map operator thực hiện 1 task trên từng emitted items từ upstream và emits chúng tới downstream map operator sẽ biến đổi emitted item type T -> type R bằng việc apply lambda function của nó. Dễ hiểu hơn thì nó dùng để chuyển đối 1 item thành 1 item khác.

fun main(args: Array<String>) {
    val observable = listOf(10, 9, 8, 7, 6, 5, 4, 3, 2, 1).toObservable()
    observable.map {
            number ->
        "Transforming Int to String $number"
    }.subscribe { item ->
        println("Received $item")
    }
}

Output:

Received Transforming Int to String 10
Received Transforming Int to String 9
Received Transforming Int to String 8
Received Transforming Int to String 7
Received Transforming Int to String 6
Received Transforming Int to String 5
Received Transforming Int to String 4
Received Transforming Int to String 3
Received Transforming Int to String 2
Received Transforming Int to String 1

Process finished with exit code 0
  • The flatMap operator

flatMap operator tạo mới 1 producer, apply function bạn pass đối với từng emission của source producer. FlatMap sẽ chuyển đổi các item phát ra bởi một Observable thành các Observable khác

fun main(args: Array<String>) {
    val observable = listOf(10,9,8,7,6,5,4,3,2,1).toObservable()

    observable.flatMap {
            number-> Observable.just("Transforming Int to String $number")
    }.subscribe {
            item-> println("Received $item")
    }

Nếu sử dụng như trên thì output giống với ví dụ của map operator.

fun main(args: Array<String>) {
    val observable = listOf(10,9,8,7,6,5,4,3,2,1).toObservable()

    observable.flatMap {
            number->
        Observable.create<String> {//(1)
            it.onNext("The Number $number")
            it.onNext("number/2: ${number/2}")
            it.onNext("number%2: ${number%2}")
            it.onComplete()//(2)
        }
    }.subscribeBy (
        onNext = {
                item-> println("Received $item")
        },
        onComplete = {
            println("Complete")
        }
    )
}

Output:

Received The Number 10
Received number/2: 5
Received number%2: 0
Received The Number 9
Received number/2: 4
Received number%2: 1
Received The Number 8
Received number/2: 4
Received number%2: 0
Received The Number 7
Received number/2: 3
Received number%2: 1
Received The Number 6
Received number/2: 3
Received number%2: 0
Received The Number 5
Received number/2: 2
Received number%2: 1
Received The Number 4
Received number/2: 2
Received number%2: 0
Received The Number 3
Received number/2: 1
Received number%2: 1
Received The Number 2
Received number/2: 1
Received number%2: 0
Received The Number 1
Received number/2: 0
Received number%2: 1
Complete

Process finished with exit code 0

Nếu các bạn để ý thì sẽ thấy điểm khác biệt chính giữa mapflatMapFlatMap bản thân nó sẽ trả về một Observable.

Ngoài map và flatMap ra thì các bạn có thể tìm hiểu thêm cast, defaultIfEmpty, switchIfEmpty, startWith, sorted, scan operator.

3.3. Reducing operators

  • Counting emissions (count operator)

count operator subscribes với producer, đếm the emissions, và emits 1 Single chứa số lượng của emissions được emit bởi producer.

fun main(args: Array<String>) {
    listOf("A", "B", "C", "D", "E").toObservable()
        .count()
        .subscribeBy { println("count $it") }
}

Output:

count 5

Process finished with exit code 0
  • Accumulating emissions – reduce operator

Reduce là một accumulating operator khá thú vị. Nó cộng dồn tất cả các emissions được emit từ producer.

fun main(args: Array<String>) {
     Observable.range(1,10)
     .reduce { previousAccumulation, newEmission ->
     previousAccumulation+newEmission }
     .subscribeBy { println("accumulation $it") }
     Observable.range(1,5)
     .reduce { previousAccumulation, newEmission ->
     previousAccumulation*10+newEmission }
     .subscribeBy { println("accumulation $it") }
 }

Output:

accumulation 55
accumulation 12345

Process finished with exit code 0

3.4. Error handling operators

Chúng ta đều biết về onError event ở Subscriber/Observer. Tuy nhiên, vấn đề là onError event chỉ emitted tới downstream consumer và subscription ngay lập tức vị chấm dứt. Vì vậy, trong trường hợp chúng ta muốn handle error khi value đang được emit ở upstream thì sao? Thật may mắn là chúng ta có các handling operators.

  • onErrorReturn – return a default value on error

onErrorReturn sẽ return 1 default value tới downstream khi một error occurred in the upstream.

fun main(args: Array<String>) {
Observable.just(1, 2, 3, 4, 5)
        .map { it / (3 - it) }
        .onErrorReturn { -1 } //onErrorReturn – return a default value on error
        .subscribe {
            println("Received $it")
        }
}

Output:

Received 0
Received 2
Received -1

Process finished with exit code 0
  • The onErrorResumeNext operator

onErrorResumeNext operator giúp bạn subscribe với một producer khác khi có error xảy ra.

fun main(args: Array<String>) {
Observable.just(1, 2, 3, 4, 5)
        .map { it / (3 - it) }
        .onErrorResumeNext{Observable.range(10,5)}//(1) subscribe the another observable.
        .subscribe {
            println("Received $it")
        }
}

Output:

Received 0
Received 2
Received 10
Received 11
Received 12
Received 13
Received 14

Process finished with exit code 0
  • Retrying on error

retry operator cũng là một error handling operator khác cho phép bạn retry/resubscribe tới producer khi 1 error xảy ra. Bạn chỉ cần cung cấp số lần retry và điều kiện retry.

fun main(args: Array<String>) {
    Observable.just(1, 2, 3, 4, 5)
        .map { it / (3 - it) }
        .retry(3)//(1)
        .subscribeBy(
            onNext = { println("Received $it") },
            onError = { println("Error") }
        )
        
        
    println("\n With Predicate \n")
    var retryCount = 0
    Observable.just(1, 2, 3, 4, 5)
        .map { it / (3 - it) }
        .retry {//(2)
                _, _ ->
            (++retryCount) < 3
        }
        .subscribeBy(
            onNext = { println("Received $it") },
            onError = { println("Error") }
        )
}

Output:

Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Error

 With Predicate 

Received 0
Received 2
Received 0
Received 2
Received 0
Received 2
Error

Process finished with exit code 0

3.5. Utility operators

Dưới đây là list utility operators:

  • doOnNext, doOnComplete, and doOnError
  • doOnSubscribe, doOnDispose, and doOnSuccess
  • serialize
  • cache

Chúng sẽ giúp chúng ta thực hiện đa dạng các utility operations như là thực hiện action trên các emission (doOnNext), ghi nhớ timestamps, caching, ...

4. Schedulers

  • What is a scheduler? Nói là RxKotlin giúp xử lý bất đồng bộ, từ nãy giờ chưa thấy bất đồng bộ ở đâu nhỉ. Scheduler chính là cái giúp chúng ta làm điều đó. Scheduler có thể được hiểu giống như thread pool. ReactiveX có thể pool 1 thread và execute task trên thread đó.

  • Types of scheduler

    • Schedulers.io(): Khi dùng cái này thì sẽ không dùng đến CPU, nó thực hiện các công việc chuyên sâu như networks call, đọc đĩa/file, database, … Nó duy trì được pool của thread.
    • Schedulers.computation(): Có thể đòi hỏi đến đòi hỏi nhiều CPU như xử lý dữ liệu lớn, xử lý bitmap, … Số lượng các thread được tạo ra bằng cách sử dụng Scheduler này hoàn toàn phụ thuộc vào số lõi CPU.
    • Schedulers.newThread(): Sử dụng cái này thì mỗi thread sẽ được tạo ra mỗi lần nhiệm vụ được xếp lịch. Thường thì không khuyến cáo sử dụng cách này trừ khi công việc rất dài. Thread được tạo qua newThread() sẽ không được dùng lại.
    • Schedulers.single(): Scheduler này sẽ thực hiện tất cả các nhiệm vụ theo thứ tự tuần tự mà chúc được add vào. Việc này có thể cần thiết trong một số trường hợp cần tuần tự.
    • Schedulers.trampoline(): Nó thực hiện các nhiệm vụ theo Last In - First Out. Tất cả các nhiệm vụ được xếp lịch sẽ được thực hiện từng cái một bằng cách giới hạn số lượng các background thread thành một.
    • Schedulers.from(): Cách này cho phép tạo ra một Scheduler từ một Executor bởi giới hạn số lượng các thread được tạo ra. Khi thread pool bị full, các nhiệm vụ sẽ xếp hàng đợi.
fun main(args: Array<String>) {
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())//(1) chúng ta sẽ lựa chọn Scheduler phù hợp khi subscribeOn
        .subscribe {
            runBlocking { delay(200) }
            println("${Thread.currentThread().name} - Observable1 Item Received $it")
        }
    Observable.range(21, 10)
        .subscribeOn(Schedulers.computation())//(2)
        .subscribe {
            runBlocking { delay(100) }
            println("${Thread.currentThread().name} - Observable2 Item Received $it")
        }
    runBlocking { delay(2100) }//(3)
}

Output:

RxComputationThreadPool-2 - Observable2 Item Received 21
RxComputationThreadPool-1 - Observable1 Item Received 1
RxComputationThreadPool-2 - Observable2 Item Received 22
RxComputationThreadPool-2 - Observable2 Item Received 23
RxComputationThreadPool-1 - Observable1 Item Received 2
RxComputationThreadPool-2 - Observable2 Item Received 24
RxComputationThreadPool-2 - Observable2 Item Received 25
RxComputationThreadPool-1 - Observable1 Item Received 3
RxComputationThreadPool-2 - Observable2 Item Received 26
RxComputationThreadPool-2 - Observable2 Item Received 27
RxComputationThreadPool-1 - Observable1 Item Received 4
RxComputationThreadPool-2 - Observable2 Item Received 28
RxComputationThreadPool-2 - Observable2 Item Received 29
RxComputationThreadPool-1 - Observable1 Item Received 5
RxComputationThreadPool-2 - Observable2 Item Received 30
RxComputationThreadPool-1 - Observable1 Item Received 6
RxComputationThreadPool-1 - Observable1 Item Received 7
RxComputationThreadPool-1 - Observable1 Item Received 8
RxComputationThreadPool-1 - Observable1 Item Received 9
RxComputationThreadPool-1 - Observable1 Item Received 10

Process finished with exit code 0

Nội dung bài viết cũng tương đối dài rồi nên mình xin dừng lại tại đây. Mình mong là qua bài này mọi người có cái nhìn tổng quan về RxKotlin. Cảm ơn các bạn đã đọc bài viết.


All Rights Reserved

Viblo
Let's register a Viblo Account to get more interesting posts.