I just announced the new Spring 5 modules in REST With Spring:

>> CHECK OUT THE COURSE

1. Overview

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.

In this article, we’ll introduce concepts and constructs of Spring Cloud Stream with some simple examples.

2. Maven Dependencies

To get started, we’ll need to add the Spring Cloud Starter Stream with the broker RabbitMQ Maven dependency as messaging-middleware to our pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

And we’ll add the module dependency from Maven Central to enable JUnit support as well:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <version>1.3.0.RELEASE</version>
    <scope>test</scope>
</dependency>

3. Main Concepts

Microservices architecture follows the “smart endpoints and dumb pipes” principle. Communication between endpoints is driven by messaging-middleware parties like RabbitMQ or Apache Kafka. Services communicate by publishing domain events via these endpoints or channels.

Let’s walk through the concepts that make up the Spring Cloud Stream framework, along with the essential paradigms that we must be aware of to build message-driven services.

3.1. Constructs

Let’s look at a simple service in Spring Cloud Stream that listens to input binding and sends a response to the output binding:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

The annotation @EnableBinding configures the application to bind the channels INPUT and OUTPUT defined within the interface Processor. Both channels are bindings that can be configured to use a concrete messaging-middleware or binder.

Let’s take a look at the definition of all these concepts:

  • Bindings — a collection of interfaces that identify the input and output channels declaratively
  • Binder — messaging-middleware implementation such as Kafka or RabbitMQ
  • Channel — represents the communication pipe between messaging-middleware and the application
  • StreamListeners — message-handling methods in beans that will be automatically invoked on a message from the channel after the MessageConverter does the serialization/deserialization between middleware-specific events and domain object types / POJOs
  • Message Schemas — used for serialization and deserialization of messages, these schemas can be statically read from a location or loaded dynamically, supporting the evolution of domain object types

3.2. Communication Patterns

Messages designated to destinations are delivered by the Publish-Subscribe messaging pattern. Publishers categorize messages into topics, each identified by a name. Subscribers express interest in one or more topics. The middleware filters the messages, delivering those of the interesting topics to the subscribers.

Now, the subscribers could be grouped. A consumer group is a set of subscribers or consumers, identified by a group id, within which messages from a topic or topic’s partition are delivered in a load-balanced manner.

4. Programming Model

This section describes the basics of building Spring Cloud Stream applications.

4.1. Functional Testing

The test support is a binder implementation that allows interacting with the channels and inspecting messages.

Let’s send a message to the above enrichLogMessage service and check whether the response contains the text “[1]: “ at the beginning of the message:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {

    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is my message"))
          .build());

        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

4.2. Custom Channels

In the above example, we used the Processor interface provided by Spring Cloud, which has only one input and one output channel.

If we need something different, like one input and two output channels, we can create a custom processor:

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

Spring will provide the proper implementation of this interface for us. The channel names can be set using annotations like in @Output(“myOutput”).

Otherwise, Spring will use the method names as the channel names. Therefore, we’ve got three channels called myInput, myOutput, and anotherOutput.

Now, let’s imagine we want to route the messages to one output if the value is less than 10 and into another output is the value is greater than or equal to 10:

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

4.3. Conditional Dispatching

Using the @StreamListener annotation, we also can filter the messages we expect in the consumer using any condition that we define with SpEL expressions.

As an example, we could use conditional dispatching as another approach to route messages into different outputs:

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

The only limitation of this approach is that these methods must not return a value.

5. Setup

Let’s set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

We can configure our application to use the default binder implementation via META-INF/spring.binders:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Or we can add the binder library for RabbitMQ to the classpath by including this dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

If no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange queue.pretty.log.messages. Both bindings will use the binder called local_rabbit.

Note that we don’t need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange queue.log.messages, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

Spring Cloud Stream allows us to apply message conversion for specific content types. In the above example, instead of using JSON format, we want to provide plain text.

To do this, we’ll to apply a custom transformation to LogMessage using a MessageConverter:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    //...

    @Bean
    public MessageConverter providesTextPlainMessageConverter() {
        return new TextPlainMessageConverter();
    }

    //...
}
public class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, 
        Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String 
          ? (String) payload 
          : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

After applying these changes, going back to the Publish Message panel, if we set the header “contentTypes” to “text/plain” and the payload to “Hello World“, it should work as before.

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings.<CHANNEL>.group property to specify a group name:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
          ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it’s important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we’ve deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let’s say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at http://<host>:<port>/health.

7. Conclusion

In this tutorial, we presented the main concepts of Spring Cloud Stream and showed how to use it through some simple examples over RabbitMQ. More info about Spring Cloud Stream can be found here.

The source code for this article can be found over on GitHub.

I just announced the new Spring 5 modules in REST With Spring:

>> CHECK OUT THE LESSONS

newest oldest most voted
help
Guest

Log-message is deprecated and we cannot run the above locally

Grzegorz Piwowarek
Editor

Which version are you working with?