Cùng học Kotlin Coroutine, phần 9: Flow (part 2 of 3)

1. Toán tử trong Flow

Nếu bạn chưa biết Flow là gì, bạn có thể tham khảo phần 1 của bài viết này tại đây. Bài viết này, mình sẽ tập trung khai thác sức mạnh thật sự của Flow, đó là các toán tử (operators). Thời điểm viết bài này, mình đang sử dụng kotlin coroutine version 1.3.3. Vậy các bạn cần update tối thiểu đến version này để đảm bảo có đầy đủ các toán tử trong bài viết này.

Toán tử take()

Sử dụng toán tử take() nếu bạn muốn nguồn thu lấy một lượng giới hạn các phần tử được phát ra từ nguồn phát. Ví dụ:

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } catch (e: CancellationException) {
        println("exception")
    } finally {
        println("close resource here")
    }
}

fun main() = runBlocking {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

Output:

1
2
exception
close resource here

Đoạn code trên mình chỉ lấy 2 phần tử từ nguồn phát bằng hàm take(2) nên sau khi nguồn phát emit được 2 phần tử đầu, nó lập tức throw CancellationException. Vì vậy câu This line will not execute và phần tử 3 không được in ra. Mặc dù vậy, code trong khối finally vẫn được thực thi, tính năng này rất cần thiết khi bạn muốn close resource.

Toán tử transform()

Toán tử này được dùng để biến đổi giá trị được phát ra từ nguồn phát trước khi emit cho nguồn thu nhận nó. Ngoài công dụng chính là để biến đổi phần tử, nó còn có các công dụng khác như nguồn thu có thể bỏ qua (skip) các giá trị mà nó không muốn nhận từ nguồn phát hoặc chúng ta có thể emit một giá trị nhiều hơn một lần (có nghĩa là phát 10 giá trị nhưng nhận có thể tới 20 giá trị). Ví dụ dưới đây mình có một nguồn phát có nhiệm vụ phát ra data từ số 1 đến số 9 và mong muốn của mình là nguồn thu bỏ qua các giá trị lẻ, và đối với giá trị chẵn thì biến đổi chúng thành các lũy thừa bậc 2, bậc 3 của chính nó.

fun main() = runBlocking {
    (1..9).asFlow() // a flow of requests
        .transform { value ->
            if (value % 2 == 0) { // Emit only even values, but twice
                emit(value * value)
                emit(value * value * value)
            } // Do nothing if odd
        }
        .collect { response -> println(response) }
}

Output:

4
8
16
64
36
216
64
512

Vậy với 4 giá trị chẵn (2, 4, 6, 8), mỗi giá trị chẵn mình emit 2 lần nên nguồn thu sẽ nhận được tới 8 giá trị như output.

Toán tử map()

/**
 * Returns a flow containing the results of applying the given [transformer] function to each value of the original flow.
 */
@FlowPreview
public fun <T, R> Flow<T>.map(transformer: suspend (value: T) -> R): Flow<R> = transform { value -> emit(transformer(value)) }

Đoạn code toán tử map được trích dẫn từ thư viện Kotlin Coroutine. Nhìn code trên cũng có thể thấy được toán tử map có quan hệ với toán tử transform. Vậy công dụng chính của nó cũng là để biến đổi phần tử nhận được như toán tử transform nhưng khác ở chỗ: toán tử transform cho phép ta skip phần tử hoặc emit một phần tử nhiều lần còn toán tử map thì không thể skip hay emit multiple times. Với mỗi phần tử nhận được từ nguồn phát, nguồn thu sẽ xử lý biến đổi và emit một và chỉ một giá trị cho nguồn thu (tức là phát 1 thì thu 1, phát 10 thì thu 10).

nguồn phát -> toán tử map biến đổi giá trị -> nguồn thu

fun main() = runBlocking {
    (1..3).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .collect { println(it) }
}

Output:

1
4
9

Ví dụ code trên mình phát ra 3 giá trị, nguồn thu sẽ nhận được đúng 3 giá trị sau khi được biến đổi từ toán tử map

Toán tử filter()

Toán tử này giúp chúng ta filter lọc ra các giá trị thỏa mãn điều kiện và bỏ qua các giá trị không thỏa mãn điều kiện từ nguồn phát. Ví dụ mình muốn lọc ra các giá trị chẵn:

fun main() = runBlocking {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }.collect {
            println("Collect $it")
        }
}

