Kotlin Coroutines: async/await trong Android

Thời gian gần đây tôi có tham gia vào 1 dự án nội bộ trong công ty, đó là phát triển 1 thư viện dùng để hỗ trợ kiểm thử trên smartphone. Về cơ bản thì khi tester đang test 1 ứng dụng nào đó và phát hiện ra lỗi (thường là UI) thì sẽ có thể sử dụng thư viện này để chụp ảnh màn hình và khoanh vùng lỗi lên đó, cuối cùng là gửi report lên Github hoặc Redmine. Dự án này cũng 1 phần là để thử nghiệm Kotlin sau khi Google công bố hỗ trợ chính thức ngôn ngữ này tại I/O 2017.

Ở bài viết này, tôi sẽ không đi sâu vào cách chúng tôi implement thư viện này như thế nào. Tuy vậy, có 1 vấn đề nhỏ tôi gặp phải trong quá trình phát triển mà tôi muốn chia sẻ với các bạn, đó là trong bước cuối cùng của thư viện có đề cập đến việc gửi report lên Github/Redmine. Với Github, đầu tiên chúng ta cần cho user đăng nhập để có thể lấy được access token để thực hiện các request cần thiết. Request cần thiết ở đây có thể là list danh sách các repo của user, danh sách pull request/issue hoặc cho phép user search repo theo tên. Trong bài này hãy cùng thảo luận về chức năng search nhé.

The problem

Tất cả các request trên đều có thể dễ dàng được thực hiện bởi Retrofit. Hàm search có thể được định nghĩa như sau:

@GET("search/repositories")
fun search(@Query("access_token") authorization: String, @Query("q") query: String): Call<GithubSearch>

Đây chỉ là 1 trong số các function request của Github, nhưng tất cả đều có 1 điểm chung là đều yêu cầu có access token. Vì vậy chúng ta hãy tạo 1 hàm khác để chỉ cần input query vào là có thể search được:

class DataManager(context: Context)
    fun search(query: String): Call<GithubSearch> {
        return githubService.search(sharedPref.githubAccessToken, query)
    }
}

Hàm trên sẽ tự động lấy access token trong SharedPreferences để gắn vào request cho chúng ta.

Chúng ta sử dụng hàm trên như sau:

dataManager.search("Something").enqueue(object : Callback<GithubSearch> {

    override fun onFailure(call: Call<GithubSearch>?, t: Throwable?) {
        Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
    }

    override fun onResponse(call: Call<GithubSearch>?, response: Response<GithubSearch>) {
        if (response.isSuccessful) {
            list.addAll(response.body().items)
            adapter.notifyDatasetChanged()
        } else {
            Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
        }
    }
})

Trông cũng OK đúng không? Thật ra thì cũng không OK lắm đâu. Như tôi đã nói ở trên, ngoài hàm search chúng ta còn có rất nhiều hàm request khác, mà nếu dùng cách này thì lúc nào cũng phải check xem response có thành công không, và object trả về là Response<GithubSearch> chứ không đơn giản chỉ là GithubSearch.

Thật ra, chúng ta có thể giải quyết việc này khá dễ dàng bằng việc sử dụng generic. Đầu tiên hãy tạo 1 interface đơn giản để chứa object chúng ta cần thay vì Response<T> của Retrofit:

interface RequestCallback<T> {
    fun onResponse(response: T)
    fun onFailure(throwable: Throwable? = null)
}

Sau đó sử dụng Extension function để thêm hàm sau cho Call<T> của Retrofit:

fun <T> Call<T>.extract(requestCallback: RequestCallback<T>) {
    enqueue(object : Callback<T> {
        override fun onFailure(call: Call<T>?, t: Throwable?) {
            requestCallback.onFailed(t)
        }

        override fun onResponse(call: Call<T>?, response: Response<T>?) {
            if (response != null && response.isSuccessful) {
                requestCallback.onSuccess(response = response.body()!!)
            } else {
                requestCallback.onFailed()
            }
        }
    })
}

