article banner (priority)

flatMapMerge is... weird

Operating on Kotlin Flow is great. It is simple, supports many useful functions with name consistent with collection processing, and provides abstractions to nearly everything we need. Though one ugly thing is flatMapMerge, that we often need to use, and it offers a couple of unpleasant surprises.

Introduction: flatMap

Let's start from saying, that there is no flatMap function on Flow (not anymore). It is because Flow is cold by its nature, and it operates on a single coroutine by default, so the most consistent implementation of flatMap should be synchronous. Though, the general expectation is that flatMap on a stream of data should be asynchronous. So we have flatMapConcat that is synchronous, and flatMapMerge that is asynchronous.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { flowOf("A", "B", "C") .flatMapConcat { flowFrom(it) } .collect { print(it) } // A_0 A_1 A_2 B_0 B_1 B_2 C_0 C_1 C_2 }.inWholeSeconds.let(::print) // 9 } fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }
import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { flowOf("A", "B", "C") .flatMapMerge { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) } // (order may vary) // (1 sec) B_0 A_0 C_0 (1 sec) A_1 C_1 B_1 (1 sec) A_2 C_2 B_2 3 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

This sounds like a good idea, but apparently flatMapMerge implementation is at least surprising in a couple of ways. Here are its biggest problems:

  • Transformation is synchronous.
  • Has unintuitive default limit.
  • Has inconsistent behavior for concurrency 1.

Let's discuss them one by one, but first, let's see its implementation.

flatMapMerge implementation

flatMapMerge implementation is a simple function. It is just using map and then flattenMerge. flattenMerge for concurrency > 1 uses ChannelFlowMerge.

