1. Introduction

Kinesis is a tool for collecting, processing, and analyzing data streams in real-time, developed at Amazon. One of its main advantages is that it helps with the development of event-driven applications.

In this tutorial, we’ll explore a few libraries that enable our Spring application to produce and consume records from a Kinesis Stream. The code examples will show the basic functionality but don’t represent the production-ready code.

2. Prerequisite

Before we go any further, we need to do two things.

The first is to create a Spring project, as the goal here is to interact with Kinesis from a Spring project.

The second one is to create a Kinesis Data Stream. We can do this from a web browser in our AWS account. One alternative for the AWS CLI fans among us is to use the command line. Because we’ll interact with it from code, we also must have at hand the AWS IAM Credentials, the access key and secret key, and the region.

All our producers will create dummy IP address records, while the consumers will read those values and list them in the application console.

3. AWS SDK for Java

The very first library we’ll use is the AWS SDK for Java. Its advantage is that it allows us to manage many parts of working with Kinesis Data Streams. We can read data, produce data, create data streams, and reshard data streams. The drawback is that in order to have production-ready code, we’ll have to code aspects like resharding, error handling, or a daemon to keep the consumer alive.

3.1. Maven Dependency

The amazon-kinesis-client Maven dependency will bring everything we need to have working examples. We’ll now add it to our pom.xml file:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.12.380</version>
</dependency>

3.2. Spring Setup

Let’s reuse the AmazonKinesis object needed to interact with our Kinesis Stream. We’ll create it as a @Bean inside our @SpringBootApplication class:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

Next, let’s define the aws.access.key and aws.secret.key, needed for the local machine, in application.properties:

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

And we’ll read them using the @Value annotation:

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

For the sake of simplicity, we’re going to rely on @Scheduled methods to create and consume records.

3.3. Consumer

The AWS SDK Kinesis Consumer uses a pull model, meaning our code will draw records from the shards of the Kinesis data stream:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

The GetRecordsRequest object builds the request for stream data. In our example, we’ve defined a limit of 25 records per request, and we keep reading until there’s nothing more to read.

We can also notice that, for our iteration, we’ve used a GetShardIteratorResult object. We created this object inside a @PostConstruct method so that we’ll begin tracking records straight away:

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4. Producer

Let’s now see how to handle the creation of records for our Kinesis data stream.

We insert data using a PutRecordsRequest object. For this new object, we add a list that comprises multiple PutRecordsRequestEntry objects:

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

We’ve created a basic consumer and a producer of simulated IP records. All that’s left to do now is to run our Spring project and see IPs listed in our application console.

4. KCL and KPL

Kinesis Client Library (KCL) is a library that simplifies the consuming of records. It’s also a layer of abstraction over the AWS SDK Java APIs for Kinesis Data Streams. Behind the scenes, the library handles load balancing across many instances, responding to instance failures, checkpointing processed records, and reacting to resharding.

Kinesis Producer Library (KPL) is a library useful for writing to a Kinesis data stream. It also provides a layer of abstraction that sits over the AWS SDK Java APIs for Kinesis Data Streams. For better performance, the library automatically handles batching, multi-threading, and retry logic.

KCL and KPL both have the main advantage that they’re easy to use so that we can focus on producing and consuming records.

4.1. Maven Dependencies

The two libraries can be brought separately in our project if needed. To include KPL and KCL in our Maven project, we need to update our pom.xml file:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.14.9</version>
</dependency>

4.2. Spring Setup

The only Spring preparation we need is to make sure we have the IAM credentials at hand. The values for aws.access.key and aws.secret.key are defined in our application.properties file so we can read them with @Value when needed.

4.3. Consumer

First, we’ll create a class that implements the IRecordProcessor interface and defines our logic for how to handle Kinesis data stream records, which is to print them in the console:

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

The next step is to define a factory class that implements the IRecordProcessorFactory interface and returns a previously created IpProcessor object:

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

And now for the final step, we’ll use a Worker object to define our consumer pipeline. We need a KinesisClientLibConfiguration object that will define, if needed, the IAM Credentials and AWS Region.

