flatMapMerge is... weird
Operating on Kotlin Flow is great. It is simple, supports many useful functions whose names are consistent with collection processing, and provides abstractions for nearly everything we need. One ugly part, though, is flatMapMerge, which we often need to use and which offers a couple of unpleasant surprises.
Introduction: flatMap
Let's start by saying that there is no flatMap function on Flow (not anymore). That is because Flow is cold by nature and operates on a single coroutine by default, so the most consistent implementation of flatMap would be synchronous. The general expectation, though, is that flatMap on a stream of data should be asynchronous. So we have flatMapConcat, which is synchronous, and flatMapMerge, which is asynchronous.
This sounds like a good idea, but the flatMapMerge implementation is surprising in at least a couple of ways. Here are its biggest problems:
- Transformation is synchronous.
- Has an unintuitive default limit.
- Has inconsistent behavior for concurrency == 1.
Let's discuss them one by one, but first, let's see its implementation.
flatMapMerge implementation
The flatMapMerge implementation is a simple function: it just uses map and then flattenMerge. For concurrency > 1, flattenMerge uses ChannelFlowMerge.
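The structure described above can be sketched roughly as follows. This is a simplified paraphrase and not the library source: myFlatMapMerge is a hypothetical name, the default of 16 is hard-coded as an assumption, and the sketch delegates to the public flattenMerge because ChannelFlowMerge is internal to kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical re-creation of the shape of flatMapMerge, for illustration only.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<T>.myFlatMapMerge(
    concurrency: Int = 16, // assumed default, hard-coded for this sketch
    transform: suspend (T) -> Flow<R>
): Flow<R> =
    map(transform)                 // step 1: elements are turned into flows one at a time
        .flattenMerge(concurrency) // step 2: the created flows are collected concurrently
```

The important detail is the order of the two steps: concurrency only starts in the second one, after map has already run the transformation for a given element.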
This implementation shows all the problems, but let's see them now in their natural habitat: in real-life use cases.
flatMapMerge starts asynchronous flows, but its transformation is synchronous
Let's say that you need to fetch a student for each id in a flow. This is a common incorrect implementation:
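A sketch of such an implementation might look like this. The Student class and the suspending fetchStudent function are hypothetical stand-ins for a real model and network call; only the studentsFlow name comes from the discussion here.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical model, used only for this illustration.
data class Student(val id: Int, val name: String)

// Hypothetical stand-in for a network call.
suspend fun fetchStudent(id: Int): Student {
    delay(100) // simulated latency
    return Student(id, "Student $id")
}

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> =
    studentIds.flatMapMerge { id ->
        // fetchStudent is called inside the transformation,
        // so it completes before the inner flow even exists
        flowOf(fetchStudent(id))
    }
```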
The problem is that even though flatMapMerge starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. This is visible in the implementation of flatMapMerge: the first step is map, which is a synchronous operation. That is why the above studentsFlow would fetch students synchronously, one after another. Here is an executable example:
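One way to observe this is to count how many fetches are in flight at the same time. The sketch below assumes a hypothetical FakeStudentApi that simulates latency with delay and records the highest number of simultaneous calls; none of these names come from a real library.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical fake API that records how many calls overlap in time.
class FakeStudentApi(private val latencyMillis: Long = 100) {
    private val inFlight = AtomicInteger(0)
    val maxInFlight = AtomicInteger(0)

    suspend fun fetchStudent(id: Int): String {
        val now = inFlight.incrementAndGet()
        maxInFlight.updateAndGet { max -> maxOf(max, now) }
        delay(latencyMillis) // simulated network latency
        inFlight.decrementAndGet()
        return "Student $id"
    }
}

// The implementation under discussion: the fetch runs inside the transformation.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun studentsFlow(api: FakeStudentApi, studentIds: Flow<Int>): Flow<String> =
    studentIds.flatMapMerge { id -> flowOf(api.fetchStudent(id)) }

// Collects the flow and reports the highest number of simultaneous fetches.
suspend fun maxSimultaneousFetches(): Int {
    val api = FakeStudentApi()
    studentsFlow(api, (1..5).asFlow()).collect()
    return api.maxInFlight.get()
}
```

Called from runBlocking, maxSimultaneousFetches() returns 1: each fetch finishes before the next one starts, so five ids with 100 ms of simulated latency take about half a second in total.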
There are at least a couple of popular ways to deal with this issue. We can use the flow builder, a suspending lambda with asFlow, or a custom flowOf function that accepts a suspending lambda. Nevertheless, each of those solutions adds some extra complexity.
Here is an executable solution:
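A minimal sketch of the first two options, reusing the same kind of hypothetical FakeStudentApi that counts simultaneous calls. The function names are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical fake API that records how many calls overlap in time.
class FakeStudentApi(private val latencyMillis: Long = 100) {
    private val inFlight = AtomicInteger(0)
    val maxInFlight = AtomicInteger(0)

    suspend fun fetchStudent(id: Int): String {
        val now = inFlight.incrementAndGet()
        maxInFlight.updateAndGet { max -> maxOf(max, now) }
        delay(latencyMillis) // simulated network latency
        inFlight.decrementAndGet()
        return "Student $id"
    }
}

// Option 1: the flow builder. The fetch runs when the inner flow is collected,
// which happens in the coroutine started by flatMapMerge.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun studentsFlow(api: FakeStudentApi, studentIds: Flow<Int>): Flow<String> =
    studentIds.flatMapMerge { id -> flow { emit(api.fetchStudent(id)) } }

// Option 2: a suspending lambda converted to a flow with asFlow.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun studentsFlowWithAsFlow(api: FakeStudentApi, studentIds: Flow<Int>): Flow<String> =
    studentIds.flatMapMerge { id -> suspend { api.fetchStudent(id) }.asFlow() }

// Collects option 1 and reports the highest number of simultaneous fetches.
suspend fun maxSimultaneousFetches(): Int {
    val api = FakeStudentApi()
    studentsFlow(api, (1..5).asFlow()).collect()
    return api.maxInFlight.get()
}
```

With five ids, maxSimultaneousFetches() now returns 5, because the transformation only builds a cold flow and the actual fetch is deferred to the concurrently collected inner flows.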
These have become a common pattern that Kotlin Coroutines newcomers learn when they start using this library. People have gotten used to it, but that does not justify the unnecessary complexity it introduces.
flatMapMerge has unintuitive default limit
When you use flatMapMerge, you can set a concurrency limit, which is a great feature. The problem is that there is a default limit, which is 16. Why 16? Why not 64 or 128? flatMapMerge is a universal function, and I believe that the only reasonable default limit would be Int.MAX_VALUE. 16 is a low number, much lower than the limit of most network clients, and it might easily surprise developers who just wanted to start a couple of coroutines concurrently.
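The effect of the default limit can be made visible by counting how many inner flows are active at once. This is a sketch with a hypothetical helper, maxConcurrentInnerFlows, and it assumes the library default has not been overridden in the running JVM.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: merges `total` inner flows and returns the highest number
// of them that were active at the same time. A null concurrency means
// "use the default of flatMapMerge".
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
suspend fun maxConcurrentInnerFlows(total: Int, concurrency: Int? = null): Int {
    val active = AtomicInteger(0)
    val maxActive = AtomicInteger(0)
    val transform: suspend (Int) -> Flow<Int> = { value ->
        flow {
            val now = active.incrementAndGet()
            maxActive.updateAndGet { max -> maxOf(max, now) }
            delay(50) // simulated work
            active.decrementAndGet()
            emit(value)
        }
    }
    val source = (1..total).asFlow()
    val merged =
        if (concurrency == null) source.flatMapMerge(transform = transform)
        else source.flatMapMerge(concurrency, transform)
    merged.collect()
    return maxActive.get()
}
```

With 50 elements, the call without an explicit limit never has more than 16 inner flows active, while passing concurrency = 50 lets all of them run together.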
flatMapMerge has inconsistent behavior for concurrency 1
Time for the last problem with flatMapMerge, one that I hadn't realized until recently (many thanks to the great group from Commerce Media Tech who helped me realize this). flatMapMerge guarantees that for concurrency == 1 it behaves just like flatMapConcat. But this is inconsistent with how flatMapMerge should behave for concurrency == 1.
You see, flatMapMerge uses one coroutine to transform each element to a flow, and then it starts a new coroutine for each created flow. That means that for concurrency == 1 it should use two coroutines: one for the transformation, and one for the flow. But it doesn't. It uses one coroutine for both, and the only reason for that is that flatMapConcat behaves like that. Let me show you an example. The below code takes 12 seconds, because it uses only one coroutine. The same coroutine waits for "A", then waits for "A_0", "A_1", "A_2", and then waits for "B", "B_0", "B_1", "B_2", and so on.
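A sketch of such an example, under the assumption that each letter takes one second to produce upstream and that each created flow emits three values one second apart. The names lettersFlow and collectTimed and the stepMillis parameter are hypothetical; stepMillis only exists so that the timings can be scaled down.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Produces "A", "B", "C" one step apart, and turns each letter into a flow
// that emits three values, also one step apart.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun lettersFlow(concurrency: Int, stepMillis: Long = 1000): Flow<String> =
    flow {
        for (letter in listOf("A", "B", "C")) {
            delay(stepMillis)
            emit(letter)
        }
    }.flatMapMerge(concurrency) { letter ->
        flow {
            repeat(3) { index ->
                delay(stepMillis)
                emit("${letter}_$index")
            }
        }
    }

// Collects the flow and returns the emitted values with the elapsed time in ms.
suspend fun collectTimed(flow: Flow<String>): Pair<List<String>, Long> {
    val values = mutableListOf<String>()
    val elapsed = measureTimeMillis { flow.collect { values += it } }
    return values to elapsed
}
```

Collecting lettersFlow(concurrency = 1) through collectTimed performs twelve one-second waits in a row, three for the letters and nine for the values, which is where the 12 seconds come from. The same function can be called with other concurrency values to compare.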
If we use flatMapMerge with concurrency == 3, the code takes 6 seconds, because it uses 4 coroutines: one to produce "A", "B", "C", and one for each of the created flows. The last flow is created after 3 seconds, so that is when its coroutine is started, and it needs 3 seconds to finish, so the whole process takes 6 seconds.
If there were no special behavior for concurrency == 1, the whole process would take 10 seconds, because it would use two coroutines: one for all transformations, and one for the flow. This is how the output would look:
(2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (1 sec) B_0 (1 sec) B_1 (1 sec) B_2 (1 sec) C_0 (1 sec) C_1 (1 sec) C_2
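However, since flatMapMerge was designed to behave like flatMapConcat for concurrency == 1, it uses only one coroutine, and the whole process takes 12 seconds. This is how the output looks:
(2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (2 sec) B_0 (1 sec) B_1 (1 sec) B_2 (2 sec) C_0 (1 sec) C_1 (1 sec) C_2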
If you want to introduce a new coroutine that separates the upstream part of a flow from the downstream part, you can use the buffer function (beware: buffer also has an unintuitive default limit, which is 64). With it, we can simulate how flatMapMerge should behave for concurrency == 1.
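A sketch of that simulation, assuming the same setup as in the 12-second example: letters are produced one step apart upstream, and each created flow emits three values one step apart. The names bufferedLettersFlow, collectTimed, and stepMillis are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun bufferedLettersFlow(stepMillis: Long = 1000): Flow<String> =
    flow {
        for (letter in listOf("A", "B", "C")) {
            delay(stepMillis)
            emit(letter)
        }
    }
        .buffer() // the upstream now runs in its own coroutine and keeps producing letters
        .flatMapMerge(concurrency = 1) { letter ->
            flow {
                repeat(3) { index ->
                    delay(stepMillis)
                    emit("${letter}_$index")
                }
            }
        }

// Collects the flow and returns the emitted values with the elapsed time in ms.
suspend fun collectTimed(flow: Flow<String>): Pair<List<String>, Long> {
    val values = mutableListOf<String>()
    val elapsed = measureTimeMillis { flow.collect { values += it } }
    return values to elapsed
}
```

Because "B" and "C" are produced while the flow for "A" is still being collected, only the first letter adds to the total time: one wait for "A" plus nine waits for the values, so about 10 steps instead of 12.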
concurrentMap
If you are not happy with the default behavior of flatMapMerge, you can create your own concurrentMap function. Such a function would be optimized for mapping elements with a suspending transformation and could have no default limit. It would be a safer and more predictable alternative to flatMapMerge.
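One possible shape of such a function is sketched below. It is an assumption about how concurrentMap could be written, built on top of flatMapMerge and the flow builder, and not a reference implementation. Note that, like flatMapMerge, it does not preserve the order of elements.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical concurrentMap: the suspending transformation runs inside the
// inner flow, so it is executed concurrently, and there is no limit by default.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int = Int.MAX_VALUE,
    transform: suspend (T) -> R
): Flow<R> =
    flatMapMerge(concurrency) { value ->
        flow { emit(transform(value)) }
    }
```

With this in place, a call such as studentIds.concurrentMap { fetchStudent(it) } (with a hypothetical suspending fetchStudent) fetches all students concurrently without any wrapping at the call site.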
Summary
Be careful with flatMapMerge. It is a great function, but it has a couple of unpleasant surprises. Even though it starts coroutines for each flow created by its transformation, the transformation itself is synchronous. It has an unintuitive default limit of 16, and it has inconsistent behavior for concurrency == 1.
If you want to learn Kotlin Coroutines well with me, you can join my Kotlin Coroutines Workshop. There is an open edition of this workshop soon that you can join, or you can organize it in your company. You can also check out my book, Kotlin Coroutines: Deep Dive.