1. Overview

InfluxDB is a high-performance store for time-series data. It supports insertion and real-time querying of data via a SQL-like query language.

In this introductory article, we’ll demonstrate how to connect to an InfluxDB server, create a database, write time-series information, and then query the database.

2. Setup

To connect to the database, we’ll need to add an entry to our pom.xml file:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.8</version>
</dependency>

The latest version of this dependency can be found on Maven Central.

We’ll also need an InfluxDB instance. Instructions for downloading and installing a database can be found on the InfluxData website.

3. Connecting to a Server

3.1. Creating a Connection

Creating a database connection requires passing a URL String and user credentials to a connection factory:

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. Verifying the Connection

Communications with the database are performed over a RESTful API, so they aren’t persistent.

The API offers a dedicated “ping” service to confirm that the connection is functional. If the connection is good, the response contains a database version. If not, it contains “unknown”.

So, after creating a connection, we can verify it with a ping:

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

3.3. Creating a Database

Creating an InfluxDB database is similar to creating a database on most platforms. However, every database also needs at least one retention policy, so we’ll define one before writing data.

A retention policy tells the database how long a piece of data should be stored. Time series, such as CPU or memory statistics, tend to accumulate into large datasets.

A typical strategy for controlling the size of time series databases is downsampling. “Raw” data is stored at a high rate, summarized, and then removed after a short time.

Retention policies simplify this by associating a piece of data with an expiration time. InfluxData has an in-depth explanation on their site.
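To make the downsampling idea concrete, here is a minimal sketch in plain Java that is independent of the InfluxDB client. It averages raw samples into fixed-size time windows. The class name, the method name, and the window size are illustrative assumptions; in a real deployment this summarizing would normally run on the server rather than in application code.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of influxdb-java.
final class DownsampleSketch {

    // Averages raw values per time window. Keys of the result are window start times (ms).
    // Assumes non-negative timestamps and a positive window size.
    static Map<Long, Double> downsample(Map<Long, Long> rawByTimestampMs, long windowMs) {
        // For each window start, keep {sum, count}.
        Map<Long, long[]> sumAndCount = new TreeMap<>();
        for (Map.Entry<Long, Long> e : rawByTimestampMs.entrySet()) {
            long windowStart = (e.getKey() / windowMs) * windowMs;
            long[] acc = sumAndCount.computeIfAbsent(windowStart, k -> new long[2]);
            acc[0] += e.getValue();
            acc[1]++;
        }
        // Turn the accumulators into averages, preserving window order.
        Map<Long, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<Long, long[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        Map<Long, Long> raw = new TreeMap<>();
        raw.put(0L, 10L);
        raw.put(500L, 20L);
        raw.put(1000L, 30L);
        // Two one-second windows: the first averages 10 and 20, the second holds 30.
        System.out.println(downsample(raw, 1000L)); // prints {0=15.0, 1000=30.0}
    }
}
```

With retention policies, the raw series and the summarized series can simply live under different policies, so the raw data expires on its own.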

After creating the database, we’ll add a single policy named defaultPolicy. It will simply retain data for 30 days:

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

To create a retention policy, we’ll need a name, the database, an interval, a replication factor (which should be 1 for a single-instance database), and a boolean indicating whether it’s the default policy.
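For orientation, these five arguments line up with the clauses of InfluxQL’s CREATE RETENTION POLICY statement. The helper below is a plain-Java sketch that only builds that statement as a string; the class and method names are hypothetical, and it makes no claim about how the client assembles its request internally.

```java
// Hypothetical helper for illustration only; not part of influxdb-java.
final class RetentionPolicyStatementSketch {

    // Builds an InfluxQL CREATE RETENTION POLICY statement from the same
    // five pieces of information the client call takes.
    // Note: no escaping of quotes inside names is attempted in this sketch.
    static String createRetentionPolicy(
            String name, String database, String duration, int replication, boolean isDefault) {
        StringBuilder sb = new StringBuilder()
            .append("CREATE RETENTION POLICY \"").append(name).append('"')
            .append(" ON \"").append(database).append('"')
            .append(" DURATION ").append(duration)
            .append(" REPLICATION ").append(replication);
        if (isDefault) {
            sb.append(" DEFAULT");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(createRetentionPolicy("defaultPolicy", "baeldung", "30d", 1, true));
    }
}
```

Seeing the statement form can help when checking the result from the influx command-line shell.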

3.4. Setting a Logging Level

Internally, the InfluxDB API uses Retrofit and exposes an interface to Retrofit’s logging facility via a logging interceptor.

So, we can set the logging level using:

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

And now we can see messages when we open a connection and ping it:

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

The available levels are BASIC, FULL, HEADERS, and NONE.

4. Adding and Retrieving Data

4.1. Points

So now we’re ready to start inserting and retrieving data.

The basic unit of information in InfluxDB is a Point, which is essentially a timestamp and a key-value map.

Let’s have a look at a point holding memory utilization data:

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

We’ve created an entry that contains three Longs as memory statistics, a hostname, and a timestamp.
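On the wire, InfluxDB accepts points in its text-based line protocol: a measurement name, a comma-separated field set, and a timestamp that is in nanoseconds by default. The following plain-Java sketch shows roughly what the point above looks like in that format. The class and method names are hypothetical, and the sketch deliberately ignores tags and the escaping rules for measurement names and keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;

// Hypothetical formatter for illustration only; not part of influxdb-java.
final class LineProtocolSketch {

    // Renders "<measurement> <field>=<value>,... <timestamp in ns>".
    static String toLine(String measurement, Map<String, Object> fields, long timestampMs) {
        StringJoiner fieldSet = new StringJoiner(",");
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            fieldSet.add(e.getKey() + "=" + formatValue(e.getValue()));
        }
        long timestampNs = TimeUnit.MILLISECONDS.toNanos(timestampMs);
        return measurement + " " + fieldSet + " " + timestampNs;
    }

    // Strings are double-quoted, integers carry an "i" suffix,
    // everything else (floats, booleans) falls back to its string form.
    static String formatValue(Object value) {
        if (value instanceof String) {
            String escaped = ((String) value).replace("\\", "\\\\").replace("\"", "\\\"");
            return "\"" + escaped + "\"";
        }
        if (value instanceof Long || value instanceof Integer) {
            return value + "i";
        }
        return String.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("name", "server1");
        fields.put("free", 4743656L);
        fields.put("used", 1015096L);
        fields.put("buffer", 1010467L);
        System.out.println(toLine("memory", fields, System.currentTimeMillis()));
    }
}
```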

Let’s see how to add this to the database.

4.2. Writing Batches

Time series data tends to consist of many small points, and writing those records one at a time would be very inefficient. The preferred method is to collect records into batches.

The InfluxDB API provides a BatchPoints object:

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

We create a BatchPoints object and then add Points to it. We set the timestamp of our second entry 100 milliseconds in the past because timestamps act as a primary index: if we send two points with the same timestamp, only one will be kept.

Note that we must associate BatchPoints with a database and a retention policy.

4.3. Writing One at a Time

Building batches by hand may be impractical for some use cases.

In that case, we can let the client collect the points for us by enabling batch mode with a single call on the InfluxDB connection:

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

With these settings, the client sends buffered points to the server once 100 of them have accumulated or every 200 milliseconds, whichever comes first.
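The size-or-interval rule can be sketched in plain Java as follows. This is only an illustration of the general technique, not the client’s actual implementation; the class name, the sink callback, and the single scheduler thread are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical buffer for illustration only; not part of influxdb-java.
final class BatchBufferSketch<T> implements AutoCloseable {

    private final int maxSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "batch-flusher");
            t.setDaemon(true); // do not keep the JVM alive for the timer alone
            return t;
        });

    BatchBufferSketch(int maxSize, long flushInterval, TimeUnit unit, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.sink = sink;
        // Time-based trigger: flush whatever is buffered at a fixed rate.
        scheduler.scheduleAtFixedRate(this::flush, flushInterval, flushInterval, unit);
    }

    // Size-based trigger: flush as soon as the buffer is full.
    synchronized void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    // Hands a copy of the buffered items to the sink and empties the buffer.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Stops the timer and delivers any remaining items.
    @Override
    public void close() {
        scheduler.shutdown();
        flush();
    }
}
```

A caller would construct it with the same numbers as above, for example `new BatchBufferSketch<String>(100, 200, TimeUnit.MILLISECONDS, batch -> send(batch))`, where `send` stands in for whatever actually transmits the batch.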

With batch mode enabled, we can still write one at a time. However, some additional setup is required:

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

Now we can write individual points, and a background thread collects them into batches:

influxDB.write(point);

Before we enqueue individual points, we need to set a database (similar to the use command in SQL) and a default retention policy. Every point written this way goes to that one policy, so if we wish to take advantage of downsampling with multiple retention policies, creating batches explicitly is the way to go.

Batch mode utilizes a separate thread pool. So it’s a good idea to disable it when it’s no longer needed:

influxDB.disableBatch();

Closing the connection will also shut down the thread pool:

influxDB.close();

4.4. Mapping Query Results

Queries return a QueryResult, which we can map to POJOs.

Before we look at the query syntax, let’s create a class to hold our memory statistics:

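import java.time.Instant;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;

    // Getters, so that calling code can read the mapped values
    public Instant getTime() { return time; }
    public String getName() { return name; }
    public Long getFree() { return free; }
    public Long getUsed() { return used; }
    public Long getBuffer() { return buffer; }
}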

The class is annotated with @Measurement(name = "memory"), corresponding to the Point.measurement("memory") we used to create our Points.

For each field in our QueryResult, we add the @Column(name = "XXX") annotation with the name of the corresponding field.

QueryResults are mapped to POJOs with an InfluxDBResultMapper.
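The idea behind annotation-driven mapping can be illustrated with a small reflection-based sketch in plain Java. Everything in it is hypothetical: the @Col annotation, RowMapperSketch, and MemorySample are stand-ins invented for this example, and the sketch skips the type conversion and error handling a real mapper needs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.List;

// Hypothetical stand-in for a column annotation; not the library's @Column.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Col {
    String name();
}

// Hypothetical mapper for illustration only; not InfluxDBResultMapper.
final class RowMapperSketch {

    // Copies each row value into the field whose @Col name matches its column.
    static <T> T map(List<String> columns, List<Object> row, Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            T target = ctor.newInstance();
            for (Field field : type.getDeclaredFields()) {
                Col col = field.getAnnotation(Col.class);
                if (col == null) {
                    continue; // field is not mapped
                }
                int index = columns.indexOf(col.name());
                if (index < 0) {
                    continue; // column is missing from this result
                }
                field.setAccessible(true);
                field.set(target, row.get(index));
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map row to " + type.getName(), e);
        }
    }
}

// Hypothetical POJO used only by this sketch.
final class MemorySample {
    @Col(name = "name") String name;
    @Col(name = "free") Long free;
}
```

Columns without a matching annotated field, such as time in this sketch, are simply ignored.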

4.5. Querying InfluxDB

So let’s use our POJO with the points we added to the database in our two-point batch:

QueryResult queryResult = influxDB
  .query(new Query("Select * from memory", "baeldung"));

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List<MemoryPoint> memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

The query illustrates how our measurement named memory is stored as a table of Points that we can select from.

InfluxDBResultMapper accepts a reference to MemoryPoint.class along with the QueryResult and returns a list of MemoryPoint objects.

After we map the results, we verify that we received two of them by checking the length of the List returned from the query. Then we look at the first entry in the list and see the free memory size of the second point we inserted. It comes first because its timestamp is 100 milliseconds earlier, and the default ordering of query results from InfluxDB is ascending by timestamp.

Let’s change that:

queryResult = influxDB.query(
  new Query("Select * from memory order by time desc", "baeldung"));
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

Adding order by time desc reverses the order of our results.

InfluxDB queries look very similar to SQL. There is an extensive reference guide on the InfluxData site.

5. Conclusion

We’ve connected to an InfluxDB server, created a database with a retention policy, and then inserted and retrieved data from the server.

The full source code of the examples is over on GitHub.
