A Pattern for Composing Flow Operations
Kotlin Flows are powerful tools to handle streams of data in a suspending manner.
Similar to RxJava Observables and Reactor's Flux, Flows represent a sequence of values that can be emitted over time. However, Flows have several key differences: They are the Coroutines-enabled way to process streams of data, they have support for backpressure, and a much more streamlined API. With Flows, as any part of the Kotlin Coroutines library, developers can write asynchronous, non-blocking code in a more natural and intuitive way.
And, while most of the very useful operators are widely documented (such as the game-changing transform operator), the composition and extension pattern for Flows is something that I don't think I see mentioned often.
In this article, We shall explore this pattern, and hopefully add another tool to our toolbox when composing flow code.
The Problem
Consider the following: You have a function that produces a Flow of your favorite DTO, such as a JPA Repository like the following:
Later, you can query the data from this repository, and extract a Flow<CustomerEntity>
from it, and want to do some operations on it:
the flow composition works as a sequence: for every CustomerEntity
streamed in, it will call an external service to verify its eligibility to the greeting function, then generate a message, and then collect this calling another service to actually send that greeting. And while we are working in a suspenseful fashion, could we potentially make these calls happen in parallel?
The initial instinct to add async {}
or launch {}
blocks are immediately met with a problem: the flow composition functions do not have a coroutineScope
! This is intentional: Flows are meant to work sequentially. This is also highlighted in the Flow documentation in the Kotlin Docs:
For async operations, we have the ChannelFlow
type that allow us to do that. However, we hit another snag:
The ChannelFlow
type is meant to be an internal type. How could we compose our flow in such a way to leverage the power of ChannelFlow
, but having to declare our types using the generic FLow<T>
type?
The Solution
We can leverage the power of cold flows, and compose them by collecting a flow in the definition of another flow. This is a deceptively simple pattern, but it allows us to compose flows in a very powerful way.
In our case, to leverage a ChannelFlow
in our function above, we could write code as such:
If the composed flow is not collected, the original flow won't be either: the cold, inert nature of flows ensures that collection happens only when requested!
Bear in mind this pattern is not a silver bullet: Not all scenarios call for aggressive asynchronous implementations, and adding a
ChannelFlow
as above can come to have performance and business costs, because we are choosing to break away from the sequential nature of the flow. As one can see below, this composition pattern can be used in many other ways.
The Pattern
In other words, the pattern for composing flows, in more generic terms, is:
Where flowBuilder
matches the construction function for your flow (such as flow {}
or channelFlow {}
), and yield
is the function that will emit
the value to the flow (such as send
for ChannelFlow
).
More usages
Something that had eluded me for a long while I was doing aggregation functions within the Flow
API. Let's suppose we have a stream of values, and we want to emit the average of the last N
results received. We can utilize this composition pattern as such:
This is a very simple example, but it shows how we can compose flows to do more complex operations. In this case, we are using a ArrayDeque
to store the last N
values, and then emitting the average of those values, and removing the first value in the buffer to store the next emission. One could use this for different grouping and/or data massaging functions as needed.
However, depending on your usecases, there may already be one or more appropriate operators available. Make sure to always double-check the Flow documentation to see if you can leverage tried and true operators before we compose our own!
Conclusion
Flows are powerful tools, and I think this pattern is not as well known as it should be. I hope this article helps you in your journey to compose flows in a more powerful way for your usecases!
This article was originally published on Renato Costa's blog.