Understanding Flow
This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.
Kotlin Coroutines Flow is a much simpler concept than most developers think. It is just a definition of which operations to execute. It's similar to a suspending lambda expression, but with some extra elements. In this chapter, I will show you how to implement the `Flow` interface and `flow` builder by transforming a lambda expression step by step. This should give you a deep understanding of how Flow works. This chapter is for curious minds who like to truly understand the tools they use. If this isn't you, feel free to skip this chapter. If you decide to continue reading it, I hope you enjoy it.
Understanding Flow
We'll start our story with a simple lambda expression. Each lambda expression can be defined once and then called multiple times.
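As a minimal sketch of this starting point (the helper name `callTwice` and the `StringBuilder` used to capture output are illustrative assumptions, not part of the original text), a lambda can be defined once and invoked twice:

```kotlin
// Defines a lambda once, calls it twice, and returns everything it wrote.
// `callTwice` is a hypothetical helper used only for this illustration.
fun callTwice(): String {
    val out = StringBuilder()
    val f: () -> Unit = {
        out.append("A")
        out.append("B")
        out.append("C")
    }
    f() // first call appends ABC
    f() // second call appends ABC again
    return out.toString()
}

fun main() {
    println(callTwice()) // ABCABC
}
```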
To make it a bit spicier, let's make our lambda expression `suspend` and add some delay inside it. Notice that each call of such a lambda expression is sequential, so you shouldn't make another call until the previous one is finished.
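A possible sketch of this step, assuming the `kotlinx.coroutines` library is on the classpath (the helper name `callSuspendingTwice` and the 100 ms delays are illustrative choices):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Calls a suspending lambda twice, one call after the other.
// `callSuspendingTwice` is a hypothetical helper for this illustration.
suspend fun callSuspendingTwice(): String {
    val out = StringBuilder()
    val f: suspend () -> Unit = {
        out.append("A")
        delay(100)
        out.append("B")
        delay(100)
        out.append("C")
    }
    f() // suspends until the whole lambda body has finished
    f() // starts only after the first call is complete
    return out.toString()
}

fun main(): Unit = runBlocking {
    println(callSuspendingTwice()) // ABCABC
}
```

Because the second call starts only after the first one returns, the letters are never interleaved.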
A lambda expression might have a parameter that can represent a function. We will call this parameter `emit`. So, when you call the lambda expression `f`, you need to specify another lambda expression that will be used as `emit`.
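This step might look as follows. The sketch assumes `String` values and collects them into a list so the result can be inspected; `collectWithEmitParameter` is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.runBlocking

// Calls f once and returns what the lambda passed as `emit` received.
suspend fun collectWithEmitParameter(): List<String> {
    val received = mutableListOf<String>()
    val f: suspend ((String) -> Unit) -> Unit = { emit ->
        emit("A")
        emit("B")
        emit("C")
    }
    f { received += it } // this lambda plays the role of emit
    return received
}

fun main(): Unit = runBlocking {
    println(collectWithEmitParameter()) // [A, B, C]
}
```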
The fact is that `emit` should also be a suspending function. Our function type is already getting quite complex, so we'll simplify it by defining a `FlowCollector` functional interface with an abstract method named `emit`. We will use this interface instead of the function type. The trick is that functional interfaces can be defined with lambda expressions, so we don't need to change the `f` call.
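A sketch of this step, with a locally defined `FlowCollector` (a simplified stand-in written for this illustration, not the library type) and a hypothetical helper `collectWithInterface`:

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-in defined here for illustration only.
fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun collectWithInterface(): List<String> {
    val received = mutableListOf<String>()
    val f: suspend (FlowCollector) -> Unit = {
        it.emit("A") // `it` is the FlowCollector passed by the caller
        it.emit("B")
        it.emit("C")
    }
    // The lambda is converted to a FlowCollector, so the call site is unchanged.
    f { received += it }
    return received
}

fun main(): Unit = runBlocking {
    println(collectWithInterface()) // [A, B, C]
}
```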
Calling `emit` on `it` is not convenient; instead, we'll make `FlowCollector` a receiver. Thanks to that, inside our lambda expression there is a receiver (the `this` keyword) of type `FlowCollector`. This means we can call `this.emit` or just `emit`. The `f` invocation still stays the same.
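With a receiver, the same sketch could become the following (again using a locally defined `FlowCollector` and a hypothetical helper, `collectWithReceiver`):

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-in defined here for illustration only.
fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun collectWithReceiver(): List<String> {
    val received = mutableListOf<String>()
    val f: suspend FlowCollector.() -> Unit = {
        emit("A")      // implicit receiver
        this.emit("B") // explicit receiver, same effect
        emit("C")
    }
    f { received += it } // the invocation looks the same as before
    return received
}

fun main(): Unit = runBlocking {
    println(collectWithReceiver()) // [A, B, C]
}
```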
Instead of passing around lambda expressions, we prefer to have an object implementing an interface. We will call this interface `Flow` and wrap our definition with an object expression.
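One way to sketch this wrapping step, with both interfaces defined locally as simplified stand-ins (the names `builder` and `collectFromObject` are illustrative assumptions):

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-ins defined here for illustration only.
fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

suspend fun collectFromObject(): List<String> {
    val received = mutableListOf<String>()
    val builder: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    // The object expression wraps the lambda behind the Flow interface.
    val flow: Flow = object : Flow {
        override suspend fun collect(collector: FlowCollector) {
            collector.builder()
        }
    }
    flow.collect { received += it }
    return received
}

fun main(): Unit = runBlocking {
    println(collectFromObject()) // [A, B, C]
}
```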
Finally, let's define the `flow` builder function to simplify our flow creation.
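A sketch of such a builder, still limited to `String` and using locally defined stand-in interfaces (`collectFromBuilder` is a hypothetical helper):

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified stand-ins defined here for illustration only.
fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

// The builder hides the object expression from the caller.
fun flow(builder: suspend FlowCollector.() -> Unit): Flow = object : Flow {
    override suspend fun collect(collector: FlowCollector) {
        collector.builder()
    }
}

suspend fun collectFromBuilder(): List<String> {
    val received = mutableListOf<String>()
    val f: Flow = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { received += it }
    return received
}

fun main(): Unit = runBlocking {
    println(collectFromBuilder()) // [A, B, C]
}
```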
The last change we need is to replace `String` with a generic type parameter in order to allow emitting and collecting any type of value.
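The generic version could be sketched like this. These are simplified local definitions written for illustration; the library declarations differ in details such as variance. `collectGeneric` is a hypothetical helper.

```kotlin
import kotlinx.coroutines.runBlocking

// Simplified generic stand-ins defined here for illustration only.
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(builder: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.builder()
        }
    }

suspend fun collectGeneric(): List<Int> {
    val received = mutableListOf<Int>()
    val f: Flow<Int> = flow {
        emit(1)
        emit(2)
        emit(3)
    }
    f.collect { received += it } // runs the builder lambda; each emit calls this lambda
    f.collect { received += it } // a second collect runs the builder again
    return received
}

fun main(): Unit = runBlocking {
    println(collectGeneric()) // [1, 2, 3, 1, 2, 3]
}
```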
That's it! This is nearly exactly how `Flow`, `FlowCollector`, and `flow` are implemented. When you call `collect`, you invoke the lambda expression from the `flow` builder call. When this expression calls `emit`, it calls the lambda expression specified when `collect` was called. This is how it works.
The presented builder is the most basic way to create a flow. Later we'll learn about other builders, but they generally just use `flow` under the hood.
How Flow processing works
`Flow` can be considered a bit more complicated than suspending lambda expressions with a receiver. However, its power lies in all the functions defined for its creation, processing, and observation. Most of them are actually very simple under the hood. We will learn about them in the next chapters, but I want you to have the intuition that most of them are very simple and can be easily constructed using `flow`, `collect`, and `emit`.
Consider the `map` function that transforms each element of a flow. It creates a new flow, so it uses the `flow` builder. When its flow is started, it needs to start the flow it wraps; so, inside the builder, it calls the `collect` method. Whenever an element is received, `map` transforms this element and then emits it to the new flow.
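The description above can be sketched on top of the real `kotlinx.coroutines.flow` API. The function is named `mapSketch` here (a hypothetical name) to make clear that it is an illustration and not the library's own `map`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// A simplified map: a new flow that collects the wrapped flow
// and emits each transformed element.
fun <T, R> Flow<T>.mapSketch(transformation: suspend (T) -> R): Flow<R> = flow {
    collect { // collects the wrapped (upstream) flow
        emit(transformation(it)) // emits to the new flow
    }
}

fun main(): Unit = runBlocking {
    val result = flowOf("A", "B", "C")
        .mapSketch { it.lowercase() }
        .toList()
    println(result) // [a, b, c]
}
```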
The behavior of most of the methods that we'll learn about in the next chapters is just as simple. It is important to understand this because it not only helps us better understand how our code works but also teaches us how to write similar functions.
Flow is synchronous
Notice that Flow is synchronous by nature, just like suspending functions: the `collect` call is suspended until the flow is completed. This also means that a flow doesn't start any new coroutines. Its concrete steps can do it, just like suspending functions can start coroutines, but this is not the default behavior for suspending functions. Most flow processing steps are executed synchronously, which is why a `delay` inside `onEach` introduces a delay between each element, not before all elements.
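To make the ordering visible, the following sketch records events in a list instead of measuring time. The helper name `eventOrder`, the event strings, and the 50 ms delay are illustrative assumptions.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order in which onEach and collect see each element.
suspend fun eventOrder(): List<String> {
    val events = mutableListOf<String>()
    flowOf("A", "B", "C")
        .onEach {
            events += "delay before $it"
            delay(50)
        }
        .collect { events += "collected $it" }
    return events
}

fun main(): Unit = runBlocking {
    eventOrder().forEach(::println)
    // delay before A
    // collected A
    // delay before B
    // collected B
    // delay before C
    // collected C
}
```

Each element passes through the whole chain before the next one is requested, so the delays are interleaved with the collected values.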
That can be changed with functions like `buffer` or `conflate`, which start a new coroutine for everything above them, or with flow builders like `channelFlow`, which start a new coroutine for their body. But the default behavior is synchronous.
Flow and shared state
When you implement more complex algorithms for flow processing, you should know when you need to synchronize access to variables. Let's analyze the most important use cases. When you implement some custom flow processing functions, you can define mutable states inside the flow without any mechanism for synchronization because a flow step is synchronous by nature.
For example, a counter variable that is defined and used inside a flow step produces consistent results: with 1000 elements, it is always incremented to exactly 1000.
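A sketch of such a step, assuming a source flow of 1000 elements collected by two coroutines in parallel (the names `counter` and `countTwiceInParallel` are illustrative):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// The variable lives inside the flow builder, so every collection
// gets its own copy and no synchronization is needed.
fun <T> Flow<T>.counter(): Flow<Int> = flow {
    var counter = 0 // created anew for each collect call
    collect {
        counter++
        emit(counter)
    }
}

suspend fun countTwiceInParallel(): List<Int> = coroutineScope {
    val source = List(1000) { "$it" }.asFlow()
    val first = async(Dispatchers.Default) { source.counter().last() }
    val second = async(Dispatchers.Default) { source.counter().last() }
    listOf(first.await(), second.await())
}

fun main(): Unit = runBlocking {
    println(countTwiceInParallel()) // [1000, 1000]
}
```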
It is a common mistake to move a variable out of a flow step and into the enclosing function. Such a variable is shared between all the coroutines that are collecting from the same flow. It requires synchronization and is flow-specific, not flow-collection-specific. Therefore, `f2.last()` returns around 2000, not 1000, because it is the result of counting elements from two flow executions in parallel.
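The sharing can be shown deterministically with a sequential variant of this scenario. In the sketch below, `sharedCounter` and `collectTwiceSequentially` are hypothetical names, and the flow `f2` is collected twice, one collection after the other. When the two collections run in parallel, as described above, the unsynchronized increments additionally race, so the final value is only close to 2000.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// The variable lives in the function, outside the flow step,
// so it is created once per flow and shared by all its collections.
fun <T> Flow<T>.sharedCounter(): Flow<Int> {
    var counter = 0
    return map {
        counter++
        counter
    }
}

suspend fun collectTwiceSequentially(): List<Int> {
    val f2 = List(1000) { "$it" }.asFlow().sharedCounter()
    // The second collection continues counting where the first one stopped.
    return listOf(f2.last(), f2.last())
}

fun main(): Unit = runBlocking {
    println(collectTwiceSequentially()) // [1000, 2000]
}
```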
Finally, just as suspending functions using the same variables need synchronization, a variable used in a flow needs synchronization if it's defined outside a function, in the scope of a class, or at the top level.
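One possible way to synchronize such a variable on the JVM is an `AtomicInteger`. This is an assumption made for the sketch; a `Mutex` or another mechanism would also work. The names `globalCounter`, `countGlobally`, and `countInParallel` are hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Top-level state shared by every flow and every collection.
val globalCounter = AtomicInteger(0)

fun <T> Flow<T>.countGlobally(): Flow<Int> = map {
    globalCounter.incrementAndGet() // atomic, so parallel collectors are safe
}

suspend fun countInParallel(): Int = coroutineScope {
    globalCounter.set(0) // reset so the function can be called repeatedly
    val source = List(1000) { "$it" }.asFlow()
    val jobs = List(2) {
        launch(Dispatchers.Default) { source.countGlobally().collect() }
    }
    jobs.joinAll()
    globalCounter.get()
}

fun main(): Unit = runBlocking {
    println(countInParallel()) // 2000
}
```

With a plain `var` and `counter++` instead of the atomic increment, some updates could be lost and the total could end up below 2000.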
Conclusion
`Flow` can be considered a bit more complicated than a suspending lambda expression with a receiver, and its processing functions just decorate it with new operations. There is no magic here: how `Flow` and most of its methods are defined is simple and straightforward.