Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this article, we'll see how we can get a lock on a specific key to prevent concurrent actions on that key without impeding actions on other keys.

In general, we'll want to implement two methods and see how to use them correctly:

  • void lock(String key)
  • void unlock(String key)

To keep the tutorial simple, we'll always assume that our keys are Strings. You can replace them with any type of object you need, on the sole condition that its equals and hashCode methods are correctly defined, because we'll use the keys in hash-based collections.
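To make that condition concrete, here is a minimal sketch of a custom key type. The OrderKey class, its fields, and the OrderKeyDemo helper are hypothetical names introduced only for illustration; they are not part of the article's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical composite key: two instances with the same field values
// must be treated as the same key, so equals and hashCode use both fields.
final class OrderKey {
    private final String customerId;
    private final long orderId;

    OrderKey(String customerId, long orderId) {
        this.customerId = customerId;
        this.orderId = orderId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof OrderKey)) {
            return false;
        }
        OrderKey other = (OrderKey) o;
        return orderId == other.orderId && customerId.equals(other.customerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(customerId, orderId);
    }
}

class OrderKeyDemo {
    // Returns true when a second, equal instance resolves to the same map entry
    static boolean equalKeysShareOneEntry() {
        Map<OrderKey, String> owners = new HashMap<>();
        owners.put(new OrderKey("alice", 42L), "thread-1");
        String owner = owners.get(new OrderKey("alice", 42L));
        return "thread-1".equals(owner) && owners.size() == 1;
    }
}
```

Without the two overrides, each new OrderKey instance would be a distinct key, and two threads working on the same order would never contend for the same lock.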

2. A Simple Mutually Exclusive Lock

First, let's suppose we want to block any requested action if the corresponding key is already in use. Here, we'll define a boolean tryLock(String key) method rather than the lock method we had imagined.

Concretely, we aim to maintain a Set of keys that we'll fill with the keys in use at any moment. Thus, when a new action is requested on a key, we'll just have to refuse it if we find out that the key is already used by another thread.

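The problem we face here is that the JDK has no dedicated thread-safe, hash-based Set class (there is no ConcurrentHashSet). Hence, we'll use a Set backed by a ConcurrentHashMap, obtained with ConcurrentHashMap.newKeySet(). This gives us the thread safety of ConcurrentHashMap; in particular, its add method atomically tells us whether the key was already present.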

Let's see this in action:

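import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class SimpleExclusiveLockByKey {

    // Keys currently in use; static, so shared by all instances of this class
    private static final Set<String> usedKeys = ConcurrentHashMap.newKeySet();

    // Returns true if the key was free and is now locked, false if it was already in use
    public boolean tryLock(String key) {
        return usedKeys.add(key);
    }

    public void unlock(String key) {
        usedKeys.remove(key);
    }

}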
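The class relies on one guarantee: when several threads add the same key to a concurrent key set at the same time, add returns true for exactly one of them. The following sketch illustrates this; KeySetRaceDemo and countWinners are hypothetical names used only for this demonstration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class KeySetRaceDemo {

    // Starts the given number of threads, lets them all add the same key at once,
    // and returns how many of them saw add(...) return true.
    static int countWinners(int threads) {
        Set<String> usedKeys = ConcurrentHashMap.newKeySet();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];

        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all threads up before racing
                    if (usedKeys.add("key")) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }

        start.countDown(); // release all threads together
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println(countWinners(8)); // prints 1
    }
}
```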

Here's how we would use this class:

String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
if (lockByKey.tryLock(key)) {
    try {
        // insert the code that needs to be executed only if the key lock is available
    } finally { // CRUCIAL
        lockByKey.unlock(key);
    }
}

Let's insist on two points. First, we must check the value returned by tryLock: if it's false, another thread is using the key, so we must neither run our code nor call unlock, which would release a lock we don't own. Second, the finally block is crucial: because we call the unlock method inside it, we'll unlock the key even if our code throws an Exception within the try block.

3. Acquire and Release Locks by Keys

Now, let's dig further into the problem and say we don't want to simply refuse simultaneous actions on the same keys, but we'd rather have new incoming actions wait until the current action on the key finishes.

The application flow will be:

  • the first thread asks for a lock on a key: it acquires the lock on the key
  • the second thread asks for a lock on the same key: thread 2 is told to wait
  • the first thread releases the lock on the key
  • the second thread acquires the lock on the key and can execute its action

3.1. Define a Lock With a Thread Counter

In this case, it sounds natural to use a Lock. In brief, a Lock is an object used for thread synchronization that blocks threads until it can be acquired. Lock is an interface, so we'll use ReentrantLock, its standard implementation in the JDK.
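As a quick reminder of how a ReentrantLock behaves, the sketch below holds a lock in one thread and checks that a second thread can't take it in the meantime. ReentrantLockDemo and otherThreadIsKeptOut are hypothetical names chosen for this illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo {

    // Returns true if a second thread fails to take the lock while the caller holds it
    static boolean otherThreadIsKeptOut() {
        Lock lock = new ReentrantLock();
        AtomicBoolean acquiredByOther = new AtomicBoolean(false);

        lock.lock();
        try {
            Thread other = new Thread(() -> {
                // tryLock returns immediately instead of blocking
                if (lock.tryLock()) {
                    acquiredByOther.set(true);
                    lock.unlock();
                }
            });
            other.start();
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock(); // always release in finally
        }
        return !acquiredByOther.get();
    }
}
```

Had the second thread called lock() instead of tryLock(), it would simply have waited until the first thread released the lock, which is the behavior we want per key.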

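Let's start by wrapping our Lock in an inner class. This class will be able to track the number of threads that currently hold or are waiting for the lock on the key. The counter starts at 1 because the thread that creates the wrapper is itself about to take the lock. The class will expose two methods, one to increment the thread counter and another one to decrement it: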

private static class LockWrapper {
    private final Lock lock = new ReentrantLock();
    private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);

    private LockWrapper addThreadInQueue() {
        numberOfThreadsInQueue.incrementAndGet(); 
        return this;
    }

    private int removeThreadFromQueue() {
        return numberOfThreadsInQueue.decrementAndGet(); 
    }

}

3.2. Let the Lock Handle Queuing Threads

Furthermore, we'll continue to use a ConcurrentHashMap. But instead of using only the keys of the Map like we were doing before, we'll store LockWrapper objects as values:

private static final ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<>();

When a thread wants to acquire a lock on a key, we'll need to see if a LockWrapper is already present for this key:

  •  if not, we'll instantiate a new LockWrapper for the given key with a counter set at 1
  •  if so, we'll return the existing LockWrapper and increment its associated counter

Let's see how this is done:

public void lock(String key) {
    LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
    lockWrapper.lock.lock();
}

The code is very concise thanks to ConcurrentHashMap's compute method, which performs the whole lookup-and-update atomically for a given key. Let's give some details on the functioning of this method:

  • the compute method is applied to the object locks with key as its first argument: the initial value corresponding to key in locks is retrieved
  • the BiFunction given as the second argument of compute is applied to the key and the initial value: the result gives a new value
  • the new value replaces the initial value for key key in locks
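The three steps above can be observed on a simpler map. This is a small sketch using a hypothetical visit counter; ComputeDemo and its method names are illustrative only.

```java
import java.util.concurrent.ConcurrentHashMap;

class ComputeDemo {

    static int countVisits() {
        ConcurrentHashMap<String, Integer> visits = new ConcurrentHashMap<>();
        // First call: there is no mapping yet, so v is null and 1 is stored
        visits.compute("home", (k, v) -> v == null ? 1 : v + 1);
        // Second call: v is 1, so the result 2 replaces it
        visits.compute("home", (k, v) -> v == null ? 1 : v + 1);
        return visits.get("home");
    }

    static boolean nullRemovesMapping() {
        ConcurrentHashMap<String, Integer> visits = new ConcurrentHashMap<>();
        visits.put("home", 1);
        // When the function returns null, the mapping is removed
        visits.compute("home", (k, v) -> null);
        return !visits.containsKey("home");
    }
}
```

Here countVisits() returns 2, and nullRemovesMapping() returns true because a null result deletes the entry instead of storing a value.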

3.3. Unlock and Optionally Remove Map Entry

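Additionally, when a thread releases a lock, we'll decrement the number of threads associated with the LockWrapper. If the count is down to zero, then we'll remove the key from the ConcurrentHashMap. We perform the decrement and the removal in a single computeIfPresent call so that they're atomic with respect to the compute call in lock. Otherwise, another thread could queue on the LockWrapper between the two steps and then see its entry removed from the map:

public void unlock(String key) {
    LockWrapper lockWrapper = locks.get(key);
    lockWrapper.lock.unlock();
    // Returning null from the remapping function removes the entry,
    // so no thread can queue between the decrement and the removal
    locks.computeIfPresent(key, (k, v) -> v.removeThreadFromQueue() == 0 ? null : v);
}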

3.4. Summary

In a nutshell, let's see what our whole class finally looks like:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockByKey {

    private static class LockWrapper {
        private final Lock lock = new ReentrantLock();
        // Number of threads holding or waiting for this lock; the creator counts as 1
        private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);

        private LockWrapper addThreadInQueue() {
            numberOfThreadsInQueue.incrementAndGet();
            return this;
        }

        private int removeThreadFromQueue() {
            return numberOfThreadsInQueue.decrementAndGet();
        }

    }

    private static final ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<>();

    public void lock(String key) {
        LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
        lockWrapper.lock.lock();
    }

    public void unlock(String key) {
        LockWrapper lockWrapper = locks.get(key);
        lockWrapper.lock.unlock();
        // Decrement and remove in one atomic step, so no thread can queue in between
        locks.computeIfPresent(key, (k, v) -> v.removeThreadFromQueue() == 0 ? null : v);
    }

}

The usage is quite similar to what we had before:

String key = "key";
LockByKey lockByKey = new LockByKey();
lockByKey.lock(key); // acquire before the try block, so unlock only runs if we hold the lock
try {
    // insert your code here
} finally { // CRUCIAL
    lockByKey.unlock(key);
}

4. Allow Multiple Actions at the Same Time

Last but not least, let's consider another case: instead of allowing only one thread to perform an action for a given key at a time, we want to limit the number of threads allowed to act simultaneously on the same key to some integer n. To keep it simple, we'll set n=2.

Let's describe our use case in detail:

  • the first thread wants to acquire the lock on the key: it will be allowed to do so
  • a second thread wants to acquire the same lock: it will also be allowed
  • a third thread requests a lock on the same key: it will have to queue until one of the first two threads releases its lock

Semaphores are made for this. A Semaphore is an object used to limit the number of threads simultaneously accessing a resource.
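To illustrate how permits work, here is a minimal sketch with a Semaphore of two permits. It uses the non-blocking tryAcquire so that the third attempt fails immediately instead of waiting; SemaphoreDemo and tryFourTimes are hypothetical names.

```java
import java.util.concurrent.Semaphore;

class SemaphoreDemo {

    // Returns the outcome of four successive acquisition attempts
    static boolean[] tryFourTimes() {
        Semaphore semaphore = new Semaphore(2);

        boolean first = semaphore.tryAcquire();  // takes permit 1 of 2
        boolean second = semaphore.tryAcquire(); // takes permit 2 of 2
        boolean third = semaphore.tryAcquire();  // no permit left, fails

        semaphore.release();                     // hands one permit back
        boolean fourth = semaphore.tryAcquire(); // succeeds again

        return new boolean[] { first, second, third, fourth };
    }
}
```

Unlike a ReentrantLock, a Semaphore has no notion of an owning thread: permits are simply counted, which is why a single thread can take both of them here.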

The overall approach and the code look very similar to what we had with locks:

public class SimultaneousEntriesLockByKey {

    private static final int ALLOWED_THREADS = 2;
    
    private static final ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();
    
    public void lock(String key) {
        Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
        semaphore.acquireUninterruptibly();
    }
    
    public void unlock(String key) {
        Semaphore semaphore = semaphores.get(key);
        semaphore.release();
        if (semaphore.availablePermits() == ALLOWED_THREADS) { 
            semaphores.remove(key, semaphore);
        }  
    }
    
}

The usage is identical:

String key = "key";
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
lockByKey.lock(key); // acquire before the try block, so unlock only runs if we hold a permit
try {
    // insert your code here
} finally { // CRUCIAL
    lockByKey.unlock(key);
}
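One subtlety of the semaphore-based class is that its unlock method checks availablePermits() outside of any atomic map operation. A thread that has just fetched the Semaphore in lock, but hasn't acquired a permit yet, is invisible to that check, so its Semaphore may be removed from the map while it's still about to be used. A possible way to close this window, sketched below under our own assumptions, is to count users inside compute, as we did with LockWrapper. The names CountedSemaphoreByKey, Entry, and trackedKeys are hypothetical, and the map is an instance field here only to keep the example isolated.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class CountedSemaphoreByKey {

    private static final int ALLOWED_THREADS = 2;

    // Hypothetical holder: pairs the semaphore with the number of threads
    // that hold a permit or are waiting for one
    private static final class Entry {
        final Semaphore semaphore = new Semaphore(ALLOWED_THREADS);
        final AtomicInteger users = new AtomicInteger();
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();

    public void lock(String key) {
        // Register as a user atomically, before waiting for a permit
        Entry entry = entries.compute(key, (k, v) -> {
            Entry e = (v == null) ? new Entry() : v;
            e.users.incrementAndGet();
            return e;
        });
        entry.semaphore.acquireUninterruptibly();
    }

    public void unlock(String key) {
        // Release and unregister in one atomic step; returning null removes the entry
        entries.computeIfPresent(key, (k, v) -> {
            v.semaphore.release();
            return v.users.decrementAndGet() == 0 ? null : v;
        });
    }

    // Number of keys that currently have an entry, exposed for illustration only
    int trackedKeys() {
        return entries.size();
    }
}
```

With this variant, an entry leaves the map only when no thread holds or awaits a permit for its key, at the cost of one extra counter per key.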

5. Conclusion

In this article, we've seen how we could put locks on keys to either reject concurrent actions outright (using a concurrent Set), make them wait so that only one runs at a time (using locks), or limit the number of concurrent actions to n (using semaphores).

As always, the code is available over on GitHub.
