Cùng học Kotlin Coroutine, phần 11: Channels (part 1 of 2)

1. Giới thiệu về Channels

Channels khá giống với Flow mà mình đã giới thiệu trong phần 8, phần 9, phần 10 của series này. Nó cũng giúp chúng ta transfer một luồng giá trị (stream of values).

Channels khá giống với BlockingQueue, tức là nó cũng hoạt động như một Queue (hàng đợi) là FIFO (First In First Out), đại khái cách hoạt động của BlockingQueue là thằng nào vào Queue trước ta sẽ xử lý trước, thằng nào vào sau thì đợi thằng trước xong đã nhé 😄). Điểm khác nhau ở đây là BlockingQueue nó sử dụng 2 hàm put (thêm vào queue) và hàm take (lấy từ queue ra) và 2 hàm này là chạy blocking còn Channels sử dụng 2 hàm send (tương tự hàm put) và receive (tương tự hàm take) và 2 hàm này là suspend function tức là nó có khả năng suspend/resumenon-blocking (xem lại blocking/non-blocking và suspend function)

fun main() = runBlocking {
    val channel = Channel<Int>()
    val job = launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }
    // print 5 giá trị, trước khi nhận cho delay 1s
    delay(1000) // delay 1s
    println(channel.receive()) // nhận giá trị thứ 1
    delay(1000) // delay 1s
    println(channel.receive()) // nhận giá trị thứ 2
    delay(1000) // delay 1s
    println(channel.receive()) // nhận giá trị thứ 3
    delay(1000) // delay 1s
    println(channel.receive()) // nhận giá trị thứ 4
    delay(1000) // delay 1s
    println(channel.receive()) // nhận giá trị thứ 5
    println("Done! Channel is empty?: ${channel.isEmpty} / Coroutine is completed?: ${job.isCompleted} / Coroutine is active?: ${job.isActive}")
}

Output: Giải thích code:

val channel = Channel<Int>(): tạo một channel để transfer một luồng giá trị kiểu Int

channel.send(value): thêm một giá trị vào channel

channel.receive(): trả về giá trị được send sớm nhất (first in), đồng thời remove giá trị đó ra khỏi channel

Các bạn có thể tưởng tượng đoạn code trên hoạt động giống như các chú gấu đang xếp hàng để mua vé xem phim trong ảnh trên. Giả sử có 5 chú gấu đang đứng đợi mua vé xem phim và chỉ có 5 chiếc vé xem phim. Sau 1s đầu tiên, 1 vé được in ra (channel send) thì lập tức chú gấu đầu tiên nhận nó (channel receive) và đi ra khỏi hàng (remove khỏi channel), 1s tiếp theo đến chú gấu tiếp theo. Cứ vậy, sau 5s thì bán hết 5 vé, cũng chẳng còn chú gấu nào đang đợi nhận (channel is empty). Mình có print ra dòng cuối cùng để kiểm tra Channel is empty? (isEmpty = true), Coroutine nhận nhiệm vụ send vé đã completed chưa (Output là đã completed (isCompleted = true) và ko còn active (isActive = false)).

Ở đây mình cho delay 1s trước khi nhận đễ dễ thấy rằng: channel sau khi in ra 1 vé (channel.send()) mà chưa thấy ai nhận thì coroutine nhận nhiệm vụ in vé đó sẽ suspend lại chờ đến khi có người nhận vé đó (channel.receive()) thì nó mới resume trở lại và in tiếp.

Vậy sẽ thế nào nếu in ra 5 vé mà không có ai nhận vé?

Để trả lời câu hỏi này, mình sẽ dùng cùng 1 bài toán này để so sánh với Flow cho dễ hiểu.

2. Channels vs Flow (Hot & Cold)

Như các bạn đã biết Flow là một nguồn dữ liệu lạnh (cold streams). Điều đó có nghĩa là code bên trong flow { } sẽ không chạy cho đến khi Flow gọi hàm collect. Như vậy nếu ko có ai nhận vé thì nó sẽ ko in vé (so smart) 😄. Còn Channels thì khác, như mình đã nói ở trên thì khi channel send nó sẽ tạm suspend và chờ đến khi có ai đó receive thì nó mới resume và tiếp tục send. Vậy nên khi in ra 1 vé đầu tiên mà ế quá không có ai nhận thì coroutine nhận nhiệm vụ send sẽ bị suspend vĩnh viễn. Việc này rất nguy hại đến memory nên đây chính là nhược điểm lớn của thằng Channels. Vì vậy Channels còn được gọi là nguồn dữ liệu nóng (hot streams).

Thử code 1 đoạn, channel ko receive.

fun main() = runBlocking {
    val channel = Channel<Int>()
    val job = launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    println("Done! Coroutine is completed?: ${job.isCompleted} / Coroutine is active?: ${job.isActive}")
}

Output:

Nhìn vào output. Bạn ko thấy Process finished with exit code 0 có nghĩa là process này vẫn chưa kết thúc, điều đó đã chứng minh coroutine job đã bị suspend forever. Hơn nữa mình còn muốn chắc chắn hơn bằng cách log ra job.isCompletedjob.isActive và kết quả trong ảnh là job.isCompleted = false (coroutine chưa hoàn thành task) và job.isActive = true (coroutine còn sống nhăn răng)