We’ll pass the KinesisClientLibConfiguration, and our IpProcessorFactory object, to our Worker and then start it in a separate thread. We keep this logic of consuming records always alive with the use of the Worker class, so we’re continuously reading new records now:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  "KinesisKCLConsumer",
  IPS_STREAM,
  "",
  "",
  DEFAULT_INITIAL_POSITION_IN_STREAM,
  new AWSStaticCredentialsProvider(awsCredentials),
  new AWSStaticCredentialsProvider(awsCredentials),
  new AWSStaticCredentialsProvider(awsCredentials),
  DEFAULT_FAILOVER_TIME_MILLIS,
  "KinesisKCLConsumer",
  DEFAULT_MAX_RECORDS,
  DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
  DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST,
  DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
  DEFAULT_SHARD_SYNC_INTERVAL_MILLIS,
  DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
  new ClientConfiguration(),
  new ClientConfiguration(),
  new ClientConfiguration(),
  DEFAULT_TASK_BACKOFF_TIME_MILLIS,
  DEFAULT_METRICS_BUFFER_TIME_MILLIS,
  DEFAULT_METRICS_MAX_QUEUE_SIZE,
  DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
  Regions.EU_CENTRAL_1.getName(),
  DEFAULT_SHUTDOWN_GRACE_MILLIS,
  DEFAULT_DDB_BILLING_MODE,
  null,
  0,
  0, 
  0
);
final Worker worker = new Worker.Builder()
 .recordProcessorFactory(new IpProcessorFactory())
 .config(consumerConfig)
 .build();
CompletableFuture.runAsync(worker.run());

4.4. Producer

Let’s now define the KinesisProducerConfiguration object, adding the IAM Credentials and the AWS Region:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setVerifyCertificate(false)
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

We’ll include the kinesisProducer object previously created in a @Scheduled job and produce records for our Kinesis data stream continuously:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Spring Cloud Stream Binder Kinesis

We’ve already seen two libraries, both created outside of the Spring ecosystem. We’ll now see how the Spring Cloud Stream Binder Kinesis can simplify our life further while building on top of Spring Cloud Stream.

5.1. Maven Dependency

The Maven dependency we need to define in our application for the Spring Cloud Stream Binder Kinesis is:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>2.2.0</version>
</dependency>

5.2. Spring Setup

When running on EC2, the required AWS properties are automatically discovered, so there is no need to define them. Since we’re running our examples on a local machine, we need to define our IAM access key, secret key, and region for our AWS account. We’ve also disabled the automatic CloudFormation stack name detection for the application:

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream is bundled with three interfaces that we can use in our stream binding:

  • The Sink is for data ingestion
  • The Source is used for publishing records
  • The Processor is a combination of both

We can also define our own interfaces if we need to.

5.3. Consumer

Defining a consumer is a two-part job. First, we’ll define, in the application.properties, the data stream from which we’ll consume:

spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=live-ips-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition = input

And next, let’s define a Spring @Configuration class with a @Bean using a Supplier that allow us to read from the Kinesis stream:

@Configuration public class ConsumerBinder {
  @Bean Consumer < String > input() {
    return str - >{
      System.out.println(str);
    };
  }
}

5.4. Producer

The producer can also be split in two. First, we have to define our stream properties inside application.properties:

spring.cloud.stream.bindings.output-out-0.destination=myStream
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay = 3000

And then we add @Bean with a Supplier on a Spring @Configuration to create new test messages every few seconds:

@Configuration class ProducerBinder {
  @Bean public Supplier output() {
    return () -> IntStream.range(1, 200)
                 .mapToObj(ipSuffix - >"192.168.0." + ipSuffix)
                 .map(entry - >MessageBuilder.withPayload(entry)
                 .build());
  }
}

That’s all we need for Spring Cloud Stream Binder Kinesis to work. We can simply start the application now.

6. Conclusion

In this article, we’ve seen how to integrate our Spring project with two AWS libraries for interacting with a Kinesis Data Stream. We’ve also seen how to use the Spring Cloud Stream Binder Kinesis library to make the implementation even easier.

The source code for this article can be found over on Github.

Course – LS (cat=Spring)

Get started with Spring and Spring Boot, through the Learn Spring course:

>> THE COURSE
res – Microservices (eBook) (cat=Cloud/Spring Cloud)
Comments are closed on this article!