Skip to content
Application Development II
GitLabGitHub

Coroutines

Explanation of how to manage asynchronous code using Coroutines, and how to use Coroutines with collections of data using Flows.
Overview

These notes are, in part, adapted from the Kotlinlang.org Coroutines guide

Kotlin provides only minimal low-level APIs in its standard library to enable other libraries to execute asynchronous tasks. Unlike other languages that you may be more familiar with (Java, C#, Javascript) that have asynchronous capabilities, async and await are not keywords in Kotlin and are not even part of its standard library — nor are futures and promises.

Rather, Kotlin introduces the concept of the suspending function, aiming to provide a safer and less error-prone abstraction for asynchronous operations than other languages.

kotlinx.coroutines is a rich library for coroutines developed by JetBrains. It contains a number of high-level coroutine-enabled primitives that this guide covers, including launch, async, and others.

The following resources are recommended study material for understanding Kotlin coroutines, and the lecture notes below are based on them:

This section is adapted from https://kotlinlang.org/docs/coroutines-basics.html

A coroutine is an instance of a suspendable computation. What does that mean? All code aims to perform computations — so coroutines are just computations that can be “suspended”, paused, or treated separately from the main course of computations that take place on the CPU.

Coroutines are conceptually similar to a Java thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular CPU thread. It may suspend its execution in one thread and resume in another one.

The following code produces a kotlin coroutine:

import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}
import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // launch a new coroutine and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello") // main coroutine continues while a previous one is delayed
}

Let’s dissect what this code does.

launch is a coroutine builder. It launches a new coroutine concurrently with the rest of the code, which continues to work independently. That’s why Hello has been printed first.

delay is a special suspending function. It suspends the coroutine for a specific time. Suspending a coroutine does not block the underlying thread, but allows other coroutines to run and use the underlying thread for their code.

runBlocking is also a coroutine builder that bridges the non-coroutine world of a regular fun main() and the code with coroutines inside of runBlocking { ... } curly braces. This is highlighted in an IDE by this: CoroutineScope hint right after the runBlocking opening curly brace.

If you remove or forget runBlocking in this code, you’ll get an Unresolved reference: launch error on the launch call.

The name runBlocking means that the thread that runs it (in this case the main thread) gets blocked for the duration of the call, until all the coroutines inside runBlocking { ... } complete their execution.

You will often see runBlocking used like that at the very top-level of the application and quite rarely inside the real code, as threads are expensive resources and blocking them is inefficient and often not desired.

Coroutines follow a principle of structured concurrency which means that new coroutines can only be launched in a specific CoroutineScope which delimits the lifetime of the coroutine. The above example shows that runBlocking establishes the corresponding scope and that is why the previous example waits until World! is printed after a second’s delay and only then exits.

In a real application, you will be launching a lot of coroutines. Structured concurrency ensures that they are not lost and do not leak. An outer scope cannot complete until all its children coroutines complete. Structured concurrency also ensures that any errors in the code are properly reported and are never lost.

Let’s extract the block of code inside launch { ... } into a separate function. When you perform “Extract function” refactoring1 on this code, you get a new function with the suspend modifier.

import kotlinx.coroutines.*
fun main() = runBlocking {
launch { doWorld() }
println("Hello")
}
/*
Note: this is the result of doing "Extract function" on the code
inside of the launch block from the previous section. The behavior
of the code has not changed, but now it is easier to read.
*/
suspend fun doWorld() {
delay(1000L)
println("World!")
}
import kotlinx.coroutines.*
fun main() = runBlocking {
launch { doWorld() }
println("Hello")
}
/*
Note: this is the result of doing "Extract function" on the code
inside of the launch block from the previous section. The behavior
of the code has not changed, but now it is easier to read.
*/
suspend fun doWorld() {
delay(1000L)
println("World!")
}

Functions with the suspend modifier — suspending functions — can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions (like delay in this example) to suspend execution of a coroutine:

In addition to the coroutine scope provided by functions like runBuilder, it is possible to declare your own scope using the coroutineScope builder. It creates a coroutine scope and does not complete until all launched children complete.

runBlocking and coroutineScope builders may look similar because they both wait for their body and all its children to complete. The main difference is that the runBlocking method blocks the current thread for waiting, while coroutineScope just suspends, releasing the underlying thread for other usages. Because of that difference, runBlocking is a regular function and coroutineScope is a suspending function.

You can use coroutineScope from any suspending function. For example, you can move the concurrent printing of Hello and World into a suspend fun doWorld() function.

Why do this? Well, we can launch more than one suspend function, without blocking any threads. Let’s skip right away to an interesting example:

import kotlinx.coroutines.*
// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
doWorld()
println("Done")
}
// Concurrently executes both sections
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}
import kotlinx.coroutines.*
// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
doWorld()
println("Done")
}
// Concurrently executes both sections
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
launch {
delay(2000L)
println("World 2")
}
launch {
delay(1000L)
println("World 1")
}
println("Hello")
}

Both pieces of code inside launch { ... } blocks execute concurrently, with World 1 printed first, after a second from start, and World 2 printed next, after two seconds from start. A coroutineScope in doWorld completes only after both are complete, so doWorld returns and allows Done string to be printed only after that.

The Biblical Job is presented as a good man beset by horrendous disasters that take away all he holds dear—a scenario intended to test Job’s faith in God. Asynchronous programming may similarly test the CS student’s faith in their programs; the Job object can help them bring organization to a chaotic parallel world.

