article banner (priority)

SharedFlow and StateFlow

This is a chapter from the book Kotlin Coroutines. You can find it on LeanPub or Amazon.

Flow is typically cold, so its values are calculated on demand. However, there are cases in which we want multiple receivers to be subscribed to one source of changes. This is where we use SharedFlow, which is conceptually similar to a mailing list. We also have StateFlow, which is similar to an observable value. Let's explain them both step by step.

SharedFlow

Let's start with MutableSharedFlow, which is like a broadcast channel: everyone can send (emit) messages which will be received by every coroutine that is listening (collecting).

import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow suspend fun main(): Unit = coroutineScope { val mutableSharedFlow = MutableSharedFlow<String>(replay = 0) // or MutableSharedFlow<String>() launch { mutableSharedFlow.collect { println("#1 received $it") } } launch { mutableSharedFlow.collect { println("#2 received $it") } } delay(1000) mutableSharedFlow.emit("Message1") mutableSharedFlow.emit("Message2") } // (1 sec) // #1 received Message1 // #2 received Message1 // #1 received Message2 // #2 received Message2 // (program never ends)

The above program never ends because the coroutineScope is waiting for the coroutines that were started with launch and which keep listening on MutableSharedFlow. Apparently, MutableSharedFlow is not closable, so the only way to fix this problem is to cancel the whole scope.

MutableSharedFlow can also keep sending messages. If we set the replay parameter (it defaults to 0), the defined number of last values will be kept. If a coroutine now starts observing, it will receive these values first. This cache can also be reset with resetReplayCache.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow suspend fun main(): Unit = coroutineScope { val mutableSharedFlow = MutableSharedFlow<String>( replay = 2, ) mutableSharedFlow.emit("Message1") mutableSharedFlow.emit("Message2") mutableSharedFlow.emit("Message3") println(mutableSharedFlow.replayCache) // [Message2, Message3] launch { mutableSharedFlow.collect { println("#1 received $it") } // #1 received Message2 // #1 received Message3 } delay(100) mutableSharedFlow.resetReplayCache() println(mutableSharedFlow.replayCache) // [] }

MutableSharedFlow is conceptually similar to RxJava Subjects. When the replay parameter is set to 0, it is similar to a PublishSubject. When replay is 1, it is similar to a BehaviorSubject. When replay is Int.MAX_VALUE, it is similar to ReplaySubject.

In Kotlin, we like to have a distinction between interfaces that are used to only listen and those that are used to modify. For instance, we've already seen the distinction between SendChannel, ReceiveChannel and just Channel. The same rule applies here. MutableSharedFlow inherits from both SharedFlow and FlowCollector. The former inherits from Flow and is used to observe, while FlowCollector is used to emit values.

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> { fun tryEmit(value: T): Boolean val subscriptionCount: StateFlow<Int> fun resetReplayCache() } interface SharedFlow<out T> : Flow<T> { val replayCache: List<T> } interface FlowCollector<in T> { suspend fun emit(value: T) }

These interfaces are often used to expose only functions, to emit, or only to collect.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val mutableSharedFlow = MutableSharedFlow<String>() val sharedFlow: SharedFlow<String> = mutableSharedFlow val collector: FlowCollector<String> = mutableSharedFlow launch { mutableSharedFlow.collect { println("#1 received $it") } } launch { sharedFlow.collect { println("#2 received $it") } } delay(1000) mutableSharedFlow.emit("Message1") collector.emit("Message2") } // (1 sec) // #1 received Message1 // #2 received Message1 // #1 received Message2 // #2 received Message2

Here is an example of typical usage on Android:

class UserProfileViewModel { private val _userChanges = MutableSharedFlow<UserChange>() val userChanges: SharedFlow<UserChange> = _userChanges fun onCreate() { viewModelScope.launch { userChanges.collect(::applyUserChange) } } fun onNameChanged(newName: String) { // ... _userChanges.emit(NameChange(newName)) } fun onPublicKeyChanged(newPublicKey: String) { // ... _userChanges.emit(PublicKeyChange(newPublicKey)) } }

shareIn

Flow is often used to observe changes, like user actions, database modifications, or new messages. We already know the different ways in which these events can be processed and handled. We've learned how to merge multiple flows into one. But what if multiple classes are interested in these changes and we would like to turn one flow into multiple flows? The solution is SharedFlow, and the easiest way to turn a Flow into a SharedFlow is by using the shareIn function.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C") .onEach { delay(1000) } val sharedFlow: SharedFlow<String> = flow.shareIn( scope = this, started = SharingStarted.Eagerly, // replay = 0 (default) ) delay(500) launch { sharedFlow.collect { println("#1 $it") } } delay(1000) launch { sharedFlow.collect { println("#2 $it") } } delay(1000) launch { sharedFlow.collect { println("#3 $it") } } } // (1 sec) // #1 A // (1 sec) // #1 B // #2 B // (1 sec) // #1 C // #2 C // #3 C

The shareIn function creates a SharedFlow and sends elements from its Flow. Since we need to start a coroutine to collect elements on flow, shareIn expects a coroutine scope as the first argument. The third argument is replay, which is 0 by default. The second argument is interesting: started determines when listening for values should start, depending on the number of listeners. The following options are supported:

  • SharingStarted.Eagerly - immediately starts listening for values and sending them to a flow. Notice that if you have a limited replay value and your values appear before you start subscribing, you might lose some values (if your replay is 0, you will lose all such values).
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C") val sharedFlow: SharedFlow<String> = flow.shareIn( scope = this, started = SharingStarted.Eagerly, ) delay(100) launch { sharedFlow.collect { println("#1 $it") } } print("Done") } // (0.1 sec) // Done
  • SharingStarted.Lazily - starts listening when the first subscriber appears. This guarantees that this first subscriber gets all the emitted values, while subsequent subscribers are only guaranteed to get the most recent replay values. The upstream flow continues to be active even when all subscribers disappear, but only the most recent replay values are cached without subscribers.
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.* import kotlinx.coroutines.launch //sampleStart suspend fun main(): Unit = coroutineScope { val flow1 = flowOf("A", "B", "C") val flow2 = flowOf("D") .onEach { delay(1000) } val sharedFlow = merge(flow1, flow2).shareIn( scope = this, started = SharingStarted.Lazily, ) delay(100) launch { sharedFlow.collect { println("#1 $it") } } delay(1000) launch { sharedFlow.collect { println("#2 $it") } } } // (0.1 sec) // #1 A // #1 B // #1 C // (1 sec) // #2 D // #1 D //sampleEnd
  • WhileSubscribed() - starts listening on the flow when the first subscriber appears; it stops when the last subscriber disappears. If a new subscriber appears when our SharedFlow is stopped, it will start again. WhileSubscribed has additional optional configuration parameters: stopTimeoutMillis (how long to listen after the last subscriber disappears, 0 by default) and replayExpirationMillis (how long to keep replay after stopping, Long.MAX_VALUE by default).
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C", "D") .onStart { println("Started") } .onCompletion { println("Finished") } .onEach { delay(1000) } val sharedFlow = flow.shareIn( scope = this, started = SharingStarted.WhileSubscribed(), ) delay(3000) launch { println("#1 ${sharedFlow.first()}") } launch { println("#2 ${sharedFlow.take(2).toList()}") } delay(3000) launch { println("#3 ${sharedFlow.first()}") } } // (3 sec) // Started // (1 sec) // #1 A // (1 sec) // #2 [A, B] // Finished // (1 sec) // Started // (1 sec) // #3 A // Finished
  • It is also possible to define a custom strategy by implementing the SharingStarted interface.

Using shareIn is very convenient when multiple services are interested in the same changes. Let's say that you need to observe how stored locations change over time. This is how a DTO (Data Transfer Object) could be implemented on Android using the Room library:

@Dao interface LocationDao { @Insert(onConflict = OnConflictStrategy.IGNORE) suspend fun insertLocation(location: Location) @Query("DELETE FROM location_table") suspend fun deleteLocations() @Query("SELECT * FROM location_table ORDER BY time") fun observeLocations(): Flow<List<Location>> }

The problem is that if multiple services need to depend on these locations, then it would not be optimal for each of them to observe the database separately. Instead, we could make a service that listens to these changes and shares them into SharedFlow. This is where we will use shareIn. But how should we configure it? You need to decide for yourself. Do you want your subscribers to immediately receive the last list of locations? If so, set replay to 1. If you only want to react to change, set it to 0. How about started? WhileSubscribed() sounds best for this use case.

class LocationService( locationDao: LocationDao, scope: CoroutineScope ) { private val locations = locationDao.observeLocations() .shareIn( scope = scope, started = SharingStarted.WhileSubscribed(), ) fun observeLocations(): Flow<List<Location>> = locations }

Beware! Do not create a new SharedFlow for each call. Create one, and store it in a property.

StateFlow

StateFlow is an extension of the SharedFlow concept. It works similarly to SharedFlow when the replay parameter is set to 1. It always stores one value, which can be accessed using the value property.

interface StateFlow<out T> : SharedFlow<T> { val value: T } interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> { override var value: T fun compareAndSet(expect: T, update: T): Boolean }

Please note how the value property is overridden inside MutableStateFlow. In Kotlin, an open val property can be overridden with a var property. val only allows getting a value (getter), while var also supports setting a new value (setter).

The initial value needs to be passed to the constructor. We both access and set the value using the value property. As you can see, MutableStateFlow is like an observable holder for a value.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val state = MutableStateFlow("A") println(state.value) // A launch { state.collect { println("Value changed to $it") } // Value changed to A } delay(1000) state.value = "B" // Value changed to B delay(1000) launch { state.collect { println("and now it is $it") } // and now it is B } delay(1000) state.value = "C" // Value changed to C and now it is C }

On Android, StateFlow is used as a modern alternative to LiveData. First, it has full support for coroutines. Second, it has an initial value, so it does not need to be nullable. So, StateFlow is often used on ViewModels to represent its state. This state is observed, and a view is displayed and updated on this basis.

class LatestNewsViewModel( private val newsRepository: NewsRepository ) : ViewModel() { private val _uiState = MutableStateFlow<NewsState>(LoadingNews) val uiState: StateFlow<NewsState> = _uiState fun onCreate() { scope.launch { _uiState.value = NewsLoaded(newsRepository.getNews()) } } }

State flow emits new values only when the value changes.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val state = MutableStateFlow("A") state.onEach { println("Updated to $it") } .stateIn(this) // Updated to A state.value = "B" // Updated to B state.value = "B" // (nothing printed) state.emit("B") // (nothing printed) }

StateFlow is also conflated, so slower observers might not receive some intermediate state changes. To receive all events, use SharedFlow.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.* suspend fun main(): Unit = coroutineScope { val state = MutableStateFlow('X') launch { for (c in 'A'..'E') { delay(300) state.value = c // or state.emit(c) } } state.collect { delay(1000) println(it) } } // X // C // E

This behavior is by design. StateFlow represents the current state, and we might assume that nobody is interested in repeating or outdated state.

stateIn

stateIn is a function that transforms Flow<T> into StateFlow<T>. It can only be called with a scope, but it is a suspending function. Remember that StateFlow needs to always have a value; so, if you don't specify it, then you need to wait until the first value is calculated.

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B", "C") .onEach { delay(1000) } .onEach { println("Produced $it") } val stateFlow: StateFlow<String> = flow.stateIn(this) println("Listening") println(stateFlow.value) stateFlow.collect { println("Received $it") } } // (1 sec) // Produced A // Listening // A // Received A // (1 sec) // Produced B // Received B // (1 sec) // Produced C // Received C

The second variant of stateIn is not suspending but it requires an initial value and a started mode. This mode has the same options as shareIn (as previously explained).

import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun main(): Unit = coroutineScope { val flow = flowOf("A", "B") .onEach { delay(1000) } .onEach { println("Produced $it") } val stateFlow: StateFlow<String> = flow.stateIn( scope = this, started = SharingStarted.Lazily, initialValue = "Empty" ) println(stateFlow.value) delay(2000) stateFlow.collect { println("Received $it") } } // Empty // (2 sec) // Received Empty // (1 sec) // Produced A // Received A // (1 sec) // Produced B // Received B

We typically use stateIn when we want to observe a value from one source of changes. On the way, these changes can be processed, and in the end they can be observed by our views.

class LocationsViewModel( locationService: LocationService ) : ViewModel() { private val location = locationService.observeLocations() .map { it.toLocationsDisplay() } .stateIn( scope = viewModelScope, started = SharingStarted.Lazily, initialValue = LocationsDisplay.Loading, ) // ... }

Summary

In this chapter, we've learned about SharedFlow and StateFlow, both of which are especially important for Android developers as they are commonly used as a part of the MVVM pattern. Remember them and consider using them, especially if you use view models in Android development.