1. Introduction

In this tutorial, we'll learn how to read and write from a hash table data structure in a thread-safe way using the ConcurrentHashMap class.

2. Overview

ConcurrentHashMap is one implementation of the ConcurrentMap interface, and it's one of the thread-safe collections that Java provides. It's backed by a hash table and follows the same functional specification as Hashtable, with some nuances we'll cover in the following sections.

2.1. Useful Methods

The ConcurrentHashMap API specification provides practical methods to work with the collection. In this tutorial, we'll look primarily at two of them:

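  • get(Object key): Returns the value mapped to the given key, or null if the key is absent. This is our reading method.
  • computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction): If the key exists, atomically applies the remappingFunction to the key and its current value and stores the result. If the function returns null, the mapping is removed. This is our writing method.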

We'll see those methods in practice in section 3.
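
Before that, here's a minimal single-threaded sketch of what the two calls return. The class name MapMethodsSketch, the helper newFrequencyMap, and the key 42 are ours, chosen only for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;

class MapMethodsSketch {

    // Illustrative helper: a map that holds a single counter under key 1
    static ConcurrentHashMap<Integer, Integer> newFrequencyMap() {
        ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
        map.put(1, 0);
        return map;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Integer> frequencyMap = newFrequencyMap();

        // Key 1 exists, so the function runs; the new value is stored and returned
        Integer updated = frequencyMap.computeIfPresent(1, (key, value) -> value + 1);
        System.out.println(updated);             // 1
        System.out.println(frequencyMap.get(1)); // 1

        // Key 42 is absent: the function isn't called and nothing is inserted
        Integer missing = frequencyMap.computeIfPresent(42, (key, value) -> value + 1);
        System.out.println(missing);                      // null
        System.out.println(frequencyMap.containsKey(42)); // false
    }
}
```

Note that computeIfPresent never creates a mapping, which is why the tests in section 3 populate the map before updating it.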

2.2. Why Use ConcurrentHashMap

The main difference between ConcurrentHashMap and a regular HashMap is that HashMap isn't thread-safe, while ConcurrentHashMap supports full concurrency for reads and high concurrency for writes.

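Read operations never block and never lock a key. Write operations lock only the part of the map they touch, so a write blocks, and is blocked by, other writes to the same map entry. (Since Java 8, the lock is taken on the hash bin that holds the entry, so writes to different keys can still contend if those keys land in the same bin.) These two properties are important in environments where we want high throughput and can accept eventual consistency.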

Hashtable and Collections.synchronizedMap are also thread-safe for reads and writes. However, they are less efficient because every operation locks the entire collection instead of just the entry the thread is writing to.

ConcurrentHashMap, on the other hand, locks at the map entry level, so other threads aren't blocked from writing to other keys. Therefore, when we need high throughput in a multithreaded environment, ConcurrentHashMap is a better option than Hashtable or Collections.synchronizedMap.
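
To make the difference in lock granularity concrete, here's a rough sketch that checks whether a write to key 2 can finish while another thread is still in the middle of a write to key 1. The class LockGranularitySketch, its method names, and the 500 ms waiting window are our own assumptions for illustration, not part of any JDK API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LockGranularitySketch {

    // Returns true if a write to key 2 completes while another thread is
    // still inside its update of key 1 on the same map.
    static boolean secondWriterFinishesDuringFirstWrite(Map<Integer, Integer> map) {
        map.put(1, 0);
        map.put(2, 0);
        CountDownLatch firstWriterInside = new CountDownLatch(1);
        CountDownLatch releaseFirstWriter = new CountDownLatch(1);
        CountDownLatch secondWriterDone = new CountDownLatch(1);

        Thread first = new Thread(() -> map.computeIfPresent(1, (k, v) -> {
            firstWriterInside.countDown();
            // Stay "mid-write" until the main thread releases us
            awaitQuietly(releaseFirstWriter, 10, TimeUnit.SECONDS);
            return v + 1;
        }));
        Thread second = new Thread(() -> {
            map.computeIfPresent(2, (k, v) -> v + 1);
            secondWriterDone.countDown();
        });

        first.start();
        awaitQuietly(firstWriterInside, 10, TimeUnit.SECONDS);
        second.start();
        // Give the second writer up to 500 ms while the first write is still open
        boolean finished = awaitQuietly(secondWriterDone, 500, TimeUnit.MILLISECONDS);

        releaseFirstWriter.countDown();
        joinQuietly(first);
        joinQuietly(second);
        return finished;
    }

    private static boolean awaitQuietly(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondWriterFinishesDuringFirstWrite(new ConcurrentHashMap<>()));
        System.out.println(secondWriterFinishesDuringFirstWrite(Collections.synchronizedMap(new HashMap<>())));
        System.out.println(secondWriterFinishesDuringFirstWrite(new Hashtable<>()));
    }
}
```

On a typical run, we'd expect true for ConcurrentHashMap and false for the other two, since Hashtable and the synchronizedMap wrapper hold a single map-wide lock for the whole computeIfPresent call. The sketch relies on keys 1 and 2 falling into different bins, which holds for a default-sized ConcurrentHashMap.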

3. Thread-Safe Operations

ConcurrentHashMap provides most of the guarantees that code needs to be considered thread-safe: each individual operation is atomic, although a sequence of separate calls (such as a get followed by a put) is not. That helps us avoid a few common concurrency pitfalls in Java.

To illustrate how ConcurrentHashMap works in a multithreaded environment, we'll use a Java test that retrieves and updates the frequency of a given number. Let's first define the basic structure of the test:

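import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class ConcurrentHashMapUnitTest {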

    private Map<Integer, Integer> frequencyMap;

    @BeforeEach
    public void setup() {
        frequencyMap = new ConcurrentHashMap<>();
        frequencyMap.put(0, 0);
        frequencyMap.put(1, 0);
        frequencyMap.put(2, 0);
    }

    @AfterEach
    public void teardown() {
        frequencyMap.clear();
    }

    private static void sleep(int timeout) {
        try {
            TimeUnit.SECONDS.sleep(timeout);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

The above class defines the frequency map of numbers, a setup method to populate it with initial values, a teardown method to clear its content, and a helper method sleep to handle InterruptedException.

3.1. Reading

ConcurrentHashMap allows full concurrency for reads, meaning that any number of threads can read the same key simultaneously. It also means that reads don't block writes and aren't blocked by them. A read that overlaps a write therefore returns the result of the most recently completed update, so it can return a value that's already “old” by the time the write finishes.

Let's take a look at one example of a thread writing to a key, a second thread reading before the write is complete, and a third thread reading after the write is complete:

@Test
public void givenOneThreadIsWriting_whenAnotherThreadReads_thenGetCorrectValue() throws Exception {
    ExecutorService threadExecutor = Executors.newFixedThreadPool(3);

    Runnable writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
        sleep(1);
        return v + 1;
    });

    Callable<Integer> readNow = () -> frequencyMap.get(1);
    Callable<Integer> readAfter1001Ms = () -> {
        TimeUnit.MILLISECONDS.sleep(1001);
        return frequencyMap.get(1);
    };

    threadExecutor.submit(writeAfter1Sec);
    List<Future<Integer>> results = threadExecutor.invokeAll(asList(readNow, readAfter1001Ms));

    assertEquals(0, results.get(0).get());
    assertEquals(1, results.get(1).get());

    // Request shutdown first: awaitTermination() only returns true after a shutdown request
    threadExecutor.shutdown();
    threadExecutor.awaitTermination(2, TimeUnit.SECONDS);
}

Let's take a closer look at what's happening in the above code:

  1. We first define an ExecutorService with three threads: one for the writer task and two for the reader tasks. The write operation takes one second to finish, so any read before that should get the old value, and any read after that (one millisecond after, in this example) should get the updated value. That one-millisecond margin is tight, so the test is sensitive to thread scheduling.
  2. Then, we run both read tasks using invokeAll, which returns the results in the same order as the tasks we passed in. Thus, position zero of the list refers to the first read, and position one to the second read.
  3. Finally, we validate the results of completed tasks using assertEquals and shut down the ExecutorService.

From that code, we conclude that reads aren't blocked, even while another thread is writing to the same key. If we think of reads and writes as transactions, ConcurrentHashMap implements eventual consistency for reads. That means a read that overlaps a write won't see the value being written, but once the map stops receiving writes, reads become consistent again. Check out this introduction to transactions to get more details about eventual consistency.

Hint: If we also want reads to block and be blocked by writes on the same key, we shouldn't use the get() method. Instead, we can pass an identity BiFunction, one that returns the value at the given key unmodified, to the computeIfPresent method. That way, we sacrifice reading speed to avoid reading old or inconsistent values.
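
The hint above can be sketched roughly as follows. The blockingGet helper is hypothetical (it's not a JDK method), and the latches exist only so the example reads the key while a write is known to be in progress:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class BlockingReadSketch {

    // Hypothetical helper: a "read" that goes through computeIfPresent, so it
    // queues behind any in-progress write on the same key.
    static <K, V> V blockingGet(ConcurrentHashMap<K, V> map, K key) {
        return map.computeIfPresent(key, (k, v) -> v); // identity: value stays unchanged
    }

    // Returns {value seen by get() during the write, value seen by blockingGet()}
    static int[] readDuringWrite() {
        ConcurrentHashMap<Integer, Integer> frequencyMap = new ConcurrentHashMap<>();
        frequencyMap.put(1, 0);
        CountDownLatch writerInside = new CountDownLatch(1);
        CountDownLatch releaseWriter = new CountDownLatch(1);

        Thread writer = new Thread(() -> frequencyMap.computeIfPresent(1, (k, v) -> {
            writerInside.countDown();
            await(releaseWriter); // keep the write open until the main thread has read
            return v + 1;
        }));
        writer.start();
        await(writerInside);

        // The writer is mid-update: get() doesn't wait and sees the old value
        int plainRead = frequencyMap.get(1);

        releaseWriter.countDown();
        // blockingGet() has to wait until the writer has stored the new value
        int blockingRead = blockingGet(frequencyMap, 1);

        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {plainRead, blockingRead};
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] seen = readDuringWrite();
        System.out.println(seen[0]); // 0
        System.out.println(seen[1]); // 1
    }
}
```

The trade-off is that every such read now competes for the same lock as the writers, so this approach is only worth it when stale reads are a real problem.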

3.2. Writing

As mentioned earlier, ConcurrentHashMap implements partial concurrency for writes: a write blocks other writes to the same map key but allows writes to different keys. This is essential to achieve high throughput and consistent writes in multithreaded environments. To illustrate consistency, let's define a test with two threads writing to the same key and check how the map handles that:

@Test
public void givenOneThreadIsWriting_whenAnotherThreadWritesAtSameKey_thenWaitAndGetCorrectValue() throws Exception {
    ExecutorService threadExecutor = Executors.newFixedThreadPool(2);

    Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
        sleep(5);
        return v + 1;
    });

    Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
        sleep(1);
        return v + 1;
    });

    List<Future<Integer>> results = threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));

    assertEquals(1, results.get(0).get());
    assertEquals(2, results.get(1).get());

    // Request shutdown first: awaitTermination() only returns true after a shutdown request
    threadExecutor.shutdown();
    threadExecutor.awaitTermination(2, TimeUnit.SECONDS);
}

The test above shows two write tasks being submitted to the ExecutorService. The first task takes five seconds to write, and the second one takes one second. Because it's submitted first, the first task normally acquires the lock first and blocks any other write activity at map key 1. Thus, the second thread must wait about five seconds until the first thread releases the lock. After the first write is done, the second thread gets the most recent value and updates it in one second.

The list of results from the ExecutorService comes in order of task submission, so the first element should return 1, and the second should return 2.
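
Because writes to the same key are serialized, no update is lost even under heavier contention. As a small sketch of that guarantee, the following (hypothetical) SameKeyWritesSketch class lets several threads increment the same key through computeIfPresent and returns the final count; the thread and iteration counts are arbitrary:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class SameKeyWritesSketch {

    // Starts the given number of threads, each incrementing key 1 the given
    // number of times, and returns the final value stored at key 1.
    static int incrementConcurrently(int threads, int incrementsPerThread) {
        ConcurrentHashMap<Integer, Integer> frequencyMap = new ConcurrentHashMap<>();
        frequencyMap.put(1, 0);

        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // Each read-modify-write runs atomically for key 1
                    frequencyMap.computeIfPresent(1, (k, v) -> v + 1);
                }
            });
            workers.add(worker);
            worker.start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return frequencyMap.get(1);
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(8, 10_000)); // 80000
    }
}
```

A separate get followed by a put wouldn't give the same guarantee, because another thread could update the key between the two calls.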

Another use case for ConcurrentHashMap is to achieve high throughput for writes in different map keys. Let's illustrate that with another unit test that uses two write threads to update different keys in the map:

@Test
public void givenOneThreadIsWriting_whenAnotherThreadWritesAtDifferentKey_thenNotWaitAndGetCorrectValue() throws Exception {
    ExecutorService threadExecutor = Executors.newFixedThreadPool(2);

    Callable<Integer> writeAfter5Sec = () -> frequencyMap.computeIfPresent(1, (k, v) -> {
        sleep(5);
        return v + 1;
    });

    // Keep the start time and the measured duration in separate variables
    long start = System.currentTimeMillis();
    AtomicLong elapsedSeconds = new AtomicLong();
    Callable<Integer> writeAfter1Sec = () -> frequencyMap.computeIfPresent(2, (k, v) -> {
        sleep(1);
        elapsedSeconds.set((System.currentTimeMillis() - start) / 1000);
        return v + 1;
    });

    threadExecutor.invokeAll(asList(writeAfter5Sec, writeAfter1Sec));

    assertEquals(1, elapsedSeconds.get());

    // Request shutdown first: awaitTermination() only returns true after a shutdown request
    threadExecutor.shutdown();
    threadExecutor.awaitTermination(2, TimeUnit.SECONDS);
}

The test validates that the second thread doesn't need to wait for the first thread's completion because the write is happening at a different map key. Thus, the second write should take only about one second to complete. In ConcurrentHashMap, threads can work simultaneously on different map entries, so concurrent writes are generally faster than with thread-safe structures that lock the whole map.

4. Conclusion

In this article, we've seen how to write to and read from a ConcurrentHashMap to achieve high throughput for writes and reads and eventual consistency in reading. As usual, the source code is available over on GitHub.
