I usually post about Persistence on Twitter - you can follow me there:

1. Introduction

In this follow-up to our tutorial on using Couchbase in a Spring application, we explore the asynchronous nature of the Couchbase SDK and how it can be used to perform persistence operations in batches, so that our application makes better use of Couchbase resources.

1.1. CrudService Interface

First, we augment our generic CrudService interface to include batch operations:

public interface CrudService<T> {
    ...
    
    List<T> readBulk(Iterable<String> ids);

    void createBulk(Iterable<T> items);

    void updateBulk(Iterable<T> items);

    void deleteBulk(Iterable<String> ids);

    boolean exists(String id);
}

1.2. CouchbaseEntity Interface

We define an interface for the entities that we want to persist:

public interface CouchbaseEntity {

    String getId();
    
    void setId(String id);
    
}

1.3. AbstractCrudService Class

Then we will implement each of these methods in a generic abstract class. This class is derived from the PersonCrudService class that we used in the previous tutorial and begins as follows:

public abstract class AbstractCrudService<T extends CouchbaseEntity> implements CrudService<T> {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter<T> converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter<T> converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }
    
    ...
}

2. The Asynchronous Bucket Interface

The Couchbase SDK provides the AsyncBucket interface for performing asynchronous operations. Given a Bucket instance, you can obtain its asynchronous version via the async() method:

AsyncBucket asyncBucket = bucket.async();

3. Batch Operations

To perform batch operations using the AsyncBucket interface, we employ the RxJava library.

3.1. Batch Read

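Here we implement the readBulk method. First, we use the AsyncBucket and the flatMap operator in RxJava to retrieve the documents asynchronously into an Observable<JsonDocument>. Then we use the toBlocking operator in RxJava to wait for the results and convert them to a list of entities. Because flatMap merges results as they arrive, the returned list is not guaranteed to follow the order of the ids: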

@Override
public List<T> readBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable<JsonDocument> asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.get(key);
          }
    });

    List<T> items = new ArrayList<T>();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1<JsonDocument>() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}
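The shape of readBulk (start every lookup first, then block once to collect the results) can be tried out without a cluster. The following is a minimal sketch that swaps RxJava for the JDK's CompletableFuture. The STORE map and the getAsync helper are hypothetical stand-ins for the bucket and for asyncBucket.get, not part of the Couchbase SDK:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class FanOutReadDemo {
    // Hypothetical stand-in for the bucket: document id -> JSON string.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Hypothetical async lookup, playing the role of asyncBucket.get(key).
    static CompletableFuture<Optional<String>> getAsync(String id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(STORE.get(id)));
    }

    static List<String> readBulk(Iterable<String> ids) {
        // Step 1: start every lookup before waiting on any of them.
        List<CompletableFuture<Optional<String>>> pending = new ArrayList<>();
        for (String id : ids) {
            pending.add(getAsync(id));
        }

        // Step 2: block until each result is in, skipping ids that were not found.
        List<String> docs = new ArrayList<>();
        for (CompletableFuture<Optional<String>> future : pending) {
            future.join().ifPresent(docs::add);
        }
        return docs;
    }

    public static void main(String[] args) {
        STORE.put("p1", "{\"name\":\"Ann\"}");
        STORE.put("p2", "{\"name\":\"Bob\"}");
        System.out.println(readBulk(Arrays.asList("p1", "nope", "p2")));
    }
}
```

One difference from the RxJava version: this sketch joins the futures in input order, so the result follows the order of the ids, whereas flatMap emits documents in whatever order the responses arrive.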

3.2. Batch Insert

We again use RxJava’s flatMap construct to implement the createBulk method.

Bulk mutation requests can be produced faster than their responses are generated, which sometimes results in an overload condition. To handle this, we retry with an exponential delay, up to 10 times, whenever a BackpressureException is encountered:

@Override
public void createBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .lastOrDefault(null) // avoids a NoSuchElementException when items is empty
      .toBlocking()
      .single();
}
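To make the retry policy concrete, here is a minimal JDK-only sketch of retrying with a capped, doubling delay. It is not the SDK's RetryBuilder or Delay implementation: OverloadedException is a hypothetical stand-in for BackpressureException, and the schedule (1, 2, 4, ... milliseconds up to a cap) is an assumption made for illustration.

```java
import java.util.function.Supplier;

class BackoffRetryDemo {
    // Hypothetical stand-in for the SDK's BackpressureException.
    static class OverloadedException extends RuntimeException {}

    // Delay before retry number `attempt` (1-based): 1, 2, 4, ... ms, capped at upperMillis.
    static long delayMillis(int attempt, long upperMillis) {
        long raw = 1L << Math.min(attempt - 1, 62); // clamp the shift to avoid overflow
        return Math.min(raw, upperMillis);
    }

    // Runs op, retrying up to maxRetries times while it throws OverloadedException.
    static <R> R retry(Supplier<R> op, int maxRetries, long upperMillis) {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.get();
            } catch (OverloadedException e) {
                if (attempt > maxRetries) {
                    throw e; // retries exhausted, surface the overload to the caller
                }
                try {
                    Thread.sleep(delayMillis(attempt, upperMillis));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during backoff", ie);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            if (++calls[0] < 3) {
                throw new OverloadedException(); // fail twice, then succeed
            }
            return "stored";
        }, 10, 100);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Only the overload exception triggers a retry; any other failure propagates immediately, which matches the intent of restricting the retry to BackpressureException in createBulk.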

3.3. Batch Update

We use a similar mechanism in the updateBulk method, calling upsert so that each document is replaced if it already exists and created otherwise:

@Override
public void updateBulk(Iterable<T> items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1<T, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .lastOrDefault(null) // avoids a NoSuchElementException when items is empty
      .toBlocking()
      .single();
}

3.4. Batch Delete

And we write the deleteBulk method as follows:

@Override
public void deleteBulk(Iterable<String> ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1<String, Observable<JsonDocument>>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable<JsonDocument> call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .lastOrDefault(null) // avoids a NoSuchElementException when ids is empty
      .toBlocking()
      .single();
}
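For unit tests that should not touch a cluster, the observable behavior of the four bulk methods can be approximated with a map-backed stand-in. The sketch below is hypothetical throughout: the Store and Note classes are invented for illustration, and Entity simply repeats the shape of CouchbaseEntity. It assumes that insert rejects a key that already exists, that upsert creates or overwrites, and that reads skip ids with no document. Unlike a real remove, it ignores deletes of missing ids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class InMemoryBulkDemo {
    // Same shape as the CouchbaseEntity interface from section 1.2.
    interface Entity {
        String getId();
        void setId(String id);
    }

    // Hypothetical entity, used only in this sketch.
    static class Note implements Entity {
        private String id;
        final String text;
        Note(String id, String text) { this.id = id; this.text = text; }
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
    }

    // Hypothetical map-backed stand-in for the four bulk methods.
    static class Store<T extends Entity> {
        private final Map<String, T> docs = new LinkedHashMap<>();

        // Like readBulk: ids with no stored document are skipped.
        List<T> readBulk(Iterable<String> ids) {
            List<T> found = new ArrayList<>();
            for (String id : ids) {
                T item = docs.get(id);
                if (item != null) found.add(item);
            }
            return found;
        }

        // Like createBulk: generate an id when missing, refuse to overwrite.
        void createBulk(Iterable<T> items) {
            for (T item : items) {
                if (item.getId() == null) item.setId(UUID.randomUUID().toString());
                if (docs.putIfAbsent(item.getId(), item) != null) {
                    throw new IllegalStateException("already exists: " + item.getId());
                }
            }
        }

        // Like updateBulk with upsert: overwrite, or create when absent.
        void updateBulk(Iterable<T> items) {
            for (T item : items) docs.put(item.getId(), item);
        }

        // Like deleteBulk, except that missing ids are ignored (assumption of this sketch).
        void deleteBulk(Iterable<String> ids) {
            for (String id : ids) docs.remove(id);
        }
    }

    public static void main(String[] args) {
        Store<Note> store = new Store<>();
        Note first = new Note(null, "draft"); // id is generated by createBulk
        Note second = new Note("note-2", "published");
        store.createBulk(Arrays.asList(first, second));

        store.updateBulk(Arrays.asList(new Note("note-2", "revised")));
        store.deleteBulk(Arrays.asList(first.getId()));

        List<Note> left = store.readBulk(Arrays.asList(first.getId(), "note-2"));
        System.out.println(left.size() + " " + left.get(0).text);
    }
}
```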

4. PersonCrudService

Finally, we write a Spring service, PersonCrudService, that extends our AbstractCrudService for the Person entity.

Since all of the Couchbase interaction is implemented in the abstract class, the implementation for an entity class is trivial, as we only need to ensure that all our dependencies are injected and our bucket loaded:

@Service
public class PersonCrudService extends AbstractCrudService<Person> {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}

5. Conclusion

The source code shown in this tutorial is available in the GitHub project.

You can learn more about the Couchbase Java SDK at the official Couchbase developer documentation site.


