1. Overview

Simply put, rxjava-jdbc is an API for interacting with relational databases which allows fluent-style method calls. In this quick tutorial, we’re going to have a look at the library and how we can make use of some of its common features.

If you want to discover RxJava’s basics, check out this article.

Further reading:

  • Introduction to RxJava – Discover RxJava, a library for composing asynchronous and event-based programs.
  • Dealing with Backpressure with RxJava – A guide demonstrating several strategies for handling backpressure in RxJava.
  • Observable Utility Operators in RxJava – Learn how to use various RxJava utility operators.

2. Maven Dependency

Let’s start with the Maven dependency we need to add to our pom.xml:

<dependency>
    <groupId>com.github.davidmoten</groupId>
    <artifactId>rxjava-jdbc</artifactId>
    <version>0.7.11</version>
</dependency>

We can find the latest version of the API on Maven Central.

3. Main Components

The Database class is the main entry point for running all common types of database interactions. To create a Database object, we can pass an instance of an implementation of the ConnectionProvider interface to the from() static method:

public static ConnectionProvider connectionProvider
  = new ConnectionProviderFromUrl(
  DB_CONNECTION, DB_USER, DB_PASSWORD);
Database db = Database.from(connectionProvider);

ConnectionProvider has several implementations worth looking at – such as ConnectionProviderFromContext, ConnectionProviderFromDataSource, ConnectionProviderFromUrl and ConnectionProviderPooled.

In order to do basic operations, we can use the following APIs of Database:

  • select() – used for SQL select queries
  • update() – used for DDL statements such as create and drop, as well as insert, update and delete

4. Starting Up

In the next quick example, we’re going to show how we can do all the basic database operations:

public class BasicQueryTypesTest {
    
    Observable<Integer> create,
      insert1, 
      insert2, 
      insert3, 
      update, 
      delete = null;
    
    @Test
    public void whenCreateTableAndInsertRecords_thenCorrect() {
        create = db.update(
          "CREATE TABLE IF NOT EXISTS EMPLOYEE("
          + "id int primary key, name varchar(255))")
          .count();
        insert1 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
          .dependsOn(create)
          .count();
        update = db.update(
          "UPDATE EMPLOYEE SET name = 'Alan' WHERE id = 1")
          .dependsOn(create)
          .count();
        insert2 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(2, 'Sarah')")
          .dependsOn(create)
          .count();
        insert3 = db.update(
          "INSERT INTO EMPLOYEE(id, name) VALUES(3, 'Mike')")
          .dependsOn(create)
          .count();
        delete = db.update(
          "DELETE FROM EMPLOYEE WHERE id = 2")
          .dependsOn(create)
          .count();
        List<String> names = db.select(
          "select name from EMPLOYEE where id < ?")
          .parameter(3)
          .dependsOn(create)
          .dependsOn(insert1)
          .dependsOn(insert2)
          .dependsOn(insert3)
          .dependsOn(update)
          .dependsOn(delete)
          .getAs(String.class)
          .toList()
          .toBlocking()
          .single();
        
        assertEquals(Arrays.asList("Alan"), names);
    }
}

A quick note here: we’re calling dependsOn() to control the order in which the queries run.

If we don’t specify the sequence in which the queries should be executed, the code will fail or produce unpredictable results.

5. Automap

The automap feature allows us to map selected database records to objects.

Let’s have a look at the two ways of automapping database records.

5.1. Automapping using an Interface

We can use autoMap() to map database records to objects using annotated interfaces. To do this, we first create an annotated interface:

public interface Employee {

    @Column("id")
    int id();

    @Column("name")
    String name();
}

Then, we can run our test:

@Test
public void whenSelectFromTableAndAutomap_thenCorrect() {
    List<Employee> employees = db.select("select id, name from EMPLOYEE")
      .dependsOn(create)
      .dependsOn(insert1)
      .dependsOn(insert2)
      .autoMap(Employee.class)
      .toList()
      .toBlocking()
      .single();
    
    assertThat(
      employees.get(0).id()).isEqualTo(1);
    assertThat(
      employees.get(0).name()).isEqualTo("Alan");
    assertThat(
      employees.get(1).id()).isEqualTo(2);
    assertThat(
      employees.get(1).name()).isEqualTo("Sarah");
}

5.2. Automapping using a Class

We can also automap database records to objects using concrete classes. Let’s see what the class can look like:

public class Manager {

    private int id;
    private String name;

    // standard constructors, getters, and setters
}

Now, we can run our test:

@Test
public void whenSelectManagersAndAutomap_thenCorrect() {
    List<Manager> managers = db.select("select id, name from MANAGER")
      .dependsOn(create)
      .dependsOn(insert1)
      .dependsOn(insert2)
      .autoMap(Manager.class)
      .toList()
      .toBlocking()
      .single();
    
    assertThat(
      managers.get(0).getId()).isEqualTo(1);
    assertThat(
     managers.get(0).getName()).isEqualTo("Alan");
    assertThat(
      managers.get(1).getId()).isEqualTo(2);
    assertThat(
      managers.get(1).getName()).isEqualTo("Sarah");
}

A few notes here:

  • create, insert1 and insert2 are references to Observables returned by creating the Manager table and inserting records into it
  • The number of the selected columns in our query must match the number of parameters in the Manager class constructor
  • The columns must be of types that can be automatically mapped to the types in the constructor
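
As a rough illustration of the last two points, the elided "standard constructors, getters, and setters" could look like the sketch below. The (int id, String name) constructor is an assumption chosen to mirror the two columns of select id, name from MANAGER; the original class body isn't shown in this article.

```java
// Sketch only: one possible shape for the Manager class used above.
// In a real project this would be a public class in its own Manager.java file.
class Manager {

    private int id;
    private String name;

    // Assumed constructor: two parameters, in the same order and with
    // compatible types as the two selected columns (id, name).
    public Manager(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
```

With this shape, a query that selected a third column would no longer line up with the two-parameter constructor.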

For more information on automapping, visit the rxjava-jdbc repository on GitHub.

6. Working with Large Objects

The API supports working with Large Objects like CLOBs and BLOBs. In the next subsections, we’re going to see how we can make use of this functionality.

6.1. CLOBs

Let’s see how we can insert and select a CLOB:

@Before
public void setup() throws IOException {
    create = db.update(
      "CREATE TABLE IF NOT EXISTS " + 
      "SERVERLOG (id int primary key, document CLOB)")
        .count();
    
    InputStream actualInputStream
      = new FileInputStream("src/test/resources/actual_clob");
    actualDocument = getStringFromInputStream(actualInputStream);

    InputStream expectedInputStream = new FileInputStream(
      "src/test/resources/expected_clob");
 
    expectedDocument = getStringFromInputStream(expectedInputStream);
    insert = db.update(
      "insert into SERVERLOG(id,document) values(?,?)")
        .parameter(1)
        .parameter(Database.toSentinelIfNull(actualDocument))
      .dependsOn(create)
      .count();
}

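@Test
public void whenSelectCLOB_thenCorrect() throws IOException {
    // read the stored CLOB back as a String
    // (a BLOB column would need byte[].class here instead)
    String storedDocument = db.select(
      "select document from SERVERLOG where id = 1")
      .dependsOn(create)
      .dependsOn(insert)
      .getAs(String.class)
      .toBlocking()
      .single();
    
    // compare what the database returned with the expected content
    assertEquals(expectedDocument, storedDocument);
}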

Note that getStringFromInputStream() is a method that converts the content of an InputStream to a String.
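
The helper's implementation isn't shown here. One possible version, assuming the files contain UTF-8 encoded text, is sketched below; the class name StreamText is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Sketch only: a possible implementation of the helper used in the tests.
final class StreamText {

    // Reads the whole stream into memory and decodes it as UTF-8 (an assumption).
    static String getStringFromInputStream(InputStream input) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        // try-with-resources closes the stream once it has been fully read
        try (InputStream in = input) {
            while ((read = in.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the whole content is held in memory, this approach suits test fixtures better than very large documents.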

6.2. BLOBs

We can use the API to work with BLOBs in a very similar way. The only difference is, instead of passing a String to the toSentinelIfNull() method, we have to pass a byte array.

Here’s how we can do that:

@Before
public void setup() throws IOException {
    create = db.update(
      "CREATE TABLE IF NOT EXISTS " 
      + "SERVERLOG (id int primary key, document BLOB)")
        .count();
    
    InputStream actualInputStream
      = new FileInputStream("src/test/resources/actual_clob");
    actualDocument = getStringFromInputStream(actualInputStream);
    byte[] bytes = this.actualDocument.getBytes(StandardCharsets.UTF_8);
    
    InputStream expectedInputStream = new FileInputStream(
      "src/test/resources/expected_clob");
    expectedDocument = getStringFromInputStream(expectedInputStream);
    insert = db.update(
      "insert into SERVERLOG(id,document) values(?,?)")
      .parameter(1)
      .parameter(Database.toSentinelIfNull(bytes))
      .dependsOn(create)
      .count();
}

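Then, we can write a test very similar to the one in the previous example. To read the value back, we select the column with getAs(byte[].class) instead of getAs(String.class).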

7. Transactions

Next, let’s have a look at the support for transactions.

Transactions let us group multiple database operations into a single unit of work, so that they are either all committed (permanently saved to the database) or all rolled back together.

Let’s see a quick example:

@Test
public void whenCommitTransaction_thenRecordUpdated() {
    Observable<Boolean> begin = db.beginTransaction();
    Observable<Integer> createStatement = db.update(
      "CREATE TABLE IF NOT EXISTS EMPLOYEE(id int primary key, name varchar(255))")
      .dependsOn(begin)
      .count();
    Observable<Integer> insertStatement = db.update(
      "INSERT INTO EMPLOYEE(id, name) VALUES(1, 'John')")
      .dependsOn(createStatement)
      .count();
    Observable<Integer> updateStatement = db.update(
      "UPDATE EMPLOYEE SET name = 'Tom' WHERE id = 1")
      .dependsOn(insertStatement)
      .count();
    Observable<Boolean> commit = db.commit(updateStatement);
    String name = db.select("select name from EMPLOYEE WHERE id = 1")
      .dependsOn(commit)
      .getAs(String.class)
      .toBlocking()
      .single();
    
    assertEquals("Tom", name);
}

To start a transaction, we call beginTransaction(). After that, every database operation runs in the same transaction until either commit() or rollback() is called.

We can use the rollback() method while catching an Exception to roll back the whole transaction in case the code fails for any reason. We can do so for all Exceptions or particular expected Exceptions.
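
rxjava-jdbc builds on plain JDBC, so the commit-or-roll-back pattern described above can be sketched with the standard java.sql.Connection API alone. This illustrates the pattern only and is not rxjava-jdbc's own API; TransactionSketch, SqlWork and runInTransaction are hypothetical names.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Sketch only: commit on success, roll back when the work fails.
final class TransactionSketch {

    // Hypothetical callback holding the statements of one transaction.
    @FunctionalInterface
    interface SqlWork {
        void run(Connection connection) throws SQLException;
    }

    // Returns true if the work was committed, false if it was rolled back.
    static boolean runInTransaction(Connection connection, SqlWork work) {
        try {
            // begin: statements are no longer committed one by one
            connection.setAutoCommit(false);
            try {
                work.run(connection);
                connection.commit();
                return true;
            } catch (SQLException | RuntimeException e) {
                // undo every statement executed since the transaction began
                connection.rollback();
                return false;
            }
        } catch (SQLException e) {
            // the transaction itself could not be started or rolled back
            throw new IllegalStateException("Could not manage the transaction", e);
        }
    }
}
```

Catching a narrower exception type in the inner block would restrict the rollback to particular expected failures. The sketch leaves auto-commit switched off afterwards, which real code may want to restore.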

8. Returning Generated Keys

If the table we’re working with has an auto_increment column, we might need to retrieve the value generated for it. We can do this by calling the returnGeneratedKeys() method.

Let’s see a quick example:

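@Test
public void whenInsertAndReturnGeneratedKey_thenCorrect() {
    // emits the generated key itself; appending count() here
    // would emit the number of keys rather than the key
    Integer key = db.update("INSERT INTO EMPLOYEE(name) VALUES('John')")
      .dependsOn(createStatement)
      .returnGeneratedKeys()
      .getAs(Integer.class)
      .toBlocking()
      .single();
 
    // assumes this is the first row inserted into a freshly created table
    assertThat(key).isEqualTo(1);
}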

9. Conclusion

In this tutorial, we’ve seen how to make use of rxjava-jdbc’s fluent-style methods. We’ve also walked through some of the features it provides, such as automapping, working with Large Objects and transactions.

As always, the full version of the code is available over on GitHub.

Zakaria

JDBC is known to be blocking/synchronous, unlike drivers for NoSQL databases like MongoDB and Couchbase. Is there actually any benefit to having a reactive API on top of a non-reactive one?

Jan Vladimir Mostert

I can’t see how; somewhere a thread will be blocking.