Về cơ bản thì hàm này sẽ thực hiện việc kiểm tra request có thành công không và extract object mà chúng ta đang quan tâm. Hàm search từ đó sẽ được sử dụng như sau:

dataManager.search("Something").extract(object : RequestCallback<GithubSearch> {
    override fun onResponse(response: GithubSearch) {
        list.addAll(response.items)
        adapter.notifyDatasetChanged()
    }

    override fun onFailure(throwable: Throwable?) {
        Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
    }
})

Đã dễ nhìn hơn 1 chút, nhưng vẫn chưa đủ. Đây mới chỉ là trường hợp tốt nhất thôi, còn nếu bạn muốn cache lại kết quả sau đó mới update list thì bạn sẽ được gặp callback hell :

dataManager.search("Something").extract(object : RequestCallback<GithubSearch> {
    override fun onResponse(response: GithubSearch) {
        dataManager.cache(response, object : Callback<List<Item>> {
            override fun onSuccess(items: List<Item>) {
                 list.addAll(items)
                 adapter.notifyDatasetChanged()
            }

            override fun onError(throwable: Throwable?) {
                  Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
            }
        })
    }

    override fun onFailure(throwable: Throwable?) {
        Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
    }
})

Chúng ta có thể giải quyết callback hell 1 cách dễ dàng bằng cách sử dụng RxJava. Nhưng gần đây tôi có nghe rất nhiều về 1 API đang thử nghiệm trong Kotlin gọi là Coroutines với implementation của async/await trong C#/ECMAScript. Học không bao giờ là thừa, hãy cùng xem Coroutines giải quyết bài toán trên như thế nào nhé.

Coroutines

Về cơ bản thì coroutines là 1 thuật toán có khả năng dừng (suspend) và tiếp tục (resume) việc thực thi code nằm trong nó mà không làm cho thread bị block. Trong Android, nếu bạn chạy 1 hàm phức tạp nào đó trên main thread thì nó sẽ block UI cho đến khi hàm chạy xong. Block UI sẽ làm cho app rơi vào trạng thái ANR và khả năng lớn là user sẽ uninstall app của bạn. Ngược lại, suspend tỏ ra khá hiệu quả khi bạn không cần phải quan tâm đến context switch hay cần hiểu biết về OS, những thứ đó coroutines sẽ làm giúp bạn. Chúng ta có thể dễ dàng điều khiển việc suspend của coroutines thông qua suspension point (tạm dịch - điểm dừng). Trong Kotlin, suspension point được thể hiện thông qua lời gọi đến những hàm được đánh dấu bởi từ khóa suspend.

Nếu đọc đoạn trên khó hiểu quá thì hãy xem ví dụ sau nhé, tạm thời chúng ta sẽ dùng pseudo code:

1. start coroutine
2. var json = githubApi.getUser().await()
3. var user = parse(json).await()
4. displayUser(user)

Dòng 2 và 3 sẽ làm cho thuật toán của chúng ta bị tạm dừng cho đến khi chạy xong (chú ý là 2 dòng đó sẽ được chạy trên thread khác). Sau khi chạy xong, chương trình sẽ tiếp tục ở chỗ nó đã dừng trước đó (dòng 4 sẽ được chạy) và với kết quả lấy được từ dòng 2 và 3. Nếu trong quá trình chạy 2 dòng 2 và 3 mà có lỗi xảy ra, Exception phù hợp sẽ được throw lên thread bắt đầu coroutine để chúng ta có thể catch.

Nếu bạn vẫn thấy khó hiểu thì hãy xem chúng ta áp dụng coroutine vào bài toán trên như thế nào nhé.

