Effective Kotlin Item 2: Eliminate critical sections

This is a chapter from the book Effective Kotlin. You can find it on LeanPub or Amazon.

When multiple threads modify a shared state, it can lead to unexpected results. This problem was already discussed in the previous item, but now I want to explain it in more detail and show how to deal with it in Kotlin/JVM.

The problem with threads and shared state

While I’m writing these words, many things are happening concurrently on my computer. Music is playing, IntelliJ displays the text of this chapter, Slack is displaying messages, and my browser is downloading data. All this is possible because operating systems introduced the concept of threads. The operating system schedules the execution of threads, each of which is a separate flow. Even if I had a single-core CPU, the operating system would still be able to run multiple threads concurrently by running one thread for a short period of time, then switching to another thread, and so on. This is called time slicing. What is more, in modern computers we have multiple cores, so operating systems can actually run many operations on different threads at the same time.

The biggest problem with this process is that we cannot be sure when the operating system will switch from executing one thread to executing another. Consider the following example. We start 1000 threads, each of which increments a mutable variable; the problem is that incrementing a value has multiple steps: getting the current value, creating the new incremented value, and assigning it to the variable. If the operating system switches threads between these steps, we might lose some increments. This is why the code below is unlikely to print 1000. I just tested it, and it printed 981.

var num = 0
for (i in 1..1000) {
    thread {
        Thread.sleep(10)
        num += 1
    }
}
Thread.sleep(5000)
print(num) // Very unlikely to be 1000
// Every time a different number

To better understand this problem, consider the following situation, which might occur if we had started two threads. One thread gets value 0, then the operating system switches execution to the other thread, which gets the same value, increments it, and sets the variable to 1. The operating system then switches back to the first thread, which sets the variable to 1 again. In this case, we’ve lost one increment.
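The interleaving described above can be replayed by hand on a single thread. The sketch below is only an illustration: the function name `replayLostUpdate` and its local variables are hypothetical, and the steps of `num += 1` are written out explicitly so that the order of reads and writes is visible.

```kotlin
// Replays the two-thread interleaving by hand, on a single thread.
// `replayLostUpdate` and its local names are illustrative only.
fun replayLostUpdate(): Int {
    var num = 0

    val readByA = num      // Thread A reads 0
    // ...the operating system switches to thread B here...
    val readByB = num      // Thread B also reads 0
    num = readByB + 1      // Thread B writes 1
    // ...the operating system switches back to thread A...
    num = readByA + 1      // Thread A writes 1 again, so B's increment is lost

    return num
}
```

Two increments were performed, yet `replayLostUpdate()` returns 1, which is exactly the lost update from the description.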

Losing some operations can be a serious problem in real-life applications, but this problem can have much more serious consequences. When we don’t know the order in which operations will be executed, we risk our objects having incorrect states. This often leads to bugs that are hard to reproduce and fix. A good illustration is adding an element to a list while another thread iterates over its elements. The default collections do not support being modified while they are iterated over, so we often get a ConcurrentModificationException.

val numbers = mutableListOf<Int>()
for (i in 1..1000) {
    thread {
        Thread.sleep(1)
        numbers.add(i)
    }
    thread {
        Thread.sleep(1)
        print(numbers.sum()) // sum iterates over the list
        // often ConcurrentModificationException
    }
}

We encounter the same problem when we start multiple coroutines on a dispatcher that uses multiple threads. To deal with this problem when using coroutines, we can use the same techniques as for threads. However, coroutines also have dedicated tools, as I described in detail in the book Kotlin Coroutines: Deep Dive.

As I explained in the previous chapter, we don’t encounter all these problems if we don’t use mutability. However, in real-life applications we often cannot avoid mutability, so we need to learn how to deal with shared state [2]. Whenever you have a shared state that might be modified by multiple threads, you need to ensure that all the operations on this state are executed correctly. Each platform offers different tools for this, so let’s learn about the most important tools for Kotlin/JVM [3].

Synchronization in Kotlin/JVM

The most important tool for dealing with shared state in the Kotlin/JVM platform is synchronization. This is a mechanism that allows us to ensure that only one thread at a time can execute a given block of code. It is based on the synchronized function, which requires a lock object and a lambda expression with the code that should be synchronized. This mechanism guarantees that only one thread at a time can enter a synchronization block with the same lock. If a thread reaches a synchronization block but a different thread is already executing a synchronization block with the same lock, it will wait until the other thread leaves its block. The following example shows how to use synchronization to ensure that the num variable is incremented correctly.

val lock = Any()
var num = 0
for (i in 1..1000) {
    thread {
        Thread.sleep(10)
        synchronized(lock) {
            num += 1
        }
    }
}
Thread.sleep(1000)
print(num) // 1000
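The examples in this chapter wait for the threads with Thread.sleep, which is enough for a demonstration. As a variation, the sketch below waits with join() instead, so the result does not depend on how long the threads take. The helper name `countWithJoin` and its parameter are hypothetical.

```kotlin
import kotlin.concurrent.thread

// Increments a shared counter from `threadCount` threads under one lock,
// then waits for all of them with join() instead of a fixed sleep.
// `countWithJoin` is a hypothetical helper name.
fun countWithJoin(threadCount: Int): Int {
    val lock = Any()
    var num = 0
    val threads = List(threadCount) {
        thread {
            synchronized(lock) { num += 1 }
        }
    }
    // join() returns once a thread has finished; it also makes
    // that thread's writes visible to the caller.
    threads.forEach { it.join() }
    return num
}
```

With the synchronized block in place, `countWithJoin(1000)` should return 1000 on every run.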

In real-life cases, we often take every function in a class that needs synchronization and wrap its body in a synchronization block that uses the same lock. The example below shows how to synchronize the modifying operations in the Counter class.

class Counter {
    private val lock = Any()
    private var num = 0

    fun inc() = synchronized(lock) { num += 1 }
    fun dec() = synchronized(lock) { num -= 1 }

    // Synchronization is not necessary; however,
    // without it, getter might serve stale value
    fun get(): Int = num
}

In some classes, we have multiple locks for different parts of a state, but this is much harder to implement correctly, so it’s much less common.

When we use Kotlin Coroutines, instead of synchronized we typically use a dispatcher limited to a single thread or a Mutex, as I described in the book Kotlin Coroutines: Deep Dive. Remember that thread switching is not free, and in some classes it is more efficient to use a single thread than to use multiple threads and synchronize their execution.
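The idea of confining a state to a single thread can be sketched without coroutines. The example below is an assumption-laden stand-in: it uses a plain Java single-thread executor rather than the coroutine dispatcher mentioned above, and the helper name `countOnSingleThread` is hypothetical.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Confines every access to `num` to one worker thread, so no lock is needed.
// `countOnSingleThread` is a hypothetical helper name.
fun countOnSingleThread(tasks: Int): Int {
    val executor = Executors.newSingleThreadExecutor()
    var num = 0
    try {
        repeat(tasks) {
            // Tasks run one after another on the same thread, in submission order.
            executor.execute { num += 1 }
        }
        // The read is also submitted as a task, so it runs after all increments,
        // and Future.get() safely hands the result back to the caller.
        return executor.submit(Callable { num }).get()
    } finally {
        executor.shutdown()
    }
}
```

Because only the worker thread ever touches `num`, there is nothing to synchronize; the cost is that the increments no longer run in parallel.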

Atomic objects

We started our discussion with the problem of incrementing a variable, which can produce incorrect results because a regular integer increment consists of multiple steps, and the operating system can switch between threads in the middle of them. Some operations, such as a simple assignment of a reference or an Int, are indivisible, so they are never observed half-done, but only very simple operations are atomic by nature. However, Java provides a set of atomic classes that wrap common value types and offer atomic operations on them. You can find AtomicInteger, AtomicLong, AtomicBoolean, AtomicReference, and many more. Each of these offers methods that are guaranteed to be executed atomically. For example, AtomicInteger offers an incrementAndGet method that increments a value and returns the new value. The example below shows how to use AtomicInteger to increment a variable correctly.

val num = AtomicInteger(0)
for (i in 1..1000) {
    thread {
        Thread.sleep(10)
        num.incrementAndGet()
    }
}
Thread.sleep(5000)
print(num.get()) // 1000

Atomic objects are fast and can help us with simple cases where a state is a simple value or a couple of independent values, but these are not enough for more complex cases. For example, we cannot use atomic objects to synchronize multiple operations on multiple objects. For that, we need to use a synchronization block.
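To illustrate a case where atomic objects are not enough, consider two values that must always change together. The sketch below uses a hypothetical `Accounts` class: even if each balance were an AtomicInteger, another thread could observe the moment between the two updates, so both updates are placed under a single lock instead.

```kotlin
// Two balances that must change together.
// `Accounts` is a hypothetical class, used only for illustration.
class Accounts(private var first: Int, private var second: Int) {
    private val lock = Any()

    // Both updates happen under one lock, so no thread can observe
    // the amount withdrawn from one balance but not yet added to the other.
    fun transferToSecond(amount: Int) = synchronized(lock) {
        first -= amount
        second += amount
    }

    // Reads take the same lock, so they always see a consistent pair.
    fun balances(): Pair<Int, Int> = synchronized(lock) { first to second }
}
```

Whatever the interleaving of transfers, the sum of the pair returned by `balances()` stays equal to the initial total.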

Concurrent collections

Java also provides some collections that have support for concurrency. The most important one is ConcurrentHashMap, which is a thread-safe version of HashMap. We can safely use all its operations without worrying about conflicts. Its iterators are weakly consistent: they reflect the state of the map at some point at or after the creation of the iterator. Therefore, we’ll never get a ConcurrentModificationException, but this doesn’t mean that we’ll see the most recent state.

val map = ConcurrentHashMap<Int, String>()
for (i in 1..1000) {
    thread {
        Thread.sleep(1)
        map.put(i, "E$i")
    }
    thread {
        Thread.sleep(1)
        print(map.toList().sumOf { it.first })
    }
}
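Each single operation on ConcurrentHashMap is safe, but a get followed by a put is still two operations. For read-modify-write updates, the map offers methods such as merge, which performs the whole update for one key atomically. The sketch below is a minimal illustration; the helper name `countWords` is hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.thread

// Counts word occurrences from several threads.
// `countWords` is a hypothetical helper name.
fun countWords(words: List<String>): Map<String, Int> {
    val counts = ConcurrentHashMap<String, Int>()
    val threads = words.map { word ->
        thread {
            // merge inserts 1 for a new key or combines it with the old value;
            // the whole invocation is atomic, unlike a separate get and put.
            counts.merge(word, 1) { old, one -> old + one }
        }
    }
    threads.forEach { it.join() }
    return counts.toMap()
}
```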

When we need a concurrent set, a popular choice is to use newKeySet from ConcurrentHashMap, which returns a set backed by a ConcurrentHashMap that stores Boolean.TRUE as the value for every element. It implements the MutableSet interface, so we can use it like a regular set.

val set = ConcurrentHashMap.newKeySet<Int>()
for (i in 1..1000) {
    thread {
        Thread.sleep(1)
        set += i
    }
}
Thread.sleep(5000)
println(set.size)

Instead of lists, I typically use ConcurrentLinkedQueue when I need a concurrent collection that allows duplicates. These are the essential tools that we can use on JVM to deal with the problem of mutable states.
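A minimal sketch of using ConcurrentLinkedQueue as a concurrent collection that keeps duplicates might look as follows. The helper name `collectConcurrently` is hypothetical, and the modulo is there only to produce repeated values.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Collects values, duplicates included, from several threads.
// `collectConcurrently` is a hypothetical helper name.
fun collectConcurrently(threadCount: Int): List<Int> {
    val queue = ConcurrentLinkedQueue<Int>()
    val threads = List(threadCount) { index ->
        thread {
            queue.add(index % 10) // the same value is added by many threads
        }
    }
    threads.forEach { it.join() }
    return queue.toList() // the order depends on thread scheduling
}
```

No element is lost, so the result has as many elements as there were threads, but their order is not deterministic.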

Of course, there are also libraries that offer other tools that support code synchronization. There are even Kotlin multiplatform libraries, such as AtomicFU, which provides multiplatform atomic objects [1].

// Using AtomicFU
val num = atomic(0)
for (i in 1..1000) {
    thread {
        Thread.sleep(10)
        num.incrementAndGet()
    }
}
Thread.sleep(5000)
print(num.value) // 1000

Let's change our perspective back to the more general problem with mutable states and explain how to deal with it in typical situations.

Do not leak mutation points

Exposing a mutable object that is used to represent state is an especially dangerous situation. Take a look at this example:

data class User(val name: String)

class UserRepository {
    private val users: MutableList<User> = mutableListOf()

    fun loadAll(): MutableList<User> = users
    //...
}

One could use loadAll to modify the UserRepository private state:

val userRepository = UserRepository()
val users = userRepository.loadAll()
users.add(User("Kirill"))
//...
print(userRepository.loadAll()) // [User(name=Kirill)]

This situation is especially dangerous when such modifications are accidental. The first thing we should do is upcast the mutable objects to read-only types; in this case it means upcasting from MutableList to List.

data class User(val name: String)

class UserRepository {
    private val users: MutableList<User> = mutableListOf()

    fun loadAll(): List<User> = users
    //...
}

But beware, because the implementation above is not enough to make this class safe. First, we receive what looks like a read-only list, but it is actually a reference to a mutable list, so its values might change. This might cause developers to make serious mistakes:

data class User(val name: String)

class UserRepository {
    private val users: MutableList<User> = mutableListOf()

    fun loadAll(): List<User> = users

    fun add(user: User) {
        users += user
    }
}

class UserRepositoryTest {
    fun `should add elements`() {
        val repo = UserRepository()
        val oldElements = repo.loadAll()
        repo.add(User("B"))
        val newElements = repo.loadAll()
        assert(oldElements != newElements)
        // This assertion will fail, because both references
        // point to the same object, and they are equal
    }
}

Second, consider a situation in which one thread reads the list returned by loadAll, but another thread modifies it at the same time. It is illegal to modify a mutable collection that another thread is iterating over. Such an operation leads to an unexpected exception.

val repo = UserRepository()
thread {
    for (i in 1..10000) repo.add(User("User$i"))
}
thread {
    for (i in 1..10000) {
        val list = repo.loadAll()
        for (e in list) {
            /* no-op */
        }
    }
}
// ConcurrentModificationException

There are two ways of dealing with this. The first is to return a copy of an object instead of a real reference. We call this technique defensive copying. Note that when we copy, we might have a conflict if another thread is adding a new element to the list while we are copying it; so, if we want to support multithreaded access to our object, this operation needs to be synchronized. Collections can be copied with transformation functions like toList, while data classes can be copied with the copy method.

class UserRepository {
    private val users: MutableList<User> = mutableListOf()
    private val lock = Any()

    fun loadAll(): List<User> = synchronized(lock) {
        users.toList()
    }

    fun add(user: User) = synchronized(lock) {
        users += user
    }
}
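Defensive copying with the copy method can be sketched in the same way. The classes `Settings` and `SettingsHolder` below are hypothetical and exist only for this illustration; note that copy makes a shallow copy, so it protects only the properties of the copied object itself.

```kotlin
// Hypothetical classes, used only to illustrate defensive copying with copy().
data class Settings(var theme: String, var fontSize: Int)

class SettingsHolder {
    private val lock = Any()
    private val settings = Settings(theme = "light", fontSize = 12)

    // Callers receive a copy, so mutating it does not touch the internal state.
    fun load(): Settings = synchronized(lock) { settings.copy() }

    fun changeTheme(theme: String) = synchronized(lock) {
        settings.theme = theme
    }
}
```

A caller that modifies the object returned by `load()` changes only its own copy, and a later `changeTheme` call does not alter copies that were handed out earlier.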

A simpler option is to use a read-only list as this is easier to secure and gives us more ways of tracking changes in objects.

class UserRepository {
    private var users: List<User> = listOf()

    fun loadAll(): List<User> = users

    fun add(user: User) {
        users = users + user
    }
}

When we use this option, and we want to introduce proper support for multithreaded access, we only need to synchronize the operations that modify our list. This makes adding elements slower, but accessing the list is faster. This is a good trade-off when we have more reads than writes.

class UserRepository {
    private var users: List<User> = listOf()
    private val lock = Any()

    fun loadAll(): List<User> = users

    fun add(user: User) = synchronized(lock) {
        users = users + user
    }
}
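As a variation on this idea, and not the approach shown in the chapter, the read-only list can be kept in an AtomicReference. In the sketch below, the class name `AtomicUserRepository` is hypothetical; updateAndGet replaces the list atomically, and readers that call get always see the most recently published list without taking a lock.

```kotlin
import java.util.concurrent.atomic.AtomicReference

data class User(val name: String)

// Variation on UserRepository: the read-only list lives in an AtomicReference.
// `AtomicUserRepository` is a hypothetical name, not a class from the chapter.
class AtomicUserRepository {
    private val users = AtomicReference<List<User>>(listOf())

    // Reads never block and always see the most recently published list.
    fun loadAll(): List<User> = users.get()

    // updateAndGet retries if another thread replaced the list in the meantime,
    // so no addition is lost.
    fun add(user: User) {
        users.updateAndGet { it + user }
    }
}
```

The trade-off is the same as before: every add creates a new list, so this suits cases with more reads than writes.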

Summary

  • Multiple threads modifying the same state can lead to conflicts, thus causing lost data, exceptions, and other unexpected behavior.
  • We can use synchronization to protect a state from concurrent modifications. The most popular tool in Kotlin/JVM is a synchronized block with a lock.
  • To deal with concurrent modifications, Java also provides classes to represent atomic values and concurrent collections.
  • There are also libraries that provide multiplatform atomic objects, such as AtomicFU.
  • Classes should protect their internal state and not expose it to the outside world. We can operate on read-only objects or use defensive copying to protect a state from concurrent modifications.

1: At the time of writing these words, AtomicFU is still in beta version, but it is already well-developed and seems rather stable.

2: By shared state, I mean a state used by multiple threads.

3: In Kotlin/JS, we don’t need to worry about synchronization because JavaScript execution is single-threaded: if you start a process on a different thread (for instance, using workers), it operates in a different memory space.