Persistence top

Get started with Spring Data JPA through the reference Learn Spring Data JPA course:

>> CHECK OUT THE COURSE
Authors Top

If you have a few years of experience in the Java ecosystem, and you’d like to share that with the community, have a look at our Contribution Guidelines.

Spring Top – Temp

Get started with Spring 5 and Spring Boot 2, through the reference Learn Spring course:

>> LEARN SPRING
Lightrun – Third Party Code

We rely on other people’s code in our own work. Every day. It might be the language you’re writing in, the framework you’re building on, or some esoteric piece of software that does one thing so well you never found the need to implement it yourself.

The problem is, of course, when things fall apart in production - debugging the implementation of a 3rd party library you have no intimate knowledge of is, to say the least, tricky. It’s difficult to understand what talks to what and, specifically, which part of the underlying library is at fault.

Lightrun is a new kind of debugger.

It's one geared specifically towards real-life production environments. Using Lightrun, you can drill down into running applications, including 3rd party dependencies, with real-time logs, snapshots, and metrics. No hotfixes, redeployments, or restarts required.

Learn more in this quick, 5-minute Lightrun tutorial:

>> The Essential List of Spring Boot Annotations and Their Use Cases

1. Introduction

In this follow-up to our tutorial on using Couchbase in a Spring application, we explore the asynchronous nature of the Couchbase SDK and how it may be used to perform persistence operations in batches, thus allowing our application to achieve optimal use of Couchbase resources.

1.1. CrudService Interface

First, we augment our generic CrudService interface to include batch operations:

public interface CrudService<T> {
    ...
    
    List<T> readBulk(Iterable<String> ids);

    void createBulk(Iterable<T> items);

    void updateBulk(Iterable<T> items);

    void deleteBulk(Iterable<String> ids);

    boolean exists(String id);
}

1.2. CouchbaseEntity Interface

We define an interface for the entities that we want to persist:

public interface CouchbaseEntity {

    String getId();
    
    void setId(String id);
    
}

1.3. AbstractCrudService Class

Then we will implement each of these methods in a generic abstract class. This class is derived from the PersonCrudService class that we used in the previous tutorial and begins as follows:

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter<T> converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }
    
    ...
}

2. The Asynchronous Bucket Interface

The Couchbase SDK provides the AsyncBucket interface for performing asynchronous operations. Given a Bucket instance, you can obtain its asynchronous version via the async() method:

AsyncBucket asyncBucket = bucket.async();

3. Batch Operations

To perform batch operations using the AsyncBucket interface, we employ the RxJava library.

3.1. Batch Read

Here we implement the readBulk method. First we use the AsyncBucket and the flatMap mechanism in RxJava to retrieve the documents asynchronously into an Observable<JsonDocument>, then we use the toBlocking mechanism in RxJava to convert these to a list of entities:

@Override
public List<T> readBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable<JsonDocument> asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.get(key);
          }
    });

    List<T> items = new ArrayList<T>();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1<JsonDocument>() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}

3.2. Batch Insert

We again use RxJava's flatMap construct to implement the createBulk method.

Since bulk mutation requests are produced faster than their responses can be generated, sometimes resulting in an overload condition, we institute a retry with exponential delay whenever a BackpressureException is encountered:

@Override
public void createBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.3. Batch Update

We use a similar mechanism in the updateBulk method:

@Override
public void updateBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.4. Batch Delete

And we write the deleteBulk method as follows:

@Override
public void deleteBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

4. PersonCrudService

Finally, we write a Spring service, PersonCrudService, that extends our AbstractCrudService for the Person entity.

Since all of the Couchbase interaction is implemented in the abstract class, the implementation for an entity class is trivial, as we only need to ensure that all our dependencies are injected and our bucket loaded:

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}

5. Conclusion

The source code shown in this tutorial is available in the github project.

You can learn more about the Couchbase Java SDK at the official Couchbase developer documentation site.

NoSql Bottom

Build a Dashboard Using Cassandra, Astra, and Stargate

>> CHECK OUT THE ARTICLE
Spring bottom

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> THE COURSE
Persistence bottom
Get started with Spring Data JPA through the reference Learn Spring Data JPA course: >> CHECK OUT THE COURSE
Persistence footer banner
Comments are closed on this article!