1. Overview

In this article, we'll present BookKeeper, a service that implements a distributed, fault-tolerant record storage system.

2. What Is BookKeeper?

BookKeeper was originally developed by Yahoo as a ZooKeeper subproject and graduated to become a top-level project in 2015. At its core, BookKeeper aims to be a reliable and high-performance system that stores sequences of Log Entries (aka Records) in data structures called Ledgers.

An important feature of ledgers is the fact that they're append-only and immutable. This makes BookKeeper a good candidate for certain applications, such as distributed logging systems, Pub-Sub messaging applications, and real-time stream processing.

3. BookKeeper Concepts

3.1. Log Entries

A log entry contains an indivisible unit of data that a client application stores to or reads from BookKeeper. When stored in a ledger, each entry contains the supplied data and a few metadata fields.

Those metadata fields include an entryId, which must be unique within a given ledger. There's also an authentication code that BookKeeper uses to detect when an entry is corrupt or has been tampered with.

BookKeeper offers no serialization features by itself, so clients must devise their own method to convert higher-level constructs to/from byte arrays.
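As an illustration of what such a conversion might look like, here's a minimal sketch that packs a record into a byte array with Java NIO's ByteBuffer and reads it back. The SensorReading class and its field layout are hypothetical and exist only for this example; they aren't part of BookKeeper:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical application-level record; not part of the BookKeeper API.
final class SensorReading {
    final long timestamp;
    final String sensorId;
    final double value;

    SensorReading(long timestamp, String sensorId, double value) {
        this.timestamp = timestamp;
        this.sensorId = sensorId;
        this.value = value;
    }

    // Layout: 8-byte timestamp, 8-byte value, 4-byte id length, UTF-8 id bytes.
    byte[] toBytes() {
        byte[] id = sensorId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Double.BYTES + Integer.BYTES + id.length);
        buf.putLong(timestamp).putDouble(value).putInt(id.length).put(id);
        return buf.array();
    }

    // Reads the fields back in the same order they were written.
    static SensorReading fromBytes(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long timestamp = buf.getLong();
        double value = buf.getDouble();
        byte[] id = new byte[buf.getInt()];
        buf.get(id);
        return new SensorReading(timestamp, new String(id, StandardCharsets.UTF_8), value);
    }
}
```

The byte array returned by toBytes() is what we'd hand to BookKeeper as the entry payload. In a real application, an established format such as Protocol Buffers or Avro is usually a better choice than a hand-rolled layout.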

3.2. Ledgers

A ledger is the basic storage unit managed by BookKeeper, storing an ordered sequence of log entries. As mentioned before, ledgers have append-only semantics, meaning that records can't be modified once added to them.

Also, once a client stops writing to a ledger and closes it, BookKeeper seals it and we can no longer add data to it, even at a later time. This is an important point to keep in mind when designing an application around BookKeeper. Ledgers are not a good candidate to directly implement higher-level constructs, such as a queue. Instead, we see ledgers used more often to create more basic data structures that support those higher-level concepts.

For instance, the Apache DistributedLog project uses ledgers as log segments. Those segments are aggregated into distributed logs, but the underlying ledgers are transparent to regular users.

BookKeeper achieves ledger resilience by replicating log entries across multiple server instances. Three parameters control how many servers and copies are kept:

  • Ensemble size: the number of servers used to write ledger data
  • Write quorum size: the number of servers used to replicate a given log entry
  • Ack quorum size: the number of servers that must acknowledge a given log entry write operation

By adjusting those parameters, we can tune the performance and resilience characteristics of a given ledger. When writing to a ledger, BookKeeper will only consider the operation as successful when a minimum quorum of cluster members acknowledge it.
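To make the relationship between the three parameters more concrete, here's a small sketch. The QuorumSettings class is a hypothetical helper, not a BookKeeper type. It assumes that the ack quorum can't exceed the write quorum, which in turn can't exceed the ensemble size, and that entries are striped across the ensemble in round-robin fashion:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper used only to illustrate the three parameters.
final class QuorumSettings {
    final int ensembleSize;
    final int writeQuorum;
    final int ackQuorum;

    QuorumSettings(int ensembleSize, int writeQuorum, int ackQuorum) {
        if (ackQuorum < 1 || ackQuorum > writeQuorum || writeQuorum > ensembleSize) {
            throw new IllegalArgumentException(
              "expected 1 <= ackQuorum <= writeQuorum <= ensembleSize");
        }
        this.ensembleSize = ensembleSize;
        this.writeQuorum = writeQuorum;
        this.ackQuorum = ackQuorum;
    }

    // How many write-quorum members may fail to answer while a write still succeeds.
    int sparedAcks() {
        return writeQuorum - ackQuorum;
    }

    // Assumption: entry N goes to writeQuorum consecutive ensemble members,
    // starting at index N modulo ensembleSize (round-robin striping).
    List<Integer> writeSet(long entryId) {
        List<Integer> bookies = new ArrayList<>();
        for (int i = 0; i < writeQuorum; i++) {
            bookies.add((int) ((entryId + i) % ensembleSize));
        }
        return bookies;
    }
}
```

With an ensemble of 3 and a write quorum of 2, this sketch sends entry 0 to members 0 and 1, entry 1 to members 1 and 2, and entry 2 to members 2 and 0. A larger ensemble spreads the load, while a larger write quorum keeps more copies of each entry.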

In addition to its internal metadata, BookKeeper also supports adding custom metadata to a ledger. Those are a map of key/value pairs that clients pass at creation time and BookKeeper stores in ZooKeeper alongside its own.
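Since metadata values are raw bytes (the createLedger() variant shown later takes a Map of String keys to byte[] values), a small conversion helper can be handy. The following is only a sketch; the LedgerMetadataCodec class is a hypothetical name, and it assumes all values are UTF-8 text:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; assumes every metadata value is UTF-8 text.
final class LedgerMetadataCodec {

    // Converts readable values to the byte[] form used for custom metadata.
    static Map<String, byte[]> encode(Map<String, String> values) {
        Map<String, byte[]> raw = new HashMap<>();
        values.forEach((k, v) -> raw.put(k, v.getBytes(StandardCharsets.UTF_8)));
        return raw;
    }

    // Converts stored byte[] values back to strings.
    static Map<String, String> decode(Map<String, byte[]> raw) {
        Map<String, String> values = new HashMap<>();
        raw.forEach((k, v) -> values.put(k, new String(v, StandardCharsets.UTF_8)));
        return values;
    }
}
```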

3.3. Bookies

Bookies are servers that hold one or more ledgers. A BookKeeper cluster consists of a number of bookies running in a given environment, providing services to clients over plain TCP or TLS connections.

Bookies coordinate actions using cluster services provided by ZooKeeper. This implies that, if we want to achieve a fully fault-tolerant system, we need at least a 3-instance ZooKeeper and a 3-instance BookKeeper setup. Such a setup can tolerate the loss of any single instance and still operate normally, at least for the default ledger setup: 3-node ensemble size, 2-node write quorum, and 2-node ack quorum.

4. Local Setup

The basic requirements to run BookKeeper locally are quite modest. First, we need a ZooKeeper instance up and running, which provides ledger metadata storage for BookKeeper. Next, we deploy a bookie, which provides the actual services to clients.

While it's certainly possible to do those steps manually, here we'll use a docker-compose file that uses official Apache images to simplify this task:

$ cd <path to docker-compose.yml>
$ docker-compose up

This docker-compose creates three bookies and a ZooKeeper instance. Since all bookies run on the same machine, it's only useful for testing purposes. The official documentation contains the necessary steps to configure a fully fault-tolerant cluster.

Let's do a basic test to check that it's working as expected, using the listbookies command of BookKeeper's shell:

$ docker exec -it apache-bookkeeper_bookie_1 /opt/bookkeeper/bin/bookkeeper \
  shell listbookies -readwrite
ReadWrite Bookies :
192.168.99.101(192.168.99.101):4181
192.168.99.101(192.168.99.101):4182
192.168.99.101(192.168.99.101):3181

The output lists the three available bookies. Please note that the IP addresses shown will vary depending on the specifics of the local Docker installation.

5. Using the Ledger API

The Ledger API is the most basic way to interface with BookKeeper. It allows us to interact directly with Ledger objects but, on the other hand, lacks direct support for higher-level abstractions such as streams. For those use cases, the BookKeeper project offers another library, DistributedLog, which supports those features.

Using the Ledger API requires adding the bookkeeper-server dependency to our project:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server</artifactId>
    <version>4.10.0</version>
</dependency>

NOTE: As stated in the documentation, using this dependency will also include dependencies for the protobuf and guava libraries. Should our project also need those libraries, but at a different version than those used by BookKeeper, we could use an alternative dependency that shades those libraries:

<dependency>
    <groupId>org.apache.bookkeeper</groupId>
    <artifactId>bookkeeper-server-shaded</artifactId>
    <version>4.10.0</version>
</dependency>

5.1. Connecting to Bookies

The BookKeeper class is the main entry point of the Ledger API, providing a few methods to connect to our BookKeeper service. In its simplest form, all we need to do is create a new instance of this class, passing the address of one of the ZooKeeper servers used by BookKeeper:

BookKeeper bk = new BookKeeper("zookeeper-host:2181");

Here, zookeeper-host should be set to the IP address or hostname of the ZooKeeper server that holds BookKeeper's cluster configuration. In our case, that's usually “localhost” or the host that the DOCKER_HOST environment variable points to.
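As a convenience, we could derive the host from the environment instead of hard-coding it. The sketch below is a hypothetical helper, not part of BookKeeper. It assumes DOCKER_HOST, when set, holds a URI such as tcp://192.168.99.101:2376, and it falls back to localhost otherwise:

```java
import java.net.URI;

// Hypothetical helper that builds the "host:port" string for the client.
final class ZooKeeperAddress {

    // dockerHost: the value of DOCKER_HOST, or null when the variable is unset
    // zkPort: the ZooKeeper client port exposed by our local setup
    static String resolve(String dockerHost, int zkPort) {
        String host = "localhost";
        if (dockerHost != null && !dockerHost.isEmpty()) {
            // getHost() returns null for values without a host part, e.g. unix sockets
            String parsed = URI.create(dockerHost).getHost();
            if (parsed != null) {
                host = parsed;
            }
        }
        return host + ":" + zkPort;
    }
}
```

We'd call it as ZooKeeperAddress.resolve(System.getenv("DOCKER_HOST"), port) and pass the result to the BookKeeper constructor.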

If we need more control over the several parameters available to fine-tune our client, we can use a ClientConfiguration instance and use it to create our client:

ClientConfiguration cfg = new ClientConfiguration();
cfg.setMetadataServiceUri("zk+null://zookeeper-host:2181");

// ... set other properties

BookKeeper bk = BookKeeper.forConfig(cfg).build();

5.2. Creating a Ledger

Once we have a BookKeeper instance, creating a new ledger is straightforward:

LedgerHandle lh = bk.createLedger(BookKeeper.DigestType.MAC, "password".getBytes());

Here, we've used the simplest variant of this method. It will create a new ledger with default settings, using the MAC digest type to ensure entry integrity.

If we want to add custom metadata to our ledger, we need to use a variant that takes all parameters:

LedgerHandle lh = bk.createLedger(
  3,
  2,
  2,
  BookKeeper.DigestType.MAC,
  "password".getBytes(),
  Collections.singletonMap("name", "my-ledger".getBytes()));

This time, we've used the full version of the createLedger() method. The first three arguments are the ensemble size, write quorum, and ack quorum values, respectively. Next, we have the same digest parameters as before. Finally, we pass a Map with our custom metadata.

In both cases above, createLedger is a synchronous operation. BookKeeper also offers asynchronous ledger creation using a callback:

bk.asyncCreateLedger(
  3,
  2,
  2,
  BookKeeper.DigestType.MAC,
  "password".getBytes(),
  (rc, lh, ctx) -> {
      // ... use lh to access ledger operations
  },
  null,
  Collections.emptyMap());

Newer versions of BookKeeper (>= 4.6) also support a fluent-style API and CompletableFuture to achieve the same goal:

CompletableFuture<WriteHandle> cf = bk.newCreateLedgerOp()
  .withDigestType(org.apache.bookkeeper.client.api.DigestType.MAC)
  .withPassword("password".getBytes())
  .execute();

Note that, in this case, we get a WriteHandle instead of a LedgerHandle. As we'll see later, we can use either one to access our ledger, since LedgerHandle implements WriteHandle.

5.3. Writing Data

Once we've acquired a LedgerHandle or WriteHandle, we write data to the associated ledger using one of the append() method variants. Let's start with the synchronous variant:

for (int i = 0; i < MAX_MESSAGES; i++) {
    byte[] data = ("message-" + i).getBytes();
    lh.append(data);
}

Here, we're using a variant that takes a byte array. The API also supports Netty's ByteBuf and Java NIO's ByteBuffer, which allow better memory management in time-critical scenarios.

For asynchronous operations, the API differs a bit depending on the specific handle type we've acquired. WriteHandle uses CompletableFuture, whereas LedgerHandle also supports callback-based methods:

// Available in WriteHandle and LedgerHandle
CompletableFuture<Long> f = lh.appendAsync(data);

// Available only in LedgerHandle
lh.asyncAddEntry(
  data,
  (rc,ledgerHandle,entryId,ctx) -> {
      // ... callback logic omitted
  },
  null);

Which one to choose is largely a matter of preference, but in general, CompletableFuture-based APIs tend to be easier to read. There's also the side benefit that we can construct a Project Reactor Mono directly from a CompletableFuture, making it easier to integrate BookKeeper into reactive applications.
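If we're stuck with a callback-style method but prefer futures, bridging the two is straightforward. The sketch below shows the general pattern using plain Java only. The CallbackStyleWriter and AddCallback interfaces are hypothetical stand-ins, loosely modeled on the lambda shown above, and the success code of 0 is an assumption of this example:

```java
import java.util.concurrent.CompletableFuture;

// Generic callback-to-future bridge; the nested interfaces are hypothetical stand-ins.
final class CallbackAdapter {

    static final int OK = 0; // assumed success code for this sketch

    interface AddCallback {
        void addComplete(int rc, long entryId);
    }

    interface CallbackStyleWriter {
        void asyncAdd(byte[] data, AddCallback cb);
    }

    // Wraps a callback-style write in a CompletableFuture holding the entry id.
    static CompletableFuture<Long> append(CallbackStyleWriter writer, byte[] data) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        writer.asyncAdd(data, (rc, entryId) -> {
            if (rc == OK) {
                future.complete(entryId);
            } else {
                future.completeExceptionally(
                  new IllegalStateException("write failed, rc=" + rc));
            }
        });
        return future;
    }
}
```

The returned future can then be chained with thenApply() or thenCompose(), just like the one returned by appendAsync().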

5.4. Reading Data

Reading data from a BookKeeper ledger works in a similar way to writing. First, we use our BookKeeper instance to create a LedgerHandle:

LedgerHandle lh = bk.openLedger(
  ledgerId, 
  BookKeeper.DigestType.MAC,
  ledgerPassword);

Except for the ledgerId parameter, which we'll cover later, this code looks much like the createLedger() method we've seen before. There's an important difference, though; this method returns a read-only LedgerHandle instance. If we try to use any of the available append() methods, all we'll get is an exception.

Alternatively, a safer way is to use the fluent-style API:

ReadHandle rh = bk.newOpenLedgerOp()
  .withLedgerId(ledgerId)
  .withDigestType(DigestType.MAC)
  .withPassword("password".getBytes())
  .execute()
  .get();

ReadHandle has the required methods to read data from our ledger:

long lastId = rh.readLastAddConfirmed();
rh.read(0, lastId).forEach((entry) -> {
    // ... do something
});

Here, we've simply requested all available data in this ledger using the synchronous read variant. As expected, there's also an async variant:

rh.readAsync(0, lastId).thenAccept((entries) -> {
    entries.forEach((entry) -> {
        // ... process entry
    });
});
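Requesting the whole range in one call is fine for small ledgers, but for larger ones we may prefer to read in batches. Here's a minimal sketch of a helper that splits the range into chunks. EntryRanges is a hypothetical class, and the sketch assumes that a negative last entry id means the ledger has no confirmed entries:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits [0, lastId] into smaller read ranges.
final class EntryRanges {

    // Returns inclusive {first, last} pairs, each covering at most batchSize entries.
    static List<long[]> split(long lastId, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        // A negative lastId never enters the loop, so the result stays empty
        for (long first = 0; first <= lastId; first += batchSize) {
            long last = Math.min(first + batchSize - 1, lastId);
            ranges.add(new long[] {first, last});
        }
        return ranges;
    }
}
```

Each pair can then be passed to read() or readAsync() in turn, so only one batch of entries is held in memory at a time.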

If we choose to use the older openLedger() method, we'll find additional methods that support the callback style for async methods:

lh.asyncReadEntries(
  0,
  lastId,
  (rc, handle, entries, ctx) -> {
      while (entries.hasMoreElements()) {
          LedgerEntry e = entries.nextElement();
          // ... process entry
      }
  },
  null);

5.5. Listing Ledgers

We've seen previously that we need the ledger's id to open and read its data. So, how do we get one? One way is using the LedgerManager interface, which we can access from our BookKeeper instance. This interface basically deals with ledger metadata, but also has the asyncProcessLedgers() method. Using this method, and some help from concurrency primitives, we can enumerate all available ledgers:

public List<Long> listAllLedgers(BookKeeper bk) {
    List<Long> ledgers = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch processDone = new CountDownLatch(1);

    bk.getLedgerManager()
      .asyncProcessLedgers(
        // called once per ledger id, possibly from several threads
        (ledgerId, cb) -> {
            ledgers.add(ledgerId);
            cb.processResult(BKException.Code.OK, null, null);
        },
        // called once, when the enumeration finishes or fails
        (rc, s, obj) -> {
            processDone.countDown();
        },
        null,
        BKException.Code.OK,
        BKException.Code.ReadException);

    try {
        processDone.await(1, TimeUnit.MINUTES);
        return ledgers;
    } catch (InterruptedException ie) {
        // restore the interrupt flag before propagating
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
    }
}

Let's digest this code, which is a bit longer than expected for a seemingly trivial task. The asyncProcessLedgers() method requires two callbacks.

The first one collects all ledger ids in a list. We're using a synchronized list here because this callback can be called from multiple threads. Besides the ledger id, this callback also receives a callback parameter. We must call its processResult() method to acknowledge that we've processed the data and to signal that we're ready to get more data.

The second callback gets called when all ledgers have been sent to the processor callback or when there's a failure. In our case, we've omitted the error handling. Instead, we're just decrementing a CountDownLatch, which, in turn, will finish the await operation and allow the method to return with a list of all available ledgers.
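For completeness, here's one way the omitted error handling could look. This is only a sketch in plain Java: AsyncLedgerSource is a hypothetical stand-in for the asynchronous enumeration, and the success code of 0 is an assumption. The idea is to record the final result code and to check whether await() actually completed before the timeout:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import java.util.function.LongConsumer;

// Latch-based collection with error and timeout checks; names are hypothetical.
final class LedgerIdCollector {

    static final int OK = 0; // assumed success code for this sketch

    // Stand-in for an async enumeration: onLedger per id, onDone once with a result code.
    interface AsyncLedgerSource {
        void forEachLedger(LongConsumer onLedger, IntConsumer onDone);
    }

    static List<Long> collect(AsyncLedgerSource source, long timeout, TimeUnit unit)
      throws InterruptedException {
        List<Long> ids = new CopyOnWriteArrayList<>();
        AtomicInteger finalRc = new AtomicInteger(OK);
        CountDownLatch done = new CountDownLatch(1);

        source.forEachLedger(
          id -> ids.add(id),
          rc -> {
              finalRc.set(rc);      // remember the outcome for the waiting thread
              done.countDown();
          });

        // await() returns false when the timeout elapses first
        if (!done.await(timeout, unit)) {
            throw new IllegalStateException("timed out while listing ledgers");
        }
        if (finalRc.get() != OK) {
            throw new IllegalStateException("listing ledgers failed, rc=" + finalRc.get());
        }
        return ids;
    }
}
```

This way, a failed or incomplete enumeration surfaces as an exception instead of silently returning a partial list.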

6. Conclusion

In this article we've covered the Apache BookKeeper project, taking a look at its core concepts and using its low-level API to access Ledgers and perform read/write operations.

As usual, all code is available over on GitHub.