Đầu tiên, việc tạo ra 1 hàm extension từ Call<T> (extract trong trường hợp trên) là quyết định đúng, nên chúng ta sẽ tiếp tục theo cách đó. Nhưng để nhất quán với concept async/await thì chúng ta sẽ thay đổi tên hàm thành await:

suspend fun <T> Call<T>.await(): T {

}

Như các bạn có thể thấy thì chúng ta sẽ return luôn object mà chúng ta quan tâm từ function này và không cần phải đi qua callback nữa. Ngoài ra chúng ta còn đánh dấu hàm bằng từ khóa suspend, tôi sẽ giải thích ở bên dưới. Nội dung của hàm sẽ được viết như sau:

suspend fun <T> Call<T>.await(): T {
    return suspendCoroutine { continuation ->
        enqueue(object : Callback<T> {
            override fun onFailure(call: Call<T>?, t: Throwable) {
                continuation.resumeWithException(t)
            }
            override fun onResponse(call: Call<T>?, response: Response<T>) {
                if (response.isSuccessful && response.body() != null) {
                    continuation.resume(response.body() as T)
                } else {
                    continuation.resumeWithException(HttpException(response))
                }
            }
        })
    }
}

Nhìn qua thì không khác bản cũ là mấy đúng không? 1 số khái niệm mà bạn cần chú ý trong đoạn code trên là:

  • Từ khóa suspend: Từ khóa này sẽ làm cho những lời gọi đến hàm làm dừng 1 coroutine. Coroutine sẽ bị dừng cho đến khi await() được chạy xong. Chúng ta chỉ có thể gọi 1 hàm được đánh dấu suspend từ trong 1 coroutine. Thế mặt mũi thằng coroutine nhìn như thế nào? Tôi sẽ giải thích ở dưới nên cứ đọc tiếp nhé.

  • suspendCoroutine là 1 hàm khác cũng được đánh dấu suspend (hàm này là API của Kotlin, bạn chỉ việc dùng thôi). Thực chất chúng ta cần phải gọi hàm này thì việc thực thi của coroutine mới có thể bị dừng. Khi suspendCoroutine được gọi trong 1 coroutine, nó sẽ giữ lại state thực thi trong 1 instance của interface Continuation<T> (bạn có thể thấy argument continuation ở trên). Interface này có 2 hàm là resumeresumeWithException cũng tương tự như 2 hàm onResponseonFailure trong interface callback của chúng ta. Khi request thành công, hàm resume sẽ được gọi đến và chúng ta sẽ gửi kết quả vào hàm này. Sau đó, chương trình sẽ được tiếp tục và hàm await sẽ trả ra kết quả. Tương tự với resumeWithException, nhưng thay vào đó thì hàm này sẽ throw exception lên thread khởi chạy coroutine đó.

Giờ thì chúng ta có thể chạy coroutine rồi:

launch(CommonPool) {
    try {
      var search = dataManager.search("Something").await()
      var items = dataManager.cache(search).await()
      list.addAll(items)
      adapter.notifyDatasetChanged()
    } catch(exception: IOException) {
      Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
    }
}

launch là 1 hàm thuộc API của coroutine dùng để tạo 1 coroutine mới với thread pool mà chúng ta truyền vào (ở đây chúng ta dùng CommonPool, 1 trong những context được coroutine hỗ trợ). Đoạn code nằm trong launch chính là coroutine. Giờ thì hãy thử chạy xem sao nhé.

FATAL EXCEPTION: ForkJoinPool.commonPool-worker-0
Process: framgia.com.sdksample, PID: 28413
android.view.ViewRootImpl$CalledFromWrongThreadException: Only the original thread that created a view hierarchy can touch its views.

Sau khi chạy đoạn code trên, app của chúng ta sẽ crash và ném ra Exception này. Đây là exception được ném ra khi bạn làm gì đó với view trên 1 thread không phải là main thread. Tại sao lại như vậy? Đó là bởi vì coroutine của chúng ta đang không chạy trên main thread, mà chạy trên thread cung cấp bởi CommonPool. Đến đây nếu bạn thích thì có thể dùng luôn thư viện này để cung cấp cho coroutine main thread của Android (thư viện này cũng tương tự như RxAndroid vậy). Nhưng nếu bạn chỉ cần 1 implementation đơn giản thì hãy đọc tiếp nhé.