Tất nhiên ngay cả trong trường hợp: in ra 5 vé nhưng chỉ có 3 vé được nhận cũng gây ra suspend forever.

fun main() = runBlocking {
    val channel = Channel<Int>()
    val job = launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    for (x in 1..3) { // 3 vé được nhận
        println(channel.receive())
    }
    println("Done! Coroutine is completed?: ${job.isCompleted} / Coroutine is active?: ${job.isActive}")
}

Output:

Channels ra đời trước Flow, có nghĩa là trước khi có Flow thì chúng ta chỉ có một cách duy nhất để transfer a stream of values là sử dụng Channels. Có thể nhược điểm lớn này của Channels cũng chính là lý do mà Flow ra đời. Vì vậy, xét về tính phổ biến, mức độ áp dụng vào các dự án thì Channels ít được áp dụng hơn Flow. Tuy nhiên, mình cũng muốn viết 1 bài về nó, để giúp chúng ta hiểu thêm về Flow và sự khác biệt giữa FlowChannels và hoàn thành series một cách trọn vẹn 😄.

Vậy nếu chỉ có 5 vé mà cần nhận tới 10 vé thì sao?

Thử ngay cho nóng:

fun main() = runBlocking { // COROUTINE NÀY MÌNH ĐẶT TÊN LÀ "run blocking cô rơ tin"
    val channel = Channel<Int>()
    val job = launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }

    for (x in 1..10) {
        println("Coroutine is completed?: ${job.isCompleted} / Coroutine is active?: ${job.isActive}")
        println(channel.receive())
    }
    println("Done! Run blocking coroutine is active?: $isActive")
}

Output:

Trước khi nhận mình thử kiểm tra xem coroutine nhận nhiệm vụ send đã hoàn thành task chưa, còn active ko?. Kết quả như trong ảnh là sau khi người in vé send ra 5 vé và đã có người nhận đã nhận hết 5 vé. Nó đã hoàn thành nhiệm vụ nên khi vòng lặp for chạy tới i = 6 thì nó in ra Coroutine is completed?: true / Coroutine is active?: false . Nhưng cũng chính tại i = 6 này channel lại receive (muốn nhận tiếp vé thứ 6), trong khi 5 vé đã được bán hết rồi vậy nên người nhận này lại bị suspend vĩnh viễn. Bằng chứng là ko thấy Process finished with exit code 0 trong output có nghĩa là process này vẫn chưa kết thúc. Wait!, khi nảy vừa nói coroutine đã hoàn thành task (isCompleted = true) rồi mà sao lại có chuyện suspend nữa?. Nếu để ý lại code sẽ thấy ở trên có 2 coroutine: 1 con tên "job" nhận nhiệm vụ send, 1 con mình đặt tên là "run blocking cô rơ tin" nhận nhiệm vụ receive. Vì con coroutine "run blocking cô rơ tin" nhận nhiệm vụ receive bị suspend. Vì vậy nên chữ "Done!" không được print ra.

3. Khi nào thì cần sử dụng Channels

Bạn sẽ sử dụng Channels khi bạn cần gửi dữ liệu từ một coroutine đến một coroutine khác trong cùng hoặc từ một process đến một process khác.

1 tấm ảnh hơn triệu lời nói 😄. Cái ống đó chính là channel là nơi để con coroutine 1 send data vào ống, và cũng là nơi để con coroutine 2 chui vào ống xúc data ra xử lý 😄

Như các ví dụ trên mình đã sử dụng 2 coroutine, 1 con để send, 1 con để receive. Vậy nếu chỉ có 1 coroutine vừa send vừa receive thì có được không?. Tất nhiên là KHÔNG!. Vì lúc vừa mới send xong 1 giá trị đầu tiên thì coroutine đó đã bị suspend rồi, vì vậy nên nó chưa kịp chạy đến receive và dẫn đến bị suspend vĩnh viễn. Ví dụ:

fun main() = runBlocking { // Receive coroutine
    val channel = Channel<Int>()
    val job = launch { // Send coroutine
        for (x in 1..5) {
            channel.send(x * x)
            println("con sông ngăn cách")
            println(channel.receive())
        }
    }
    println("Done! Receive coroutine is active?: $isActive / Send coroutine is active?: ${job.isActive}")
}

Output:

Nhìn vào output sẽ thấy Send coroutine đã bị suspend vĩnh viễn. Thậm chí chữ "con sông ngăn cách" còn không được in ra thì sao chạm đến được dòng code channel.receive(). Con Send coroutine chưa hoàn thành task nên isActive = true là đúng rồi, sao con Receive coroutine cũng isActive = true, nó chạy xong code in ra được chữ "Done" rồi cơ mà 😄. Cái này liên quan đến bài cũ: Một coroutine cha luôn chờ đợi để tất cả các coroutine con của nó chạy xong hoàn thành nhiệm vụ thì nó mới hoàn thành nhiệm vụ. À phải rồi, con Send coroutine được launch bên trong con Receive coroutine nên nó là con của Receive coroutine. Và tất nhiên con chưa xong task, cha nào dám đi ngủ 😄. (Đọc lại bài cũ)

