Cùng học Kotlin Coroutine, phần 8: Flow (part 1 of 3)
Bài đăng này đã không được cập nhật trong 5 năm
1. Giới thiệu Flow trong Kotlin Coroutine
Flow về cơ bản khá giống Sequences trong Kotlin nhưng khác ở chỗ Sequences xử lý đồng bộ còn Flow xử lý bất đồng bộ. Nếu bạn chưa biết về Sequences thì khái niệm này khiến bạn khá khó hiểu đúng hơm  . Vậy nên trước tiên mình sẽ nói đôi chút về
. Vậy nên trước tiên mình sẽ nói đôi chút về Collections và Sequences trong Kotlin.
Collections vs Sequences vs Flow
Mình sẽ sử dụng Collections vs Sequences vs Flow cùng đưa ra lời giải cho một bài toán: Build hàm foo() in ra 3 số 1, 2, 3 có thời gian delay và đo thời gian thực hiện của hàm foo. Qua đó các bạn sẽ dễ thấy sự khác biệt giữa Collections vs Sequences vs Flow.
Bắt đầu với Collections, đại diện trong ví dụ này là List
suspend fun foo(): List<Int> {
    val list = mutableListOf<Int>()
    for (i in 1..3) {
        delay(1000)
        list.add(i)
    }
    return list
}
fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().forEach { value -> println(value) }
    }
    println(time)
}
Output (ảnh gif):

Còn đây là khi sử dụng Sequences
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(1000)
        yield(i)
    }
}
fun main() = runBlocking {
    val time = measureTimeMillis {
        foo().forEach { value -> println(value) }
    }
    println(time)
}
Output (ảnh gif):

Các bạn đã thấy sự khác nhau chưa  . 2 Output được in ra là giống nhau và thời gian thực hiện cũng bằng nhau, đều là 3 giây, nhưng khác ở chỗ thằng
. 2 Output được in ra là giống nhau và thời gian thực hiện cũng bằng nhau, đều là 3 giây, nhưng khác ở chỗ thằng List nó đợi add xong cả 3 phần tử rồi mới in ra, còn trong ví dụ Sequence thì cứ mỗi giây thì có phần tử được yield và phần tử đó lập tức được in ra ngay mà không phải đợi yield xong cả 3 phần tử.
Còn đây là Flow:
fun foo(): Flow<Int> = flow {
    // flow builder
    for (i in 1..3) {
        delay(1000)
        emit(i) // emit next value
    }
}
fun main() = runBlocking {
    // Collect the flow
    val a = measureTimeMillis {
        foo().collect { value -> println(value) }
    }
    println(a)
}
Output (ảnh gif):

Về cơ bản, Flow khá giống Sequence đúng không nào, thay vì sử dụng hàm yield thì Flow sử dụng hàm emit và nhận các giá trị qua hàm collect. Các bạn chưa cần phải hiểu các đoạn code ở trên về Flow vì mình sẽ giải thích ở phía dưới trong cùng bài viết này.
Ở đầu bài viết, mình có nói là:  "Flow về cơ bản khá giống Sequences trong Kotlin nhưng khác ở chỗ Sequences xử lý đồng bộ còn Flow xử lý bất đồng bộ". Bây giờ chúng ta sẽ đi làm rõ sự khác nhau này nhé.
Flow vs Sequences
Sequence block main thread:
fun foo(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(1000)
        yield(i) // yield next value
    }
}
fun main() = runBlocking {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        println(Thread.currentThread().name)
        for (k in 1..3) {
            delay(1000)
            println("I'm blocked $k")
        }
    }
    val time = measureTimeMillis {
        foo().forEach { value -> println(value) }
    }
    println("$time s")
}
Output (ảnh gif):

Mình có launch một coroutine trên main thread để kiểm tra liệu main thread có bị block không. Mình có dùng Thread.currentThread().name để in ra chữ main để chắc chắn rằng coroutine chạy trên main thread. Các bạn chú ý là coroutine chạy trên main thread nhưng nó không block main thread nhé, đây là đặc điểm của coroutine mà mình đã giới thiệu ở phần 2. Do đó coroutine và hàm foo sẽ chạy song song. Và kết quả cho ta thấy rằng hàm foo chứa Sequence đã block main thread, vì vậy mà 3 dòng I'm blocked đã phải chờ Sequence in hết 3 giá trị ra trước rồi mới đến lượt nó được in ra.
Vậy khi sử dụng Flow thì sao:
fun foo(): Flow<Int> = flow {
    // flow builder
    for (i in 1..3) {
        delay(1000)
        emit(i) // emit next value
    }
}
fun main() = runBlocking {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        println(Thread.currentThread().name)
        for (k in 1..3) {
            delay(900)
            println("I'm not blocked $k")
        }
    }
    // Collect the flow
    val time = measureTimeMillis {
        foo().collect { value -> println(value) }
    }
    println("$time s")
}
Output (ảnh gif):

