
Understanding Flow

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Kotlin Coroutines Flow is a much simpler concept than most developers think. It is just a definition of which operations to execute. It's similar to a suspending lambda expression, but with some extra elements. In this chapter, I will show you how to implement the Flow interface and flow builder by transforming a lambda expression step by step. This should give you a deep understanding of how Flow works. This chapter is for curious minds who like to truly understand the tools they use. If this isn't you, feel free to skip this chapter. If you decide to continue reading it, I hope you enjoy it.


We'll start our story with a simple lambda expression. Each lambda expression can be defined once and then called multiple times.

    fun main() {
        val f: () -> Unit = {
            print("A")
            print("B")
            print("C")
        }
        f() // ABC
        f() // ABC
    }

To make it a bit spicier, let's make our lambda expression suspend and add some delay inside it. Notice that each call of such a lambda expression is sequential, so the next call does not start until the previous one has finished.

    import kotlinx.coroutines.delay

    suspend fun main() {
        val f: suspend () -> Unit = {
            print("A")
            delay(1000)
            print("B")
            delay(1000)
            print("C")
        }
        f()
        f()
    }
    // A
    // (1 sec)
    // B
    // (1 sec)
    // C
    // A
    // (1 sec)
    // B
    // (1 sec)
    // C

A lambda expression might have a parameter that can represent a function. We will call this parameter emit. So, when you call the lambda expression f, you need to specify another lambda expression that will be used as emit.

    suspend fun main() {
        val f: suspend ((String) -> Unit) -> Unit = { emit ->
            emit("A")
            emit("B")
            emit("C")
        }
        f { print(it) } // ABC
        f { print(it) } // ABC
    }

In fact, emit should also be a suspending function. Our function type is already getting quite complex, so we'll simplify it by defining a FlowCollector functional interface with an abstract method named emit. We will use this interface instead of the function type. The trick is that functional interfaces can be created from lambda expressions, so we don't need to change the f call.

    import kotlin.*

    fun interface FlowCollector {
        suspend fun emit(value: String)
    }

    suspend fun main() {
        val f: suspend (FlowCollector) -> Unit = {
            it.emit("A")
            it.emit("B")
            it.emit("C")
        }
        f { print(it) } // ABC
        f { print(it) } // ABC
    }
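The conversion from a lambda expression to a functional interface can also be written out explicitly. The following is a minimal sketch that reuses the FlowCollector interface defined above and passes three equivalent collectors to the same definition: a bare lambda, the constructor-like form, and a full object expression. The helper name `collectThreeWays` and the prefixes in the recorded strings are introduced here for illustration only.

```kotlin
fun interface FlowCollector {
    suspend fun emit(value: String)
}

// Hypothetical helper: runs the same definition against three equivalent
// collectors and returns everything they received.
suspend fun collectThreeWays(): List<String> {
    val collected = mutableListOf<String>()
    val f: suspend (FlowCollector) -> Unit = {
        it.emit("A")
        it.emit("B")
    }

    // 1. Bare lambda, converted to FlowCollector automatically.
    f { collected.add("lambda:$it") }

    // 2. Constructor-like form that names the interface explicitly.
    f(FlowCollector { collected.add("sam:$it") })

    // 3. Object expression, the most verbose form.
    f(object : FlowCollector {
        override suspend fun emit(value: String) {
            collected.add("object:$value")
        }
    })
    return collected
}

suspend fun main() {
    println(collectThreeWays())
    // [lambda:A, lambda:B, sam:A, sam:B, object:A, object:B]
}
```

All three forms produce an object implementing FlowCollector, which is why the shortest one is usually preferred.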

Calling emit on it is not convenient; instead, we'll make FlowCollector a receiver. Thanks to that, inside our lambda expression there is a receiver (this keyword) of type FlowCollector. This means we can call this.emit or just emit. The f invocation still stays the same.

    import kotlin.*

    fun interface FlowCollector {
        suspend fun emit(value: String)
    }

    suspend fun main() {
        val f: suspend FlowCollector.() -> Unit = {
            emit("A")
            emit("B")
            emit("C")
        }
        f { print(it) } // ABC
        f { print(it) } // ABC
    }

Instead of passing around lambda expressions, we prefer to have an object implementing an interface. We will call this interface Flow and wrap our definition with an object expression.

    import kotlin.*

    fun interface FlowCollector {
        suspend fun emit(value: String)
    }

    interface Flow {
        suspend fun collect(collector: FlowCollector)
    }

    suspend fun main() {
        val builder: suspend FlowCollector.() -> Unit = {
            emit("A")
            emit("B")
            emit("C")
        }
        val flow: Flow = object : Flow {
            override suspend fun collect(
                collector: FlowCollector
            ) {
                collector.builder()
            }
        }
        flow.collect { print(it) } // ABC
        flow.collect { print(it) } // ABC
    }

Finally, let's define the flow builder function to simplify our flow creation.

    import kotlin.*

    fun interface FlowCollector {
        suspend fun emit(value: String)
    }

    interface Flow {
        suspend fun collect(collector: FlowCollector)
    }

    fun flow(
        builder: suspend FlowCollector.() -> Unit
    ) = object : Flow {
        override suspend fun collect(collector: FlowCollector) {
            collector.builder()
        }
    }

    suspend fun main() {
        val f: Flow = flow {
            emit("A")
            emit("B")
            emit("C")
        }
        f.collect { print(it) } // ABC
        f.collect { print(it) } // ABC
    }

The last change we need is to replace String with a generic type parameter in order to allow emitting and collecting any type of value.

    import kotlin.*

    fun interface FlowCollector<T> {
        suspend fun emit(value: T)
    }

    interface Flow<T> {
        suspend fun collect(collector: FlowCollector<T>)
    }

    fun <T> flow(
        builder: suspend FlowCollector<T>.() -> Unit
    ) = object : Flow<T> {
        override suspend fun collect(
            collector: FlowCollector<T>
        ) {
            collector.builder()
        }
    }

    suspend fun main() {
        val f: Flow<String> = flow {
            emit("A")
            emit("B")
            emit("C")
        }
        f.collect { print(it) } // ABC
        f.collect { print(it) } // ABC
    }

That's it! This is almost exactly how Flow, FlowCollector, and flow are implemented. When you call collect, you invoke the lambda expression from the flow builder call. When this expression calls emit, it calls the lambda expression specified when collect was called. This is how it works.
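To make this call order visible, here is a small sketch built on the simplified Flow, FlowCollector, and flow from this chapter (not the library versions). The `traceCollection` helper and its `log` list are hypothetical names used only to record what happens and when.

```kotlin
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(
    builder: suspend FlowCollector<T>.() -> Unit
): Flow<T> = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

// Hypothetical helper: records the order in which the builder
// and the collector lambdas are executed.
suspend fun traceCollection(): List<String> {
    val log = mutableListOf<String>()
    val f = flow<String> {
        log.add("builder started")
        emit("A")
        log.add("between emits")
        emit("B")
        log.add("builder finished")
    }
    log.add("flow created") // the builder has not run yet
    f.collect { log.add("collected $it") }
    log.add("collect returned")
    return log
}

suspend fun main() {
    traceCollection().forEach(::println)
}
// flow created
// builder started
// collected A
// between emits
// collected B
// builder finished
// collect returned
```

Nothing from the builder runs until collect is called, and each emit runs the collector lambda before the builder moves on to its next line.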

The presented builder is the most basic way to create a flow. Later we'll learn about other builders, but they generally just use flow under the hood.

    public fun <T> Iterator<T>.asFlow(): Flow<T> = flow {
        forEach { value ->
            emit(value)
        }
    }

    public fun <T> Sequence<T>.asFlow(): Flow<T> = flow {
        forEach { value ->
            emit(value)
        }
    }

    public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
        for (element in elements) {
            emit(element)
        }
    }

How Flow processing works

Flow can be considered a bit more complicated than suspending lambda expressions with a receiver. However, its power lies in all the functions defined for its creation, processing, and observation. Most of them are actually very simple under the hood. We will learn about them in the next chapters, but I want you to have the intuition that most of them are very simple and can be easily constructed using flow, collect, and emit.

Consider the map function that transforms each element of a flow. It creates a new flow, so it uses the flow builder. When its flow is started, it needs to start the flow it wraps; so, inside the builder, it calls the collect method. Whenever an element is received, map transforms this element and then emits it to the new flow.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun <T, R> Flow<T>.map(
        transformation: suspend (T) -> R
    ): Flow<R> = flow {
        collect {
            emit(transformation(it))
        }
    }

    suspend fun main() {
        flowOf("A", "B", "C")
            .map {
                delay(1000)
                it.lowercase()
            }
            .collect { println(it) }
    }
    // (1 sec)
    // a
    // (1 sec)
    // b
    // (1 sec)
    // c

The behavior of most of the methods that we'll learn about in the next chapters is just as simple. It is important to understand this because it not only helps us better understand how our code works but also teaches us how to write similar functions.

    fun <T> Flow<T>.filter(
        predicate: suspend (T) -> Boolean
    ): Flow<T> = flow {
        collect {
            if (predicate(it)) {
                emit(it)
            }
        }
    }

    fun <T> Flow<T>.onEach(
        action: suspend (T) -> Unit
    ): Flow<T> = flow {
        collect {
            action(it)
            emit(it)
        }
    }

    // simplified implementation
    fun <T> Flow<T>.onStart(
        action: suspend () -> Unit
    ): Flow<T> = flow {
        action()
        collect { emit(it) }
    }
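The same pattern can be used to write an operator of your own. As a sketch, the following defines `repeatEach`, a hypothetical function that is not part of kotlinx.coroutines: it wraps the upstream flow in a new flow and emits every received element a given number of times.

```kotlin
import kotlinx.coroutines.flow.*

// Hypothetical operator, not part of kotlinx.coroutines:
// emits every upstream element `times` times.
fun <T> Flow<T>.repeatEach(times: Int): Flow<T> = flow {
    collect { value ->
        repeat(times) { emit(value) }
    }
}

suspend fun main() {
    val result = flowOf("A", "B").repeatEach(2).toList()
    println(result) // [A, A, B, B]
}
```

Just like map, filter, and onEach, it needs nothing more than flow, collect, and emit.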

Flow is synchronous

Notice that Flow is synchronous by nature, just like suspending functions: the collect call is suspended until the flow is completed. This also means that a flow doesn't start any new coroutines. Its concrete steps can do it, just like suspending functions can start coroutines, but this is not the default behavior for suspending functions. Most flow processing steps are executed synchronously, which is why a delay inside onEach introduces a delay between each element, not before all elements.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    suspend fun main() {
        flowOf("A", "B", "C")
            .onEach { delay(1000) }
            .collect { println(it) }
    }
    // (1 sec)
    // A
    // (1 sec)
    // B
    // (1 sec)
    // C

This can be changed with functions like buffer or conflate, which start a new coroutine for everything above them, or with flow builders like channelFlow, which start a new coroutine for their body. But the default behavior is synchronous.
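The difference can be observed by recording when elements are produced and consumed. This is a rough sketch, not a benchmark: `recordEvents` is a hypothetical helper, and it is run inside runBlocking so that the producer and the consumer share one thread and the unsynchronized `events` list stays safe.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects a three-element flow with a slow consumer
// and records when each element is produced and consumed.
// Assumes a single-threaded context (e.g. runBlocking), because `events`
// is not synchronized.
suspend fun recordEvents(buffered: Boolean): List<String> {
    val events = mutableListOf<String>()
    val source = flow<Int> {
        for (i in 1..3) {
            events.add("produced $i")
            emit(i)
        }
    }
    val upstream = if (buffered) source.buffer() else source
    upstream.collect {
        delay(100) // a slow consumer
        events.add("consumed $it")
    }
    return events
}

fun main() = runBlocking {
    println(recordEvents(buffered = false))
    println(recordEvents(buffered = true))
}
```

Without buffer, the events alternate strictly (produced 1, consumed 1, produced 2, and so on), because emit does not return until the collector has handled the element. With buffer, the producer runs in its own coroutine, so it is expected to get ahead of the slow consumer.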

Flow and shared state

When you implement more complex algorithms for flow processing, you should know when you need to synchronize access to variables. Let's analyze the most important use cases. When you implement some custom flow processing functions, you can define mutable states inside the flow without any mechanism for synchronization because a flow step is synchronous by nature.

    fun <T, K> Flow<T>.distinctBy(
        keySelector: (T) -> K
    ) = flow {
        val sentKeys = mutableSetOf<K>()
        collect { value ->
            val key = keySelector(value)
            if (key !in sentKeys) {
                sentKeys.add(key)
                emit(value)
            }
        }
    }

Here is an example of a variable that is defined inside a flow step and produces consistent results; the counter variable is always incremented to 1000.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.random.Random

    fun Flow<*>.counter() = flow<Int> {
        var counter = 0
        collect {
            counter++
            // to make it busy for a while
            List(100) { Random.nextLong() }.shuffled().sorted()
            emit(counter)
        }
    }

    suspend fun main(): Unit = coroutineScope {
        val f1 = List(1000) { "$it" }.asFlow()
        val f2 = List(1000) { "$it" }.asFlow()
            .counter()

        launch { println(f1.counter().last()) } // 1000
        launch { println(f1.counter().last()) } // 1000
        launch { println(f2.last()) } // 1000
        launch { println(f2.last()) } // 1000
    }

It is a common mistake to extract a variable from a flow step into the enclosing function. Such a variable is shared between all the coroutines that collect the same flow. It requires synchronization and is flow-specific, not flow-collection-specific. Therefore, f2.last() does not return 1000 but a larger value of up to 2000, because it is the result of counting elements from two flow executions running in parallel.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.random.Random

    fun Flow<*>.counter(): Flow<Int> {
        var counter = 0
        return this.map {
            counter++
            // to make it busy for a while
            List(100) { Random.nextLong() }.shuffled().sorted()
            counter
        }
    }

    suspend fun main(): Unit = coroutineScope {
        val f1 = List(1_000) { "$it" }.asFlow()
        val f2 = List(1_000) { "$it" }.asFlow()
            .counter()

        launch { println(f1.counter().last()) } // 1000
        launch { println(f1.counter().last()) } // 1000
        launch { println(f2.last()) } // less than 2000
        launch { println(f2.last()) } // less than 2000
    }

Finally, just as suspending functions using the same variables need synchronization, a variable used in a flow needs synchronization if it's defined outside a function, in the scope of a class, or at the top level.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    import kotlin.random.Random

    var counter = 0

    fun Flow<*>.counter(): Flow<Int> = this.map {
        counter++
        // to make it busy for a while
        List(100) { Random.nextLong() }.shuffled().sorted()
        counter
    }

    suspend fun main(): Unit = coroutineScope {
        val f1 = List(1_000) { "$it" }.asFlow()
        val f2 = List(1_000) { "$it" }.asFlow()
            .counter()

        launch { println(f1.counter().last()) } // less than 4000
        launch { println(f1.counter().last()) } // less than 4000
        launch { println(f2.last()) } // less than 4000
        launch { println(f2.last()) } // less than 4000
    }
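One possible way to synchronize such a shared variable is to make the increment atomic. The sketch below assumes the JVM and uses java.util.concurrent.atomic.AtomicInteger; the names `sharedCounter`, `countedAtomically`, and `countInParallel` are hypothetical. A Mutex from kotlinx.coroutines would be another option when more than a single increment must be protected.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Shared by every collection of every flow that uses countedAtomically().
val sharedCounter = AtomicInteger(0)

// Hypothetical variant of counter() with an atomic increment.
fun Flow<*>.countedAtomically(): Flow<Int> = map {
    sharedCounter.incrementAndGet()
}

// Runs four collections in parallel and returns the final counter value.
suspend fun countInParallel(): Int {
    sharedCounter.set(0)
    val f = List(1_000) { "$it" }.asFlow().countedAtomically()
    coroutineScope {
        repeat(4) {
            launch(Dispatchers.Default) { f.collect() }
        }
    }
    return sharedCounter.get()
}

fun main() = runBlocking {
    println(countInParallel()) // 4000
}
```

No increment is lost here, so the total is always 4000. The counter is still shared, though: the values seen by a single collection are not 1 to 1000. If each collection needs its own count, the variable should be defined inside the flow builder instead.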

Conclusion

Flow can be considered a bit more complicated than a suspending lambda expression with a receiver, and its processing functions just decorate it with new operations. There is no magic here: how Flow and most of its methods are defined is simple and straightforward.