public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency) public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> { require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" } return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency) } // Simplified internal class ChannelFlowMerge<T>( private val flow: Flow<Flow<T>>, private val concurrency: Int, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ) : ChannelFlow<T>(context, capacity, onBufferOverflow) { override suspend fun collectTo(scope: ProducerScope<T>) { val semaphore = Semaphore(concurrency) val collector = SendingCollector(scope) flow.collect { inner -> semaphore.acquire() scope.launch { try { inner.collect(collector) } finally { semaphore.release() } } } } // ... }

This implementation shows all the problems, but let's see them now in their natural habitat: in real-life use cases.

flatMapMerge starts asynchronous flows, but its transformation is synchronous

Let's say, that you need to fetch a student for each id in a flow. This is a common incorrect implementation:

fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds .flatMapMerge { flowOf(fetchStudent(it)) } private suspend fun fetchStudent(id: Int): Student = ...

The problem is, that even though flatMapMerge starts a coroutine for each flow created by its transformation, the transformation itself is synchronous. It is visible in the implementation of flatMapMerge: the first step is map, which is a synchronous operation. That is why the above studentsFlow would fetch students synchronously. Here is an executable example:

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flowOf import kotlin.time.measureTime suspend fun produceStudent(id: Int): String { delay(1000) return "Student$id" } suspend fun main() { measureTime { flowOf(1, 2, 3) .flatMapMerge { flowOf(produceStudent(it)) } .collect { println(it) } }.inWholeSeconds.let(::print) } // (1 sec) // Student1 // (1 sec) // Student2 // (1 sec) // Student3 // 3

There are at least a couple of popular ways how to deal with this issue. We can use flow builder, or a suspend lambda and asFlow, or a custom flowOf function that accepts suspending lambda, nevertheless each of those solutions add some extra complexity.

fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds .flatMapMerge { flow { emit(fetchStudent(it)) } } private suspend fun fetchStudent(id: Int): Student = ...
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds .flatMapMerge { suspend { fetchStudent(it) }.asFlow() } private suspend fun fetchStudent(id: Int): Student = ...
fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds .flatMapMerge { flowOf { fetchStudent(it) } } private suspend fun fetchStudent(id: Int): Student = ... fun <T> flowOf(suspendable: suspend () -> T): Flow<T> = flow { emit(suspendable()) }

Here is an executable solution:

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun produceStudent(id: Int): String { delay(1000) return "Student$id" } fun <T> flowOf(suspendable: suspend () -> T): Flow<T> = flow { emit(suspendable()) } suspend fun main() { measureTime { flowOf(1, 2, 3) .flatMapMerge { flow { emit(produceStudent(it)) } } // or // .flatMapMerge { suspend { produceStudent(it) }.asFlow() } // or // .flatMapMerge { flowOf { produceStudent(it) } } .collect { println(it) } }.inWholeSeconds.let(::print) // 1 } // (1 sec) // Student1 // Student2 // Student3 // 3

These become a common pattern that Kotlin Coroutines youngsters learn when they start using this library. People got used to that, but it does not justify unnecessary complexity it introduces.

flatMapMerge has unintuitive default limit

When you use flatMapMerge, you can set concurrency limit, which is a great feature. The problem is that there is a default limit, which is 16. Why 16? Why not 64 or 128? flatMapMerge is a universal function, and I believe that the only reasonable default limit would be Int.MAX_VALUE. 16 is a low number, much lower than the limit of most network clients, and it might easily surprise developers who just wanted to start a couple of coroutines concurrently.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { ('A'..'Z').take(16).last().let(::print) // P measureTime { ('A'..'Z').asFlow() .flatMapMerge { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) } // {A-P}_0 // (1 sec) // {A-P}_1 // (1 sec) // {A-P}_2 // (1 sec) // {R-Z}_0 // (1 sec) // {R-Z}_1 // (1 sec) // {R-Z}_2 // (1 sec) // 6 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

flatMapMerge has inconsistent behavior for concurrency 1

Time for the last problem with flatMapMerge, that I haven't realized until recently (great thanks to the great group from the Commerce Media Tech who helped me realize this). flatMapMerge guarantees that for concurrency == 1 it behaves just like flatMapConcat. But this is inconsistent with how flatMapMerge should behave for concurrency == 1.

You see, flatMapMerge uses one coroutine to transform each element to a flow, and then it starts a new coroutine for each created flow. That means, that for concurrency == 1 it should use two coroutines. One for the transformation, and one for the flow. But it doesn't. It uses one coroutine for both, and the only reason for that is that flatMapConcat behaves like that. Let me show you an example. The below code takes 12 seconds, because it uses only one coroutine. The same coroutine waits for "A", then waits for "A_0", "A_1", "A_2", and then waits for "B", "B_0", "B_1", "B_2", and so on.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { ('A'..'C').asFlow() .onEach { delay(1000) } .flatMapConcat { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) // 12 } // (2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (2 sec) B_0 (1 sec) B_1 (1 sec) B_2 (2 sec) C_0 (1 sec) C_1 (1 sec) C_2 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

If we use flatMapMerge with concurrency == 3, the code takes 6 seconds, because it uses 4 coroutines: 1 to produce "A", "B", "C", and then one of the created flows. The last flow is created after 3 seconds, so then its coroutine is started, and it needs 3 seconds to finish, so the whole process takes 6 seconds.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { ('A'..'C').asFlow() .onEach { delay(1000) } .flatMapMerge(3) { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) // 6 } // (2 sec) // A_0 // (1 sec) // A_1 // B_0 // (1 sec) // A_2 // B_1 // C_0 // (1 sec) // B_2 // C_1 // (1 sec) // C_2 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

If there were no special behavior for concurrency == 1, the whole process would take 10 seconds, because it would use two coroutines. One for all transformations, and one for the flow. This is how output would look like:

(2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (1 sec) B_0 (1 sec) B_1 (1 sec) B_2 (1 sec) C_0 (1 sec) C_1 (1 sec) C_2

However, since flatMapMerge was designed to behave like flatMapConcat for concurrency == 1, it uses only one coroutine, and the whole process takes 12 seconds. This is how output looks like:

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { ('A'..'C').asFlow() .onEach { delay(1000) } .flatMapMerge(1) { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) // 12 } // (2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (2 sec) B_0 (1 sec) B_1 (1 sec) B_2 (2 sec) C_0 (1 sec) C_1 (1 sec) C_2 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

If you want to introduce a new coroutine for the below and above flow, you can use buffer function (Beware: buffer also has an unintuitive default limit, which is 64). With it, we can simulate how flatMapMerge should behave for concurrency == 1.

import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlin.time.measureTime suspend fun main() { measureTime { ('A'..'C').asFlow() .onEach { delay(1000) } .buffer() .flatMapMerge(1) { flowFrom(it) } .collect { print(it) } }.inWholeSeconds.let(::print) // 12 } // (2 sec) A_0 (1 sec) A_1 (1 sec) A_2 (1 sec) B_0 (1 sec) B_1 (1 sec) B_2 (1 sec) C_0 (1 sec) C_1 (1 sec) C_2 fun flowFrom(elem: Any) = flowOf(0, 1, 2) .onEach { delay(1000) } .map { "${elem}_${it} " }

concurrentMap

If you are not happy with the default behavior of flatMapMerge, you can create your own concurrentMap function. Such a function would be optimized for suspending elements' mapping, and could have no default limit. Such a function would be a safer and more predictable alternative to flatMapMerge.

fun <T, Y> Flow<T>.concurrentMap( maxConcurrency: Int = Int.MAX_VALUE, transform: suspend (T) -> Y ): Flow<Y> = this .flatMapMerge(concurrency = maxConcurrency) { flow { emit(transform(it)) } } // usage fun studentsFlow(studentIds: Flow<Int>): Flow<Student> = studentIds .concurrentMap { fetchStudent(it) }

Summary

Be careful with flatMapMerge. It is a great function, but it has a couple of unpleasant surprises. Even though it starts coroutines for each flow created by its transformation, the transformation itself is synchronous. It has an unintuitive default limit of 16, and it has inconsistent behavior for concurrency == 1.

If you want to learn Kotlin Coroutines well with me, you can join my Kotlin Coroutines Workshop. There is an open edition of this workshop soon that you can join, or you can organize it in your company. You can also check out my book, Kotlin Coroutines: Deep Dive.