Túm cái váy lại là khi channel send, thì Send coroutine bị suspend, nó sẽ resume lại khi có có 1 coroutine khác đã receive được cái giá trị nó vừa send. Khi channel gọi hàm receive cũng vậy, Receive coroutine cũng sẽ bị suspend cho đến khi có 1 coroutine khác send giá trị để nó nhận.

4. Iteration over channel

Thay vì dùng vòng for thông thường, chúng ta có thể receive value bằng cách duyệt vòng lặp qua channel. Ví dụ:

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) {
            channel.send(x * x)
        }
    }
    for (value in channel) println(value)
    println("Done!")
}

Output:

Receive Coroutine đã bị suspend vĩnh viễn nên chữ "Done" ko được in ra. Cẩn thận khi dùng cách này for (value in channel) println(value) vì nó giống như một vòng lặp vô tận ấy, nó sẽ lặp cho đến khi channel đã bị close. Vậy làm thế nào để close channel.

5. Close channel

Đơn giản chỉ là dùng hàm channel.close().

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (value in channel) println(value)
    println("Done!")
}

Output:

1
4
9
16
25
Done!

Process finished with exit code 0

Như các bạn thấy sau khi sendreceive hết 5 giá trị thì close channel nên vòng lặp for (value in channel) sẽ bị dừng lặp và chữ "Done" được in ra, process kết thúc!. Cách hoạt động của hàm close này là khi gọi channel.close() nó sẽ send 1 special token đến channel, channel receive được token này sẽ hiểu và close channel, đồng thời dừng vòng lặp for.

Có 2 lưu ý liên quan đến việc channel bị close:

1/ Nếu channel đã close nhưng vẫn cố gắng receive thì sẽ throw ClosedReceiveChannelException

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // sau khi send xong 5 phần tử thì close
    }
    for (y in 1..10) println(channel.receive()) // send có 5 mà nhận tới 10
    println("Done!")
}

Output:

1
4
9
16
25
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

2/ Nếu channel đã close nhưng vẫn cố gắng send thì sẽ throw ClosedReceiveChannelException

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 1..10) {
            if (i == 5) channel.close() // nếu i = 5 thì close đi, ko được send nữa.
            channel.send(i * i) // nhưng ta vẫn cố send i = 5 -> throw ClosedSendChannelException
        }
    }
    for (y in 1..5) {
        println(channel.receive())
    }
    println("Done!")
}

Output:

1
4
9
16
Exception in thread "main" kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed

Chúng ta có thể custom lại Exception trong hàm close thay vì cho throw ClosedSendChannelException hay ClosedReceiveChannelException bằng hàm close(cause: Throwable?)

fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
        for (i in 1..10) {
            if (i == 5) channel.close(Throwable("ta cho lệnh đóng channel lại!")) // nếu i = 5 thì close đi, ko được send nữa.
            channel.send(i * i) // nhưng ta vẫn cố send i = 5 -> throw Throwable
        }
    }
    for (y in 1..5) {
        println(channel.receive())
    }
    println("Done!")
}

Output:

1
4
9
16
Exception in thread "main" java.lang.Throwable: ta cho lệnh đóng channel lại!

Kết luận

Hy vọng qua bài viết này, các bạn đã hiểu cơ bản về Channels. Phần tiếp theo mình sẽ giới thiệu producers, consumers trong Channels và các loại Channels. Cảm ơn các bạn đã theo dõi bài viết này. Hy vọng các bạn sẽ tiếp tục theo dõi những phần tiếp theo. 😄.

Nguồn tham khảo:

https://kotlinlang.org/docs/reference/coroutines/channels.html

https://medium.com/@elizarov/cold-flows-hot-channels-d74769805f9

https://www.youtube.com/watch?v=tYcqn48SMT8&feature=youtu.be

Đọc lại những phần trước:

Cùng học Kotlin Coroutine, phần 1: Giới thiệu Kotlin Coroutine và kỹ thuật lập trình bất đồng bộ

Cùng học Kotlin Coroutine, phần 2: Build first coroutine with Kotlin

Cùng học Kotlin Coroutine, phần 3: Coroutine Context và Dispatcher

Cùng học Kotlin Coroutine, phần 4: Job, Join, Cancellation and Timeouts

Cùng học Kotlin Coroutine, phần 5: Async & Await

Cùng học Kotlin Coroutine, phần 6: Coroutine Scope

Cùng học Kotlin Coroutine, phần 7: Xử lý Exception trong Coroutine, Supervision Job & Supervision Scope

Cùng học Kotlin Coroutine, phần 8: Flow (part 1 of 3)

Cùng học Kotlin Coroutine, phần 9: Flow (part 2 of 3)

Cùng học Kotlin Coroutine, phần 10: Flow (part 3 of 3)

Đọc tiếp phần 12: Cùng học Kotlin Coroutine, phần 12: Channels (part 2 of 2)


All Rights Reserved