Trước tiên hãy đọc documentation của Kotlin về cách implement 1 Continuation để thể hiện UI thread của 1 platform nhé. Mặc dù ví dụ về Swing nhưng implement cũng tương tự cho Android thôi 😄

Đầu tiên chúng ta cần phải implement interface Continuation<T> để override 2 hàm resumeresumeWithException để trả về kết quả trên main thread:

private class AndroidContinuation<T>(val continuation: Continuation<T>) : Continuation<T> by continuation {
    override fun resume(value: T) {
        if (Looper.myLooper() == Looper.getMainLooper()) continuation.resume(value)
        else Handler(Looper.getMainLooper()).post { continuation.resume(value) }
    }

    override fun resumeWithException(exception: Throwable) {
        if (Looper.myLooper() == Looper.getMainLooper()) continuation.resumeWithException(exception)
        else Handler(Looper.getMainLooper()).post { continuation.resumeWithException(exception) }
    }
}

Đoạn code trên lặp lại hơi nhiều (cụ thể là if else condition). Bạn có thể dùng Higher-order function để có thể tái sử dụng đc đoạn code lặp:

private class AndroidContinuation<in T>(val continuation: Continuation<T>) : Continuation<T> by continuation {
    override fun resume(value: T) {
        postOnMainThread { continuation.resume(value) }
    }

    override fun resumeWithException(exception: Throwable) {
        postOnMainThread { continuation.resumeWithException(exception) }
    }

    inline fun postOnMainThread(crossinline expr: () -> Unit) {
        if (Looper.myLooper() == Looper.getMainLooper()) expr()
        else Handler(Looper.getMainLooper()).post { expr() }
    }
}

Nếu bạn chưa rõ về concept của Looper thì hãy đọc bài viết của tôi nhé. Với đoạn code trên, đầu tiên chúng ta sẽ check xem looper hiện tại có phải là main looper không, qua đó chúng ta sẽ biết chúng ta có đang ở trên main thread hay không vì main looper liên kết với main thread. Nếu không thì chúng ta chỉ đơn giản là gửi runnable chứa đoạn code của resume vào trong hàng chờ của main looper để đưa nó trở lại main thread.

Sau đó chúng ta có thể định nghĩa 1 object tên Android để làm cho nó trở thành 1 context tương tự như CommonPool. Hãy implement interface ContinuationInterceptor để làm được điều đó:

object Android : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
            AndroidContinuation(continuation)
}

Giờ đã có main thread rồi, đoạn code chúng ta cần sẽ nhìn như sau:

launch(Android) {
    try {
      var search = dataManager.search("Something").await()
      var items = dataManager.cache(search).await()
      list.addAll(items)
      adapter.notifyDatasetChanged()
    } catch(exception: IOException) {
      Toast.makeText(applicationContext, "An error happened!", Toast.LENGTH_LONG).show()
    }
}

Đoạn code từ callback hell đã trở nên gọn gàng, dễ đọc, code nhìn như sync nhưng chạy lại async 😄

Kết

Qua bài viết này chúng ta đã có những hiểu biết cơ bản về coroutines trong Kotlin. Khi mang coroutine ra so sánh với RxJava (chỉ đơn thuần về thread và concurrency) thì tôi nghĩ tất cả đều đồng ý rằng RxJava mang lại cho chúng ta sự linh hoạt cao hơn, tuy nhiên với 1 số trường hợp cụ thể thì việc sử dụng coroutines là đơn giản và nếu dùng đúng cách sẽ giúp cho code của bạn dễ đọc hơn rất nhiều.