1. Overview

In this article, we’ll look at conflict-free replicated data types (CRDTs) and how to work with them in Java. For our examples, we’ll use implementations from the wurmloch-crdt library.

When we have a cluster of N replica nodes in a distributed system, we may encounter a network partition, in which some nodes are temporarily unable to communicate with each other. This situation is called a split-brain.

When we have a split-brain in our system, some write requests — even for the same user — can go to different replicas that are not connected with each other. When such a situation occurs, our system is still available but is not consistent.

We need to decide what to do with writes and data that are not consistent when the network between two split clusters starts working again.

2. Conflict-Free Replicated Data Types to the Rescue

Let’s consider two nodes, A and B, that have become disconnected due to a split-brain.

Let’s say that a user changes their login and the request goes to node A. Then they decide to change it again, but this time the request goes to node B.

Because of the split-brain, the two nodes are not connected. We need to decide how the login of this user should look when the network is working again.

We can choose between a couple of strategies: we can let the user resolve the conflict (as is done in Google Docs), or we can use a CRDT to merge the data from the diverged replicas for us.

3. Maven Dependency

First, let’s add a dependency to the library that provides a set of useful CRDTs:

<dependency>
    <groupId>com.netopyr.wurmloch</groupId>
    <artifactId>wurmloch-crdt</artifactId>
    <version>0.1.0</version>
</dependency>

The latest version can be found on Maven Central.

4. Grow-Only Set

The most basic CRDT is a Grow-Only Set. Elements can only be added to a GSet and never removed. When replicas of a GSet diverge, they can be merged easily by calculating the union of the two sets.

First, let’s create two replicas to simulate a distributed data structure and connect those two replicas using the connect() method:

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

Once we have two replicas in our cluster, we can create a GSet on the first replica and reference it on the second replica:

GSet<String> replica1 = crdtStore1.createGSet("ID_1");
GSet<String> replica2 = crdtStore2.<String>findGSet("ID_1").get();

At this point, our cluster is working as expected, and there is an active connection between the two replicas. We can add two elements to the set from two different replicas and assert that the set contains the same elements on both replicas:

replica1.add("apple");
replica2.add("banana");

assertThat(replica1).contains("apple", "banana");
assertThat(replica2).contains("apple", "banana");

Let’s say that suddenly we have a network partition and there is no connection between the first and second replicas. We can simulate the network partition using the disconnect() method:

crdtStore1.disconnect(crdtStore2);

Next, when we add elements to the set from both replicas, those changes are visible only locally because there is no connection between the replicas:

replica1.add("strawberry");
replica2.add("pear");

assertThat(replica1).contains("apple", "banana", "strawberry");
assertThat(replica2).contains("apple", "banana", "pear");

Once the connection between both cluster members is established again, the GSet is merged internally using a union on both sets, and both replicas are consistent again:

crdtStore1.connect(crdtStore2);

assertThat(replica1)
  .contains("apple", "banana", "strawberry", "pear");
assertThat(replica2)
  .contains("apple", "banana", "strawberry", "pear");
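To make the union-based merge concrete, here is a minimal, library-independent sketch of a grow-only set. The class name GrowOnlySet and its methods are hypothetical and chosen for illustration only; this is not how wurmloch-crdt's GSet is implemented, just one plausible way to express the same idea:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration class; not part of wurmloch-crdt.
final class GrowOnlySet<E> {
    private final Set<E> elements = new HashSet<>();

    // Adding is the only mutation; there is deliberately no remove().
    void add(E element) {
        elements.add(element);
    }

    boolean contains(E element) {
        return elements.contains(element);
    }

    // Merging is a plain set union. Union is commutative, associative and
    // idempotent, so replicas converge regardless of merge order or repeats.
    void merge(GrowOnlySet<E> other) {
        elements.addAll(other.elements);
    }

    // Read-only copy of the current state.
    Set<E> snapshot() {
        return Collections.unmodifiableSet(new HashSet<>(elements));
    }
}
```

With this sketch, two instances that received different elements while disconnected hold the same four elements after each one merges the other, and merging a second time changes nothing.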

5. Increment-Only Counter

The increment-only counter is a CRDT that aggregates all increments locally on each node.

When replicas synchronize after a network partition, the resulting value is calculated by summing all increments on all nodes. This is similar to LongAdder from java.util.concurrent.atomic, but on a higher abstraction level.

Let’s create an increment-only counter using GCounter and increment it from both replicas. We can see that the sum is calculated properly:

LocalCrdtStore crdtStore1 = new LocalCrdtStore();
LocalCrdtStore crdtStore2 = new LocalCrdtStore();
crdtStore1.connect(crdtStore2);

GCounter replica1 = crdtStore1.createGCounter("ID_1");
GCounter replica2 = crdtStore2.findGCounter("ID_1").get();

replica1.increment();
replica2.increment(2L);

assertThat(replica1.get()).isEqualTo(3L);
assertThat(replica2.get()).isEqualTo(3L);

When we disconnect both cluster members and perform local increment operations, we can see that the values are inconsistent:

crdtStore1.disconnect(crdtStore2);

replica1.increment(3L);
replica2.increment(5L);

assertThat(replica1.get()).isEqualTo(6L);
assertThat(replica2.get()).isEqualTo(8L);

But once the cluster is healthy again, the increments will be merged, yielding the proper value:

crdtStore1.connect(crdtStore2);

assertThat(replica1.get())
  .isEqualTo(11L);
assertThat(replica2.get())
  .isEqualTo(11L);
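The merge behaviour above can be sketched without the library. The following assumes a common state-based formulation: each replica keeps one running total per node, merging takes the per-node maximum, and the counter's value is the sum of all totals. The class GrowOnlyCounter and its node IDs are hypothetical, and this is not necessarily how wurmloch-crdt's GCounter works internally:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of wurmloch-crdt.
final class GrowOnlyCounter {
    private final String nodeId;
    // Running total of increments observed per node.
    private final Map<String, Long> counts = new HashMap<>();

    GrowOnlyCounter(String nodeId) {
        this.nodeId = nodeId;
    }

    // A replica only ever increases its own entry.
    void increment(long amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("a grow-only counter cannot decrease");
        }
        counts.merge(nodeId, amount, Long::sum);
    }

    // The counter's value is the sum over all nodes.
    long get() {
        long total = 0;
        for (long count : counts.values()) {
            total += count;
        }
        return total;
    }

    // Per-node totals only grow, so the larger one is always the newer one.
    void merge(GrowOnlyCounter other) {
        other.counts.forEach((node, count) -> counts.merge(node, count, Long::max));
    }
}
```

Taking the maximum per node, rather than adding the two states together, is what keeps repeated merges from counting the same increment twice.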

6. PN Counter

Using a rule similar to the one behind the increment-only counter, we can create a counter that can be both incremented and decremented. The PNCounter stores all increments and decrements separately.

When replicas synchronize, the resulting value will be equal to the sum of all increments minus the sum of all decrements:

@Test
public void givenPNCounter_whenReplicasDiverge_thenMergesWithoutConflict() {
    LocalCrdtStore crdtStore1 = new LocalCrdtStore();
    LocalCrdtStore crdtStore2 = new LocalCrdtStore();
    crdtStore1.connect(crdtStore2);

    PNCounter replica1 = crdtStore1.createPNCounter("ID_1");
    PNCounter replica2 = crdtStore2.findPNCounter("ID_1").get();

    replica1.increment();
    replica2.decrement(2L);

    assertThat(replica1.get()).isEqualTo(-1L);
    assertThat(replica2.get()).isEqualTo(-1L);

    crdtStore1.disconnect(crdtStore2);

    replica1.decrement(3L);
    replica2.increment(5L);

    assertThat(replica1.get()).isEqualTo(-4L);
    assertThat(replica2.get()).isEqualTo(4L);

    crdtStore1.connect(crdtStore2);

    assertThat(replica1.get()).isEqualTo(1L);
    assertThat(replica2.get()).isEqualTo(1L);
}
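The idea of storing increments and decrements separately can be sketched as two grow-only maps, one per direction. As before, the class PositiveNegativeCounter is hypothetical, and the per-node-maximum merge is an assumption based on the usual state-based design rather than a description of wurmloch-crdt's internals:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class; not part of wurmloch-crdt.
final class PositiveNegativeCounter {
    private final String nodeId;
    // Two grow-only tallies per node: one for increments, one for decrements.
    private final Map<String, Long> increments = new HashMap<>();
    private final Map<String, Long> decrements = new HashMap<>();

    PositiveNegativeCounter(String nodeId) {
        this.nodeId = nodeId;
    }

    void increment(long amount) {
        requireNonNegative(amount);
        increments.merge(nodeId, amount, Long::sum);
    }

    void decrement(long amount) {
        requireNonNegative(amount);
        decrements.merge(nodeId, amount, Long::sum);
    }

    // Value = sum of all increments minus sum of all decrements.
    long get() {
        return sum(increments) - sum(decrements);
    }

    // Each tally is merged independently by taking the per-node maximum.
    void merge(PositiveNegativeCounter other) {
        other.increments.forEach((node, n) -> increments.merge(node, n, Long::max));
        other.decrements.forEach((node, n) -> decrements.merge(node, n, Long::max));
    }

    private static void requireNonNegative(long amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must be non-negative");
        }
    }

    private static long sum(Map<String, Long> tally) {
        long total = 0;
        for (long n : tally.values()) {
            total += n;
        }
        return total;
    }
}
```

Because neither tally ever shrinks, a decrement never has to undo an increment, which is what lets both kinds of operation merge without conflict.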

7. Last-Writer-Wins Register

Sometimes, we have more complex business rules, and operating on sets or counters is insufficient. We can use the Last-Writer-Wins Register, which keeps only the last updated value when merging diverged data sets. Cassandra uses this strategy to resolve conflicts.

We need to be very cautious when using this strategy because it silently drops every concurrent change except the one that wins.

Let’s create a cluster of two replicas and instances of the LWWRegister class:

LocalCrdtStore crdtStore1 = new LocalCrdtStore("N_1");
LocalCrdtStore crdtStore2 = new LocalCrdtStore("N_2");
crdtStore1.connect(crdtStore2);

LWWRegister<String> replica1 = crdtStore1.createLWWRegister("ID_1");
LWWRegister<String> replica2 = crdtStore2.<String>findLWWRegister("ID_1").get();

replica1.set("apple");
replica2.set("banana");

assertThat(replica1.get()).isEqualTo("banana");
assertThat(replica2.get()).isEqualTo("banana");

When the first replica sets the value to apple and the second one changes it to banana, the LWWRegister keeps only the last value.

Let’s see what happens if the cluster disconnects:

crdtStore1.disconnect(crdtStore2);

replica1.set("strawberry");
replica2.set("pear");

assertThat(replica1.get()).isEqualTo("strawberry");
assertThat(replica2.get()).isEqualTo("pear");

Each replica keeps its own local copy of the data, and the two copies are now inconsistent. When we call the set() method, the LWWRegister internally assigns every update a special version value that identifies that specific update. This value is generated using a vector clock algorithm.
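To illustrate what a vector clock captures, here is a minimal sketch that compares two clocks entry by entry. The class SimpleVectorClock, its Order enum and its method names are hypothetical and are not the library's VectorClock API:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class; not the wurmloch-crdt VectorClock.
final class SimpleVectorClock {
    enum Order { BEFORE, AFTER, EQUAL, CONCURRENT }

    // One logical counter per node; a missing entry counts as zero.
    private final Map<String, Long> entries = new HashMap<>();

    // Returns a new clock in which the given node's counter is advanced by one.
    SimpleVectorClock tick(String nodeId) {
        SimpleVectorClock next = new SimpleVectorClock();
        next.entries.putAll(entries);
        next.entries.merge(nodeId, 1L, Long::sum);
        return next;
    }

    // Compares this clock with another one across the union of their nodes.
    Order compare(SimpleVectorClock other) {
        boolean someSmaller = false;
        boolean someGreater = false;
        Set<String> nodes = new HashSet<>(entries.keySet());
        nodes.addAll(other.entries.keySet());
        for (String node : nodes) {
            long mine = entries.getOrDefault(node, 0L);
            long theirs = other.entries.getOrDefault(node, 0L);
            if (mine < theirs) someSmaller = true;
            if (mine > theirs) someGreater = true;
        }
        if (someSmaller && someGreater) return Order.CONCURRENT;
        if (someSmaller) return Order.BEFORE;
        if (someGreater) return Order.AFTER;
        return Order.EQUAL;
    }
}
```

Two updates made on different nodes from the same starting clock compare as CONCURRENT, which is exactly the situation after a partition. A last-writer-wins register then needs some additional deterministic rule to pick a winner among concurrent updates.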

When the cluster synchronizes, it takes the value with the highest version and discards every previous update:

crdtStore1.connect(crdtStore2);

assertThat(replica1.get()).isEqualTo("pear");
assertThat(replica2.get()).isEqualTo("pear");
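The following sketch shows one simple way a last-writer-wins merge can be made deterministic. Note that it swaps in a plain logical timestamp with a node-ID tie-break instead of the vector clock the library uses, so it illustrates the general strategy only. The class LastWriterWinsRegister and the tie-break rule are assumptions made for this example:

```java
// Hypothetical illustration class; not part of wurmloch-crdt.
// Uses a logical timestamp plus node ID, not a vector clock.
final class LastWriterWinsRegister<T> {
    private final String nodeId;
    private T value;
    private long timestamp;       // logical clock of the current value
    private String writerId = ""; // node that wrote the current value

    LastWriterWinsRegister(String nodeId) {
        this.nodeId = nodeId;
    }

    // Every local write gets a timestamp higher than anything seen so far.
    void set(T newValue) {
        timestamp++;
        writerId = nodeId;
        value = newValue;
    }

    T get() {
        return value;
    }

    // Keep the write with the higher timestamp; on a tie, the
    // lexicographically greater node ID wins (an arbitrary but fixed rule).
    void merge(LastWriterWinsRegister<T> other) {
        int byTime = Long.compare(other.timestamp, timestamp);
        boolean otherWins = byTime > 0
            || (byTime == 0 && other.writerId.compareTo(writerId) > 0);
        if (otherWins) {
            value = other.value;
            timestamp = other.timestamp;
            writerId = other.writerId;
        }
    }
}
```

Whatever rule is used, it has to give the same answer on every replica. Otherwise, two nodes could each keep their own value after merging and never converge.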

8. Conclusion

In this article, we looked at the problem of keeping data consistent in a distributed system while maintaining availability.

When a network partition occurs, replicas can diverge, and we need to merge their data once the cluster is connected again. We saw how to use CRDTs to perform that merge.

All these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
