article banner (priority)

Using BlockHound to track blocking calls in non-blocking dispatchers

When we implement repositories in Kotlin Coroutines, there is a constant tension between safety and performance. On one hand, making a blocking call directly in a suspending function is a serious mistake (on Android it can cause ANR):

class DiscSaveRepository( private val discReader: DiscReader ) : SaveRepository { // Mistake: Blocking call in a suspending function override suspend fun loadSave(name: String): SaveData = discReader.read("save/$name") }

This should be fixed by using a dispatcher that is designed for blocking calls, like Dispatchers.IO:

class DiscSaveRepository( private val discReader: DiscReader ) : SaveRepository { // Fixed: Blocking call in a suspending function override suspend fun loadSave(name: String): SaveData = withContext(Dispatchers.IO) { discReader.read("save/$name") } }

On the other hand, changing dispatcher when not needed can be a performance hit, and suspending APIs should never block:

class NetworkOfferRepository( private val offerClient: OfferClient, ) : OfferRepository { // Mistake: Unnecessary dispatcher change override suspend fun fetchOffers(userId: String): List<Offer> = withContext(Dispatchers.IO) { offerClient.fetchOffers(userId) .map { it.toOffer() } } } // Retrofit definition interface OfferClient { @GET("offers") suspend fun fetchOffers( @Query("page") sellerId: String ): List<OfferJson> }

This constant tension between safety and performance is a real problem, but there is a simple solution: We can use BlockHound to track blocking calls in non-blocking dispatchers.

BlockHound is a library that can detect blocking calls on certain threads. In Kotlin Coroutines we use it together with kotlinx-coroutines-debug to detect blocking calls in non-blocking dispatchers. This is how we can set it up. First, we need dependencies:

// build.gradle.kts implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.5.2") implementation("io.projectreactor.tools:blockhound:1.0.6.RELEASE")

In JDK 13+ you need special configuration, see BlockHound documentation. In general, using BlockHound with never versions of JDK can be more challenging.

Then, we need to install BlockHound in our application:

BlockHound.install(CoroutinesBlockHoundIntegration())

You can use it in debug mode or in E2E tests. Once it is installed, it will throw an exception when a blocking call is detected. This is how it looks like:

import reactor.blockhound.BlockHound import kotlinx.coroutines.debug.CoroutinesBlockHoundIntegration import kotlinx.coroutines.* fun main() { BlockHound.install(CoroutinesBlockHoundIntegration()) runBlocking { launch(Dispatchers.Default) { Thread.sleep(1000) // Exception } } }

Results with an exception:

Exception in thread "main" reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
	at java.base/java.lang.Thread.sleep(Thread.java)
	at MainKt$main$1$1.invokeSuspend(Main.kt:12)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:584)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:793)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:697)
	at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:684)

Using Dispatchers.IO would not throw an exception, as it is designed for blocking calls.

import reactor.blockhound.BlockHound import kotlinx.coroutines.debug.CoroutinesBlockHoundIntegration import kotlinx.coroutines.* fun main() { BlockHound.install(CoroutinesBlockHoundIntegration()) val d = Dispatchers.IO.limitedParallelism(10) // or just Dispatchers.IO runBlocking { launch(d) { Thread.sleep(1000) // OK } } }

This way we can ensure that our suspending functions are safe and performant, without unnecessary dispatcher changes.