flatMapMerge is... weird
Operating on Kotlin Flow is great. It is simple, supports many useful functions with names consistent with collection processing, and provides abstractions for nearly everything we need. One ugly exception is flatMapMerge: we often need to use it, and it offers a couple of unpleasant surprises.
Introduction: flatMap
Let's start by saying that there is no flatMap function on Flow (not anymore). Flow is cold by nature and operates on a single coroutine by default, so the most consistent implementation of flatMap would be synchronous. The general expectation, however, is that flatMap on a stream of data should be asynchronous. So we have flatMapConcat, which is synchronous, and flatMapMerge, which is asynchronous.
This sounds like a good idea, but the implementation of flatMapMerge turns out to be surprising in a couple of ways. Here are its biggest problems:
- Its transformation is synchronous.
- It has an unintuitive default limit.
- It has inconsistent behavior for concurrency == 1.
Let's discuss them one by one, but first, let's see its implementation.
flatMapMerge implementation
The implementation of flatMapMerge is simple: it just calls map and then flattenMerge. For concurrency > 1, flattenMerge uses ChannelFlowMerge.
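To make that shape concrete, here is a minimal sketch that rebuilds the same structure on top of the public map and flattenMerge functions. It is a paraphrase, not a copy of the library source: myFlatMapMerge is a hypothetical name, and the internal ChannelFlowMerge class is only mentioned in a comment.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical re-creation of flatMapMerge: map first, then flatten concurrently.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T, R> Flow<T>.myFlatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY, // the library-wide default, 16 unless overridden
    transform: suspend (T) -> Flow<R>,
): Flow<R> =
    map(transform)                 // runs the transformation element by element
        .flattenMerge(concurrency) // collects the resulting flows concurrently
                                   // (internally via ChannelFlowMerge when concurrency > 1)

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .myFlatMapMerge { flowOf("$it-a", "$it-b") }
        .collect { println(it) }
}
```

The important detail is the order of the two steps: the transformation belongs to map, so only the flows it returns are subject to concurrent collection.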
This implementation shows all the problems, but let's see them now in their natural habitat: in real-life use cases.
flatMapMerge starts asynchronous flows, but its transformation is synchronous
Let's say that you need to fetch a student for each id in a flow. This is a common incorrect implementation:
The problem is that even though flatMapMerge starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. This is visible in the implementation of flatMapMerge: the first step is map, which is a synchronous operation. That is why the above studentsFlow would fetch students one after another. Here is an executable example:
There are at least a couple of popular ways to deal with this issue. We can use the flow builder, a suspend lambda with asFlow, or a custom flowOf function that accepts a suspending lambda. Nevertheless, each of those solutions adds some extra complexity.
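The original listing is not available here, so the following is a reconstruction under assumptions: Student and fetchStudent are hypothetical stand-ins for a real model and a real suspending call, and studentsFlow is written as a function so that the snippet is self-contained.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*

data class Student(val id: Int) // hypothetical model

// Hypothetical stand-in for a suspending network or database call.
suspend fun fetchStudent(id: Int): Student {
    delay(1000)
    return Student(id)
}

// Looks concurrent, but fetchStudent is called inside the transformation,
// so each call finishes before the flow for the next id is even created.
@OptIn(ExperimentalCoroutinesApi::class)
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> =
    studentIds.flatMapMerge { id -> flowOf(fetchStudent(id)) }
```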
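A minimal runnable sketch of this behavior, assuming a hypothetical fetchStudent that simply waits one second. It uses wall-clock time, so the measured value is approximate.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

data class Student(val id: Int) // hypothetical model

// Hypothetical stand-in for a slow suspending call.
suspend fun fetchStudent(id: Int): Student {
    delay(1000)
    return Student(id)
}

// Returns how long it takes to fetch all students with the incorrect pattern.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeIncorrectFetch(ids: List<Int>): Long = measureTimeMillis {
    ids.asFlow()
        .flatMapMerge { id -> flowOf(fetchStudent(id)) } // fetch happens in the transformation
        .collect { println(it) }
}

fun main() = runBlocking {
    val millis = timeIncorrectFetch(listOf(1, 2, 3))
    println("Took $millis ms") // about 3000 ms: the three fetches run one after another
}
```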
Here is an executable solution:
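The sketch below uses the flow builder variant, again with a hypothetical one-second fetchStudent. The asFlow variant is shown only as a comment.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

data class Student(val id: Int) // hypothetical model

// Hypothetical stand-in for a slow suspending call.
suspend fun fetchStudent(id: Int): Student {
    delay(1000)
    return Student(id)
}

// Returns how long it takes to fetch all students when the suspending call
// is moved inside the flow that flatMapMerge collects concurrently.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeConcurrentFetch(ids: List<Int>): Long = measureTimeMillis {
    ids.asFlow()
        .flatMapMerge { id -> flow { emit(fetchStudent(id)) } }
        // Alternative with the same effect:
        // .flatMapMerge { id -> suspend { fetchStudent(id) }.asFlow() }
        .collect { println(it) }
}

fun main() = runBlocking {
    val millis = timeConcurrentFetch(listOf(1, 2, 3))
    println("Took $millis ms") // about 1000 ms: the three fetches overlap
}
```

The transformation now returns immediately with a cold flow, and the actual fetch runs only when flatMapMerge collects that flow in its own coroutine.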
These have become a common pattern that newcomers to Kotlin Coroutines learn when they start using this library. People have gotten used to it, but that does not justify the unnecessary complexity it introduces.
flatMapMerge has unintuitive default limit
When you use flatMapMerge, you can set a concurrency limit, which is a great feature. The problem is that there is a default limit, which is 16. Why 16? Why not 64 or 128? flatMapMerge is a universal function, and I believe that the only reasonable default limit would be Int.MAX_VALUE. 16 is a low number, much lower than the limit of most network clients, and it might easily surprise developers who just wanted to start a couple of coroutines concurrently.
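The effect of the default limit can be observed with a small sketch. It assumes 20 hypothetical one-second tasks and that the default has not been overridden through a system property. Timings are wall-clock and approximate.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical task: waits one second, then emits its number.
fun slowTask(i: Int): Flow<Int> = flow {
    delay(1000)
    emit(i)
}

// 20 tasks with the default limit: 16 run at once, the remaining 4 wait for a free slot.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeWithDefaultLimit(): Long = measureTimeMillis {
    (1..20).asFlow().flatMapMerge { slowTask(it) }.collect()
}

// The same 20 tasks with an explicit limit.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeWithExplicitLimit(limit: Int): Long = measureTimeMillis {
    (1..20).asFlow().flatMapMerge(concurrency = limit) { slowTask(it) }.collect()
}

fun main() = runBlocking {
    println("Default limit: ${timeWithDefaultLimit()} ms")      // about 2000 ms
    println("Limit of 20:   ${timeWithExplicitLimit(20)} ms")   // about 1000 ms
}
```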
flatMapMerge has inconsistent behavior for concurrency 1
Time for the last problem with flatMapMerge, one that I hadn't realized until recently (great thanks to the great group from Commerce Media Tech who helped me realize this). flatMapMerge guarantees that for concurrency == 1 it behaves just like flatMapConcat. But this is inconsistent with how flatMapMerge would otherwise behave for concurrency == 1.
You see, flatMapMerge uses one coroutine to transform each element into a flow, and then it starts a new coroutine for each created flow. That means that for concurrency == 1 it should use two coroutines: one for the transformation, and one for the flow. But it doesn't. It uses one coroutine for both, and the only reason is that flatMapConcat behaves like that. Let me show you an example. The code below takes 12 seconds because it uses only one coroutine. The same coroutine waits for "A", then waits for "A_0", "A_1", "A_2", then waits for "B", "B_0", "B_1", "B_2", and so on.
If we use flatMapMerge with concurrency == 3, the code takes 6 seconds because it uses 4 coroutines: one to produce "A", "B", "C", and one for each of the created flows. The last flow is created after 3 seconds; its coroutine starts then and needs 3 more seconds to finish, so the whole process takes 6 seconds.
If there were no special behavior for concurrency == 1, the whole process would take 10 seconds, because it would use two coroutines: one for all transformations, and one for the flow. This is what the output would look like:
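A sketch of such an example, reconstructed from the description above. The helper names letters, numbered, and timeMerge are hypothetical, and stepMillis stands for the one-second wait so that it can be shortened when experimenting.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Emits "A", "B", "C", waiting one step before each letter.
fun letters(stepMillis: Long): Flow<String> = flow {
    for (letter in listOf("A", "B", "C")) {
        delay(stepMillis)
        emit(letter)
    }
}

// For a letter such as "A", emits "A_0", "A_1", "A_2", waiting one step before each.
fun numbered(letter: String, stepMillis: Long): Flow<String> = flow {
    repeat(3) { i ->
        delay(stepMillis)
        emit("${letter}_$i")
    }
}

// Measures the whole pipeline for a given concurrency value.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeMerge(concurrency: Int, stepMillis: Long = 1000): Long = measureTimeMillis {
    letters(stepMillis)
        .flatMapMerge(concurrency) { numbered(it, stepMillis) }
        .collect { println(it) }
}

fun main() = runBlocking {
    val millis = timeMerge(concurrency = 1)
    println("Took $millis ms") // about 12000 ms: 3 letters * (1 s + 3 * 1 s), all in one coroutine
}
```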
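The same scenario with concurrency == 3 can be sketched as follows. The function name and the stepMillis parameter are hypothetical, and the timing is approximate because it is measured in wall-clock time.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeWithThreeConcurrentFlows(stepMillis: Long = 1000): Long = measureTimeMillis {
    flow<String> {
        // One coroutine produces "A", "B", "C", one step apart.
        for (letter in listOf("A", "B", "C")) {
            delay(stepMillis)
            emit(letter)
        }
    }.flatMapMerge(concurrency = 3) { letter ->
        // Each of these flows is collected in its own coroutine.
        flow<String> {
            repeat(3) { i ->
                delay(stepMillis)
                emit("${letter}_$i")
            }
        }
    }.collect { println(it) }
}

fun main() = runBlocking {
    val millis = timeWithThreeConcurrentFlows()
    println("Took $millis ms") // about 6000 ms: "C" appears after 3 s, its flow needs 3 s more
}
```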
(2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (1 sec) B_0 (1 sec) B_1 (1 sec) B_2 (1 sec) C_0 (1 sec) C_1 (1 sec) C_2
However, since flatMapMerge was designed to behave like flatMapConcat for concurrency == 1, it uses only one coroutine, and the whole process takes 12 seconds. This is what the output looks like:
(2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (2 sec) B_0 (1 sec) B_1 (1 sec) B_2 (2 sec) C_0 (1 sec) C_1 (1 sec) C_2
If you want the upstream and downstream parts of a flow to run in separate coroutines, you can use the buffer function (beware: buffer also has an unintuitive default limit, which is 64). With it, we can simulate how flatMapMerge should behave for concurrency == 1.
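A minimal sketch of that simulation, using the same hypothetical letters and numbered helpers as a self-contained example. Placing buffer before flatMapMerge lets the producer of "A", "B", "C" run ahead in its own coroutine.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Emits "A", "B", "C", waiting one step before each letter.
fun letters(stepMillis: Long): Flow<String> = flow {
    for (letter in listOf("A", "B", "C")) {
        delay(stepMillis)
        emit(letter)
    }
}

// For a letter such as "A", emits "A_0", "A_1", "A_2", waiting one step before each.
fun numbered(letter: String, stepMillis: Long): Flow<String> = flow {
    repeat(3) { i ->
        delay(stepMillis)
        emit("${letter}_$i")
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun timeWithBuffer(stepMillis: Long = 1000): Long = measureTimeMillis {
    letters(stepMillis)
        .buffer() // the letters are now produced in a separate coroutine
        .flatMapMerge(concurrency = 1) { numbered(it, stepMillis) }
        .collect { println(it) }
}

fun main() = runBlocking {
    val millis = timeWithBuffer()
    println("Took $millis ms") // about 10000 ms: "B" and "C" are ready while "A_*" is processed
}
```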
concurrentMap
If you are not happy with the default behavior of flatMapMerge, you can create your own concurrentMap function. Such a function would be optimized for mapping elements with a suspending transformation and could have no default limit, which would make it a safer and more predictable alternative to flatMapMerge.
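One possible shape for such a function is sketched below. This is an assumption about how concurrentMap could look, not an existing library function: it wraps the suspending transformation in a flow builder and delegates to flatMapMerge with Int.MAX_VALUE as the default limit.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical helper: maps elements concurrently with a suspending transformation.
// Results are emitted as they complete, so the original order is not preserved.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> Flow<T>.concurrentMap(
    concurrency: Int = Int.MAX_VALUE, // effectively no limit unless the caller sets one
    transform: suspend (T) -> R,
): Flow<R> =
    flatMapMerge(concurrency) { value -> flow { emit(transform(value)) } }

fun main() = runBlocking {
    val millis = measureTimeMillis {
        (1..100).asFlow()
            .concurrentMap { delay(1000); it * 2 }
            .collect()
    }
    println("Took $millis ms") // roughly one second, since all 100 transformations overlap
}
```

Because the suspending call lives inside the flow builder, callers no longer need to remember the wrapping trick, and the explicit concurrency parameter keeps a limit available when a downstream service requires one.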
Summary
Be careful with flatMapMerge. It is a great function, but it has a couple of unpleasant surprises. Even though it starts coroutines for each flow created by its transformation, the transformation itself is synchronous. It has an unintuitive default limit of 16, and it has inconsistent behavior for concurrency == 1.