Remember how all Kotlin functions return a value? The launch function is no exception: it returns a Job object that is a handle to the launched coroutine and can be used to explicitly wait for its completion.

This API should be familiar: future and promise objects in other languages have similar interfaces.

For example, you can wait for completion of the child coroutine and then print “Done” string:

val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello")
job.join() // wait until child coroutine completes
println("Done")
val job = launch { // launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World!")
}
println("Hello")
job.join() // wait until child coroutine completes
println("Done")

This section adapted from https://kotlinlang.org/docs/flow.html and https://www.baeldung.com/kotlin/collection-vs-sequence

A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? This is where Kotlin Flows come in.

Multiple values can be represented and processed in Kotlin using collections.

For example, we can have a simple function that returns a List of three numbers and then print them all using forEach:

fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
// Output:
// 1
// 2
// 3
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
// Output:
// 1
// 2
// 3

Operations like forEach, map, filter, etc. on collections like List are eagerly evaluated: each operation is performed on every item in the collection immediately and the result of the operation is stored in a new collection:

(1..100)
.map { it * it } // Create a new list [1,4,9,16...10000]
.filter { it % 2 == 0 } // Create a new list [4,16,...10000]
.first { it > 50 } // Break loop when 64 > 50 is found
(1..100)
.map { it * it } // Create a new list [1,4,9,16...10000]
.filter { it % 2 == 0 } // Create a new list [4,16,...10000]
.first { it > 50 } // Break loop when 64 > 50 is found

Sequences are lazily evaluatedeach operation is evaluated on demand. It does not create any intermediate new list on each operation. It emits one item at a time, passing it down and matching each item in the operation chain.

For example, we can modify the previous case by using a sequence to iterate over 100 items and find a number whose square is even number greater than 50:

(1..100).asSequence()
.map { it * it }
.filter{ it % 2 == 0 }
.first{ it > 50 }
(1..100).asSequence()
.map { it * it }
.filter{ it % 2 == 0 }
.first{ it > 50 }

During the iteration, the chain of operations is applied to each item in the sequence one by one.

When the iteration starts, the map() operation squares it and passes its result to filter(). If the filter() condition is true, it will pass the result to the first() operation, otherwise, it will skip the first() operation. For example, 1, 9, 25, and 49 are not passed down to first() because they fail the filter() condition.

Lazy evaluation is used to prevent expensive operations from taking place on an entire list, if not necessary. Eager evaluation is used in all other cases— if you need to process every member of the list, it is better to be eager.

Returning to the first example, we can use Sequence to include a delay in the processing of each item in a list like so:

fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(1000) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
// Output (1 second delay for each):
// 1
// 2
// 3
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(1000) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
// Output (1 second delay for each):
// 1
// 2
// 3

The previous sequence computation blocks the main thread that is running the code every time Thread.sleep(1000) is called.

To make this asynchronous code, we can mark the simple function with a suspend modifier, so that it can perform its work without blocking and return the result as a list:

suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}

This code prints the numbers after waiting for a second between each number — without blocking the main CPU thread.

Using the List<Int> result type, means we can only return all the values at once. To represent the stream of values that are being computed asynchronously, we can use a Flow<Int> type just like we would use a Sequence<Int> type for synchronously computed values.

The code below waits 1000ms before printing each number without blocking the main thread. This is verified by printing “I’m not owned” every 1000ms from a separate coroutine that is running in the main thread:

fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(1000) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a separate coroutine to make sure the main thread is not blocked
launch {
for (k in 1..3) {
println("I'm not owned $k")
delay(1000)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
/*
Output:
NOTE: what would this look like if the main thread were blocked by simple()?
(see the next tab for an example)
I'm not owned 1
1
I'm not owned 2
2
I'm not owned 3
3
*/
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(1000) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a separate coroutine to make sure the main thread is not blocked
launch {
for (k in 1..3) {
println("I'm not owned $k")
delay(1000)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
/*
Output:
NOTE: what would this look like if the main thread were blocked by simple()?
(see the next tab for an example)
I'm not owned 1
1
I'm not owned 2
2
I'm not owned 3
3
*/

Notice the following:

  • A builder function of Flow type is called flow.
  • Code inside a flow { ... } builder block can suspend.
  • The simple function is no longer marked with a suspend modifier.
  • Values are emitted from the flow using an emit function.
  • Values are collected from the flow using a collect function.

Flows are lazy streams similar to sequences — the code inside a flow builder does not run until the flow is collected using collect. This becomes clear in the following example:

fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
/* Output
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
*/
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
/* Output
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
*/

This is a key reason the simple function (which returns a flow) is not marked with suspend modifier. The simple() call itself returns quickly and does not wait for anything. The flow starts afresh every time it is collected and that is why we see “Flow started” every time we call collect again.

The above covers the big picture that is required to understand how to use Flows and coroutines. There is more to know: see https://kotlinlang.org/docs/flow.html#flow-cancellation-basics and the following sections for further details if you need them.

  1. “Extract Function” refers to a tool that Jetbrains IDEs like Android Studio and IntelliJ (and many others) provide — when you highlight a chunk of code that can be replaced by a function, the IDE can determine how exactly to generate the function that would replace that code. See https://www.jetbrains.com/help/idea/extract-method.html