Coroutine và Bài toán nhiều Task vụ
Chào mọi người. Trong xử lý lập trình bất đồng bộ, mọi người rất hay gặp phải tình huống xử lý nhiều task vụ cùng một lúc, hoặc các task vụ xử lý lần lượt, task vụ này phụ thuộc vào kết quả của các task vụ kia. Hôm nay mình xin chia sẻ một số cách để xử lý bài toán này bằng Coroutine.
Hãy xem xét một tình huống, trong đó Task_1 đang mong đợi một id thực hiện Task_2 với id được nhận từ Response của Task_1 và dựa trên phản hồi từ Task_2, các điều kiện và tham số của Task_3 sẽ bị thay đổi.
Task_1 → Task_2 with id -> Task_3
Làm thế nào để nhiều task song song phụ thuộc được thực hiện.
Thông thường, Task đầu tiên sẽ được thực hiện và sau khi nhận được phản hồi (Response), thao tác dữ liệu thì cuộc gọi tiếp theo sẽ được thực hiện.
viewModelScope.launch {
val data1Response:BaseResponse<Data1>?
try{
val call1 = repository.getAPIcall1()
}
catch (ex: Exception) {
ex.printStackTrace()
}
processData(data1Response)
}
viewModel?.data1?.collect { dataResponse1 ->
repository.getAPIcall2()
}
viewModel?.data1?.collect { dataResponse2 ->
repository.getAPIcall3()
}
Như cách triển khai trên với các task vụ phục thuộc vào nhau. Chúng ta cần phải Sync up trước khi thao tác với dữ liệu, cũng như xử lý với task vụ tiếp theo.
Với Couroutine có một số cách tiếp cận, xử lý tối ưu hơn để xử lý dữ liệu bất đồng bộ và đội kết quả để thực hiện Logic nghiệp vụ của bài toán.
Sau đây là một số cách tiếp cận.
1. Concurrent Approach with Wait Time async-await with Kotlin Coroutines
viewModelScope.launch {
val data1Response:BaseResponse<Data1>?
val data2Response: BaseResponse<Data2>?
val data3Response: BaseResponse<Data3>?
val call1 = async { repository.getAPIcall1()}
val call2 = async { repository.getAPIcall2()}
val call3 = async { repository.getAPIcall3() }
try {
data1Response = call1.await()
data2Response = call2.await()
data3Response = call3.await()
} catch (ex: Exception) {
ex.printStackTrace()
}
processData(data1Response, data2Response, data3Response)
}
Async sẽ mong đợi Response của task vụ. Tại đây, Task_1 sẽ cung cấp dữ liệu sẽ được đồng bộ hóa với Task_2, v.v. Sau đó, thao tác dữ liệu có thể được thực hiện với Response đã nhận và xử lý. Nhưng nó sẽ có một số thời gian chờ đợi giữa mỗi cuộc gọi.
Có một cách triển khai mà chúng ta có thể kết hợp gọi nhiều task vụ cùng tại một thời điểm.
suspend fun fetchData() =
coroutineScope {
val mergedResponse = listOf(
async { getAPIcall1() },
async { getAPIcall2() }
)
mergedResponse.awaitAll()
}
2. Concurrent/ parallel Approach with thread switchingwithContext() will switch to seperate thread
Nó tương tự như async-await. Nhưng đâu đấy sẽ tiết kiệm chi phí hơn. Thay vì triển khai trên Main Thread, withcontext sẽ chuyển sang một Thread riêng và thực hiện tác vụ. Nó sẽ không có thời gian chờ như async-await.
Nếu runBlocking được thêm overlapping lên withContext(), nó sẽ đảo ngược tính chất bất đồng bộ và có thể hủy của Coroutines và chặn luồng. Cho đến khi nhiệm vụ cụ thể được hoàn thành.
Có 5 loại Dispatchers. IO, Main, Default, New Thread, Unconfined
-
Default Dispatcher: Đây là dispatcher mặc định trong hầu hết các hệ thống Coroutine. Nó sử dụng một pool thread cố định để thực thi các coroutine. Mặc dù nó hữu ích cho các hoạt động đơn giản, nhưng nó không phù hợp cho các hoạt động đòi hỏi nhiều tài nguyên hoặc chạy lâu.
-
IO Dispatcher: Dispatcher này được tối ưu hóa để thực thi các hoạt động I/O chậm, chẳng hạn như đọc/ghi dữ liệu từ đĩa hoặc mạng. Thay vì sử dụng pool thread cố định, IO Dispatcher thường sử dụng một số luồng I/O đặc biệt để tận dụng tối đa tài nguyên hệ thống.
-
Unconfined Dispatcher: Loại dispatcher này cho phép coroutine chạy trên bất kỳ luồng nào. Nó không liên kết với luồng nào cụ thể và cho phép coroutine chuyển đổi giữa các luồng trong quá trình thực thi. Điều này có thể hữu ích trong một số trường hợp đặc biệt, nhưng cũng có thể gây ra vấn đề về đồng bộ hóa và xử lý của task vụ.
-
New Thread Dispatcher: Dispatcher này tạo ra một luồng mới cho mỗi coroutine được thực thi. Điều này đảm bảo rằng mỗi coroutine sẽ chạy độc lập trên một luồng riêng biệt. Tuy nhiên, việc tạo và quản lý nhiều luồng có thể ảnh hưởng đến hiệu suất và tài nguyên hệ thống.
-
Main Dispathcher: Loại dispatcher đặc biệt được sử dụng để thực thi các coroutine trên luồng chính của ứng dụng. Main Dispatcher đảm bảo rằng các coroutine chạy trên luồng chính không bị chặn (block) trong quá trình thực thi, để đảm bảo khả năng phản hồi của giao diện người dùng.
viewModelScope.launch {
withContext(Dispatchers.Default) {
val apiResponse1 = api.getAPICall1()
val apiResponse2 = api.getAPICall2()
if (apiResponse1.isSuccessful() && apiResponse2.isSuccessful() { .. }
}
}
3. Parallel Approach with data merging
Cách tiếp cận thứ ba hơi khác một chút và nếu chúng ta muốn có hai task vụ độc lập và ghép chúng lại với nhau để có phản hồi mới, thì Zip Operator sẽ giúp chúng tôi xử lý song song chúng và đưa cho chúng ta kết quả mà chúng ta cần.
repository.getData1()
.zip(repository.getData2()) { data1, data2 ->
return@zip data1 + data2
}
.flowOn(Dispatchers.IO)
.catch { e ->
..
}
.collect { it ->
handleSuccessResponse(..)
}
Tổng kết
Trên đây mình đã giới thiệu 3 cách tiếp cận và triển khai, bạn có sử dụng cho từng bài toán, requirement cụ thể:
- Khi chúng tôi muốn gọi nhiều Task vụ song song với thời gian chờ, thì cách tiếp cận async-await sẽ phù hợp.
- Khi chúng ta muốn cách tiếp cận hiệu quả hơn với chuyển đổi các Thread, withcontext sẽ phù hợp
- Và để ghép hai Response lại với nhau và thực hiện một số thao tác dữ liệu, phương pháp toán tử Zip là phù hợp.
Hy vọng bài viết này ít nhiều giúp các bạn về một số cách, một số lựa chọn xử lý với bài toàn xử lý nhiều task vụ, cụ thể ở đây là với Couroutine. Hẹn gặp mọi người ở bài viết sắp tới.
All rights reserved