Output:

Filter 1
Filter 2
Collect 2
Filter 3
Filter 4
Collect 4
Filter 5

Dựa vào output chúng ta có thể thấy có 5 phần tử từ flow chạy vào hàm filter nhưng chỉ có 2 phần tử được collect24.

Ở đây chúng ta thấy công dụng lọc này giống với công dụng của hàm transform. Đúng vậy, hàm filter thực chất cũng sử dụng hàm transform nên hàm transform cũng có thể lọc phần tử y hệt hàm filter.

/**
 * Returns a flow containing only values of the original flow that matches the given [predicate].
 */
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

Toán tử onEach()

Toán tử này dùng khi ta muốn thực hiện một action gì đó trước khi value từ flow được emit.

/**
 * Returns a flow which performs the given [action] on each value of the original flow.
 */
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

Ví dụ mình muốn mỗi phần tử bị delay 3s trước khi được emit ra.

fun main() = runBlocking {
    val nums = (1..3).asFlow().onEach { delay(3000) } // numbers 1..3 every 300 ms
    val startTime = System.currentTimeMillis()
    nums.collect { value ->
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

Output:

1 at 3006 ms from start
2 at 6008 ms from start
3 at 9009 ms from start

Dựa vào output có thể thấy mỗi phần tử bị delay 3s trước khi được emit ra.

Toán tử reduce()

Hàm reduce cực hữu ích khi chúng ta cần tính tổng cộng dồn tất cả giá trị được phát ra từ nguồn phát. Ví dụ:

fun main() = runBlocking {
    val sum = (1..3).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them
    println(sum)
}

Output:

14

Đoạn code trên mình phát 3 giá trị là 1, 2, 3. Sau đó qua hàm map để bình phương giá trị đó lên thành 1, 4, 9. Sau đó hàm reduce sẽ cộng dồn 3 giá trị này lại 1 + 4 + 9 = 14 và mình in cái tổng này ra như output.

Mổ xẻ ra xem toán tử reduce có gì trong đó.

/**
 * Accumulates value starting with the first element and applying [operation] to current accumulator value and each element.
 * Throws [UnsupportedOperationException] if flow was empty.
 */
@FlowPreview
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

Đầu tiên dễ thấy hàm reduce không trả về Flow nên chúng ta không cần collect. Nó chỉ trả về đúng 1 giá trị sau khi cộng dồn tất cả giá trị từ nguồn phát. Chúng ta sẽ chạy thử 1 đoạn code nữa để xem cách nó hoạt động thế nào:

fun main() = runBlocking {
    val sum = listOf("a", "b", "c", "d", "e").asFlow()
        .reduce { a, b ->
            println("Tổng đã tích lũy: $a")
            println("Giá trị mới: $b")
            a + b }
    println("Kết quả = $sum")
}

Output:

Tổng đã tích lũy: a
Giá trị mới: b
Tổng đã tích lũy: ab
Giá trị mới: c
Tổng đã tích lũy: abc
Giá trị mới: d
Tổng đã tích lũy: abcd
Giá trị mới: e
Kết quả = abcde

Mình đã in ra 2 param ab trong biểu thức lambda của hàm reduce. Nhìn vào output: dễ dàng thấy a chính là tổng tất cả giá trị đã tích lũy tính đến thời điểm nhận giá trị mới là b. Và nó tiếp tục cộng b vào và chạy tiếp cho đến khi hết giá trị.

Toán tử fold()

Toán tử này khá giống toán tử reduce(). Nó cũng có chức năng chính là tính tổng, tuy nhiên nó khác ở chỗ hàm reduce tính tổng từ con số 0 còn hàm fold tính tổng từ một giá trị được cho trước.

fun main() = runBlocking {
    val sum = (1..3).asFlow()
        .fold(initial = 10) { a, b -> // mình cho giá trị khởi tạo ban đầu là 10
            println("Tổng đã tích lũy: $a đồng")
            println("Giá trị mới: $b đồng")
            a + b } // sum them (terminal operator)
    println("Kết quả = $sum đồng")
}

Output:

Tổng đã tích lũy: 10 đồng
Giá trị mới: 1 đồng
Tổng đã tích lũy: 11 đồng
Giá trị mới: 2 đồng
Tổng đã tích lũy: 13 đồng
Giá trị mới: 3 đồng
Kết quả = 16 đồng

Vậy cái tổng này ban đầu đã được mình cho 10 đồng rồi, bây giờ nó tính cộng dồn thêm 1 đồng, 2 đồng3 đồng nữa thì kết quả cuối cùng được 16 đồng (10 + 1 + 2 + 3)

Toán tử toList(), toSet()

Toán tử này giúp chúng ta convert một flow thành một ArrayList hoặc LinkedHashSet

fun main() = runBlocking {
    val list: List<String> = listOf("a", "b", "c", "d", "e").asFlow().toList()
    val set: Set<Int> = (1..5).asFlow().toSet()
    println("${list.javaClass} $list")
    println("${set.javaClass} $set")
}

Output:

class java.util.ArrayList [a, b, c, d, e]
class java.util.LinkedHashSet [1, 2, 3, 4, 5]

Toán tử first()

Toán tử này giúp chúng ta get ra phần tử đầu tiên trong flow

fun main() = runBlocking {
    val a: Int = listOf(1, 3, 5, 7, 2, 6, 8, 4).asFlow().first()
    println(a)
}

Output:

1

Nếu chúng ta muốn lấy ra phần tử đầu tiên trong flow thỏa mãn một điều kiện nào đó. Hãy thử hàm first { }. Ví dụ chúng ta muốn get ra số chẵn đầu tiên trong flow:

fun main() = runBlocking {
    val a: Int = listOf(1, 3, 5, 7, 2, 6, 8, 4).asFlow().first { it % 2 == 0 } // in ra số chẵn đầu tiên
    println(a)
}

Output:

2

Cả hàm first()first { } đều throw NoSuchElementException nếu nó không get được phần tử nào (ví dụ trường hợp flow không có phần tử nào hoặc trong flow không có phần tử nào thỏa mãn điều kiện)

Toán tử single(), singleOrNull()

Toán tử single để check chắc chắn rằng nguồn flow chỉ có một phần tử và nó sẽ return giá trị đó. Trường hợp flow có nhiều hay ít hơn 1 phần tử đều bị throw Exception.

fun main() = runBlocking {
    val a: Int = listOf(10).asFlow().single() // trả về 10
    listOf(1, 2).asFlow().single() // throw IllegalStateException vì có nhiều hơn 1 phần tử
    listOf<Int>().asFlow().single() // throw IllegalStateException vì có ít hơn 1 phần tử
    println(a) // in ra 10
}

Để tránh bị throw Exception chúng ta có thể sử dụng toán tử singleOrNull(). Toán tử này sẽ trả về null nếu flow không có phần tử nào. Trường hợp flow có nhiều hơn một phần tử nó vẫn throw Exception như thường 😄

fun main() = runBlocking {
    val a: Int? = listOf(10).asFlow().singleOrNull() // trả về 10
    val b: Int? = listOf<Int>().asFlow().singleOrNull() // trả về null vì có ít hơn 1 phần tử
    listOf(1, 2).asFlow().singleOrNull() throw Exception vì có nhiều hơn 1 phần tử
    println(a.toString()) // in ra 10
    println(b.toString()) // in ra null
}

Toán tử zip()

Toán tử này dùng để zip 2 flow lại (giống như hàm zip trong Sequence hay List). Có nghĩa là nó sẽ lấy 1 phần tử bên flowA và 1 phần tử bên flowB để kết hợp lại tạo ra một phần tử mới.

fun main() = runBlocking<Unit> {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}

Output:

1 -> one
2 -> two
3 -> three

Như vậy nó đã lấy 1 của flow nums kết hợp với one của flow strs để cho ra phần tử 1 -> one. Tương tự cho ra 2 -> two, 3 -> three

Toán tử combine()

Toán tử combine cũng tương tự như toán tử zip. Có nghĩa là nó cũng sẽ lấy 1 phần tử bên flowA và 1 phần tử bên flowB để kết hợp lại tạo ra một phần tử mới. Nhưng có 1 sự khác nhau giữa combinezip. Mình sẽ dùng 2 đoạn code để demo zip và demo combine để dễ dàng phân biệt.

Sử dụng zip

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Output:

1 -> one at 466 ms from start
2 -> two at 866 ms from start
3 -> three at 1266 ms from start

Toán tử onEach đã được mình giới thiệu ở phần trên của bài viết này.

Ở đây mình có 2 flownumsstrs. Flow nums delay 300ms trước khi emit phần tử, flow strs delay 400ms trước khi emit phần tử. Rõ ràng flow nums sẽ emit các phần tử ra sớm hơn flow strs nhưng hàm zip đã chờ đến khi strs emit ra phần tử rồi mới tiến hành kết hợp chúng lại. Vì vậy mà các giá trị sau khi kết hợp được in ra lần lượt sau 400ms 800ms 1200ms

Sử dụng combine

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 

Output:

1 -> one at 472 ms from start
2 -> one at 669 ms from start
2 -> two at 872 ms from start
3 -> two at 970 ms from start
3 -> three at 1273 ms from start

Nhìn vào output có thể thấy: 1 -> one at 472 ms from start được in ra sau 400ms. Vậy combine cũng đợi flow numsstrs emit phần tử đầu tiên xong rồi kết hợp chúng lại. Đến đây vẫn giống hàm zip 😄. Tuy nhiên ta thấy dòng thứ 2 2 -> one at 669 ms from start được in ra sau 600ms, có nghĩa là ngay sau khi nums emit phần tử thứ 2, lúc này strs chưa emit phần tử thứ 2 nhưng hàm combine đã kết hợp phần tử thứ 2 của nums và phần tử thứ nhất của strs để cho ra 2 -> one. Đây chính là sự khác nhau giữa combinezip.

Toán tử flatMapConcat(), flatMapMerge(), flatMapLatest()

Công dụng của các toán tử flatMap này đều dùng để xử lý bài toán sau: Giả sử chúng ta có rất nhiều flow là flowA, flowB, flowC, flowD,.... flowA emit data sang cho flowB, flowB nhận và tiếp tục xử lý data đó rồi emit nó sang flowC, cứ như vậy cho đến flow cuối cùng. 3 toán tử này đều là flatMap nên đều được dùng trong bài toán trên, mình sẽ so sánh sự khác nhau của nó bằng 3 đoạn code. Ví dụ chung mình đưa ra cho cả 3 toán tử là: Có một flowA sẽ emit 3 giá trị là số 1, số 2 và số 3 sang cho 1 flowB khác, trước khi nó emit nó bị delay 100ms. Với mỗi giá trị mà flowB nhận được từ flowA, flowB sẽ xử lý và emit ra 2 giá trị FirstSecond và có delay 500ms giữa 2 lần emit.

flatMapConcat

fun requestFlow(i: Int): Flow<String> = flow { // Đây là flowB
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    // Dưới đây là flowA
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapConcat { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

Output:

1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start

Nhìn vào các mốc thời gian 100ms (do delay 100ms trong flowA), 600ms (do delay 500ms tiếp theo trong flowB), 700ms (delay 100ms tiếp theo), 1200ms (delay 500ms tiếp theo), 1300ms (delay 100ms tiếp theo), 1800ms (delay 500ms tiếp theo). Vậy toán tử này sẽ chờ đợi đến khi flowB hoàn thành cả 2 emit rồi mới bắt đầu collect data tiếp theo từ flowA.

flatMapMerge

fun requestFlow(i: Int): Flow<String> = flow { // Đây là flowB
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
    // Dưới đây là flowA
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapMerge { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

Output:

1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start // 500ms sau kể từ khi phần tử first được emit
2: Second at 732 ms from start // 500ms sau kể từ khi phần tử first được emit
3: Second at 833 ms from start // 500ms sau kể từ khi phần tử first được emit

Dựa vào các mốc thời gian trong output. Dễ thấy toán tử này collect tất cả các luồng đến và hợp nhất các giá trị của chúng thành một luồng duy nhất để các giá trị được phát ra càng sớm càng tốt. Toán từ này nó không đợi flowB emit xong phần tử Second như flatMapConcat mà nó tiếp tục collect tiếp từ flowA. Vậy nên 300ms đầu tiên, cả 3 phần tử First được in ra trước. delay thêm 500ms sau thì các toán tử Second mới được in ra.

flatMapLatest

fun requestFlow(i: Int): Flow<String> = flow { // Đây là flowB
    emit("$i: First") 
    delay(500) // wait 500 ms
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
     // Dưới đây là flowA
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}

Output:

1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start

flatMapLatest đã hủy tất cả code trong khối của nó flowB khi nó gặp delay trong flowB và tiếp tục collect data từ flowA. Như vậy sau khi flowA emit ra số 1, số 1 sẽ vào tới flowB gặp delayflowA đang rất nóng vội để emit tiếp phần tử thứ 2 nên flowB sẽ bị hủy ngay tại đó. flowA tiếp tục emit tiếp số 2, số 2 lại vào tới flowB gặp delay nên flowB bị hủy ngay tại đó. flowA tiếp tục emit tiếp số 3 cũng là phần tử cuối cùng, nó lại vào tới flowB gặp delay nhưng nó không bị hủy vì flowA đã emit ra phần tử cuối cùng rồi, ko thể emit thêm được nữa.

Chính hàm delay là nguyên nhân khiến cho flowB bị hủy. Vậy nên khi chúng ta bỏ hàm delay đi thì flowB sẽ không thể bị hủy.

fun requestFlow(i: Int): Flow<String> = flow { // Đây là flowB
    emit("$i: First")  // đã xóa hàm delay(500)
    emit("$i: Second")    
}

fun main() = runBlocking<Unit> { 
    val startTime = System.currentTimeMillis() // remember the start time 
     // Dưới đây là flowA
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
        .flatMapLatest { requestFlow(it) }                                                                           
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        } 
}
1: First at 180 ms from start
1: Second at 180 ms from start
2: First at 281 ms from start
2: Second at 281 ms from start
3: First at 382 ms from start
3: Second at 382 ms from start

Chúng ta thấy cả 6 dòng được in ra, không dòng nào bị hủy.

Kết luận

Hy vọng qua bài viết này, các bạn đã nắm được các toán tử cơ bản về Flow. Trong phần tiếp theo, mình sẽ giới thiệu context và xử lý exception trong Flow. Cảm ơn các bạn đã theo dõi bài viết này. Hy vọng các bạn sẽ tiếp tục theo dõi những phần tiếp theo. 😄

Nguồn tham khảo:

https://kotlinlang.org/docs/reference/coroutines/flow.html

Đọc lại những phần trước:

Cùng học Kotlin Coroutine, phần 1: Giới thiệu Kotlin Coroutine và kỹ thuật lập trình bất đồng bộ

Cùng học Kotlin Coroutine, phần 2: Build first coroutine with Kotlin

Cùng học Kotlin Coroutine, phần 3: Coroutine Context và Dispatcher

Cùng học Kotlin Coroutine, phần 4: Job, Join, Cancellation and Timeouts

Cùng học Kotlin Coroutine, phần 5: Async & Await

Cùng học Kotlin Coroutine, phần 6: Coroutine Scope

Cùng học Kotlin Coroutine, phần 7: Xử lý Exception trong Coroutine, Supervision Job & Supervision Scope

Cùng học Kotlin Coroutine, phần 8: Flow (part 1 of 3)

Đọc tiếp phần 10: Cùng học Kotlin Coroutine, phần 10: Flow (part 3 of 3)


All Rights Reserved