Eliminating coroutine races
It is not an uncommon situation when we start two coroutines at (more or less) the same time, but we need one of them finished before the other one can be executed. Let's see a few real-life cases, and learn how to deal with them.
The problem
When you call two functions, each launching a coroutine, you cannot be sure which one will be executed first. That is why the below code will sometimes print "ab" and sometimes "ba".
import kotlinx.coroutines.*
fun main() {
repeat(100) { GlobalScope.launch { Thread.sleep(100) } } // To make processor busy
test() // sometimes "ab" and sometimes "ba"
runBlocking { scope.coroutineContext.job.children.forEach { it.join() } }
}
fun test() {
a()
b()
}
val scope = CoroutineScope(Dispatchers.Default)
fun a() = scope.launch {
print("a")
}
fun b() = scope.launch {
print("b")
}
Regular processes
The simplest way to make one process wait until another one is finished is to store this first process Job
(which is the result from launch
or async
) and use its join
function, that suspends until the process is finished (I describe it in detail in the book Kotlin Coroutines: Deep Dive). The below code uses this trick, and so it always prints "ab".
import kotlinx.coroutines.*
fun main() {
repeat(100) { GlobalScope.launch { Thread.sleep(100) } } // To make processor busy
test() // ab
runBlocking { scope.coroutineContext.job.children.forEach { it.join() } }
}
fun test() {
a()
b()
}
val scope = CoroutineScope(Dispatchers.Default)
val job: Job? = null
fun a() {
job = scope.launch {
print("a")
}
}
fun b() = scope.launch {
job?.join()
print("b")
}
Listener and handler
Consider a situation where you need to use one coroutine to set up a listener for events, and another to send an event. This is a typical situation in Android when we follow MVI pattern. Consider that BaseViewModel
starts listening for events in its constructor, and we need to send the first event in a constructor of some of its implementations.
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>()
init {
eventFlow
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
}
class SomeViewModel : BaseViewModel() {
init {
sendEvent(Event()) // Will it be handled or not?
}
}
When we initialize SomeViewModel
, we first call its parent constructor, and we start a coroutine that should listen on eventFlow
, but than straight after that, we start another coroutine to send an event to this flow. So the race begins. Will the event be handled or not? Sometimes it will be, but sometimes event will be sent before the listener is set up, and it will be lost, because MutableSharedFlow
has no replay by default.
Here we cannot use our Job
solution, because the coroutine listening for events will never complete. The coroutine listening for events will stay active until it is explicitly cancelled.
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>()
private val eventFlowJob: Job
init {
eventFlowJob = eventFlow
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
eventFlowJob.join() // WAITING FOREVER!
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
}
class SomeViewModel : BaseViewModel() {
init {
sendEvent(Event()) // Will it be handled or not?
}
}
The simplest functional solution is to give our MutableSharedFlow
some replay. Then, each new observer will receive past events. Beware, however, that this solution changes the behavior of our flow, and it might be problematic if we plan to use other observers. Each new observer will receive all past events.
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>(replay = Int.MAX_VALUE)
init {
eventFlow
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
}
class SomeViewModel : BaseViewModel() {
init {
sendEvent(Event()) // Will it be handled or not?
}
}
There is another solution, that should certainly be known to those using SharedFlow
intensively. We use CompletableDeferred
(more intuitive) or Job
(lighter) to wait for the listener to be set up. For those objects we can await completion using join
, and we can complete them using complete
function. The best place to complete them is in onSubscription
operator, that is called when the first observer subscribes to the flow. We could also use onStart
operator, that is called for each new observer.
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>()
private val eventFlowListenerStarted = CompletableDeferred<Unit>()
init {
eventFlow
.onSubscription { eventFlowListenerStarted.complete(Unit) }
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
eventFlowListenerStarted.join()
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
}
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>()
private val eventFlowListenerStarted = Job()
init {
eventFlow
.onSubscription { eventFlowListenerStarted.complete() }
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
eventFlowListenerStarted.join()
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
}
Technically speaking, there is one more suction, but I find it overly complicated. We can use subscriptionCount
from MutableSharedFlow
.
class BaseViewModel : ViewModel() {
private val eventFlow = MutableSharedFlow<Event>()
init {
eventFlow
// ...
.onEach(::handleIntent)
// ...
.launchIn(viewModelScope)
}
fun sendEvent(event: Event) {
viewModelScope.launch {
waitForListener()
intentFlow.emit(event)
}
}
fun handleIntent(event: Event) {
// ...
}
suspend fun waitForListener() {
val subscriptionCount = eventFlow.subscriptionCount
if (subscriptionCount.value == 0) {
subscriptionCount
.filter { it > 0 }
.first()
}
}
}
I would like to thank Damian Koźlak and Krzysztof Dąbrowski from Codequest for inspiration for the article.
Marcin Moskala is a highly experienced developer and Kotlin instructor as the founder of Kt. Academy, an official JetBrains partner specializing in Kotlin training, Google Developers Expert, known for his significant contributions to the Kotlin community. Moskala is the author of several widely recognized books, including "Effective Kotlin," "Kotlin Coroutines," "Functional Kotlin," "Advanced Kotlin," "Kotlin Essentials," and "Android Development with Kotlin."
Beyond his literary achievements, Moskala is the author of the largest Medium publication dedicated to Kotlin. As a respected speaker, he has been invited to share his insights at numerous programming conferences, including events such as Droidcon and the prestigious Kotlin Conf, the premier conference dedicated to the Kotlin programming language.