Tương tự đoạn code ví dụ Sequence, mình cũng launch một coroutine trên main thread để kiểm tra liệu main thread có bị block không. Và kết quả cho ta thấy rằng Flow không block main thread, bằng chứng là các số 1, 2, 3 được in ra song song với I'm not blocked.
Tóm lại: Sequence xử lý đồng bộ. Nó sử dụng Iterator và block main thead trong khi chờ đợi item tiếp theo được yield. Flow xử lý bất đồng bộ. Nó sử dụng một suspend function collect để không block main thread trong khi chờ đợi item tiếp theo được emit.
Flow
Bây giờ, mình sẽ giải thích các dòng code mà mình đã sử dụng để ví dụ về Flow:
- Khối flow { }là một builder function giúp ta tạo ra 1 đối tượngFlow.
- Code bên trong flow { ... }có thể suspend, điều này có nghĩa là chúng ta có thể gọi các suspend function trong khốiflow { }. Vì vậy functionfoo()gọi khốiflow { }không cần thiết phải là suspend function nữa.
- Hàm emitdùng để emit các giá trị từFlow. Hàm này là suspend function
- Hàm collectdùng để get giá trị được emit từ hàmemit. Hàm này cũng là suspend function.
2. Flow là nguồn dữ liệu lạnh
Các Flow là các luồng lạnh (cold streams) tương tự như các Sequences. Điều đó có nghĩa là code bên trong flow { } sẽ không chạy cho đến khi Flow gọi hàm collect.
fun foo(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
fun main() = runBlocking<Unit> {
    println("Calling foo...")
    val flow = foo()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
Output:
Calling foo...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
Chúng ta có thể thấy mặc dù gọi hàm foo() nhưng code trong Flow vẫn không chạy. Cho đến khi Flow gọi hàm collect thì code trong Flow mới chạy và code đó sẽ chạy lại khi chúng ta gọi lại hàm collect.
3. Flow cancellation
Flow tuân thủ việc các nguyên tắc cancellation chung của coroutines (xem lại phần 4). Việc collect của flow chỉ có thể bị hủy khi và chỉ khi flow đang bị suspend (chẳng hạn như gặp hàm delay) và ngược lại flow không thể bị hủy.
Đoạn code dưới đây sẽ cho các bạn thấy flow bị cancel khi hết thời gian timeout. Ta sử dụng hàm withTimeoutOrNull
fun foo(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(2000)          
        println("Emitting $i")
        emit(i)
    }
}
fun main() = runBlocking {
    withTimeoutOrNull(5000) { // Timeout after 5s 
        foo().collect { value -> println(value) } 
    }
    println("Done")
}
Output:
Emitting 1
1
Emitting 2
2
Done
Trong 4 giây đầu tiên, số 1 và số 2 được in ra. Đến giây thứ 5, đã hết thời gian timeout mà flow đang bị suspend vì hàm delay(2000) (còn 1 giây nữa tức là đến giây thứ 6 thì flow mới hết suspend) nên flow bị cancel và số 3 không được in ra.
Bây giờ mình sẽ thay hàm delay bằng hàm Thread.sleep để kiểm tra liệu flow không thể bị hủy khi nó không suspend?
fun foo(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(2000)
        println("Emitting $i")
        emit(i)
    }
}
fun main() = runBlocking {
    withTimeout(1000) { // Timeout after 1s
        foo().collect { value -> println(value) }
    }
    println("Done")
}
Output:
Emitting 1
1
Emitting 2
2
Emitting 3
3
Done
Như các bạn thấy, flow vẫn in ra cả 3 số 1, 2, 3 mặc dù đã hết thời gian timeout là 1 giây. Vậy, flow không thể bị cancel khi đang chạy hay nói các khác là khi nó không ở trạng thái suspend.
4. Các cách tạo ra Flow
Ngoài cách sử dụng khối flow { } như các đoạn code trên mình đã sử dụng để tạo ra một Flow thì còn có những cách khác để tạo ra đối tượng Flow như:
Hàm flowOf
public fun <T> flowOf(vararg elements: T): Flow<T>
Code ví dụ:
fun main() = runBlocking {
    val data = flowOf(1,"abc", 3.4, "def")
    data.collect { println(it) }
}
Output:
1
abc
3.4
def
.asFlow() extension function
Các Collections, Arrays, Sequences hay một kiểu T gì đó đều có thể convert sang Flow thông qua extension function là asFlow(). Hình dưới đây liệt kê đầy đủ các extension function asFlow()
 Code ví dụ:
Code ví dụ:
fun main() = runBlocking {
    listOf(1, "abc", 3.4, "def").asFlow().collect { println(it) }
}
Output:
1
abc
3.4
def
Kết luận
Flow thật sự là một thứ rất powerful trong Kotlin Coroutine. Hy vọng qua bài viết này, các bạn đã hiểu biết phần nào đó về Flow. Trong phần tiếp theo, mình sẽ giới thiệu sức mạnh thật sự của nó - đó chính là các toán tử (operators). Flow có rất nhiều toán tử không thua kém gì Rx đâu nha  . Cảm ơn các bạn đã theo dõi bài viết này. Hy vọng các bạn sẽ tiếp tục theo dõi những phần tiếp theo.
. Cảm ơn các bạn đã theo dõi bài viết này. Hy vọng các bạn sẽ tiếp tục theo dõi những phần tiếp theo. 
Nguồn tham khảo:
https://kotlinlang.org/docs/reference/coroutines/flow.html
Đọc lại những phần trước:
Cùng học Kotlin Coroutine, phần 1: Giới thiệu Kotlin Coroutine và kỹ thuật lập trình bất đồng bộ
Cùng học Kotlin Coroutine, phần 2: Build first coroutine with Kotlin
Cùng học Kotlin Coroutine, phần 3: Coroutine Context và Dispatcher
Cùng học Kotlin Coroutine, phần 4: Job, Join, Cancellation and Timeouts
Cùng học Kotlin Coroutine, phần 5: Async & Await
Cùng học Kotlin Coroutine, phần 6: Coroutine Scope
Đọc tiếp phần 9: Cùng học Kotlin Coroutine, phần 9: Flow (part 2 of 3)
All rights reserved
 
  
 