I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Overview

In this quick article, we'll introduce the reactor-bus by setting up a real-life scenario for a reactive, event-driven application.

2. The Basics of Project Reactor

2.1. Why Reactor?

Modern applications need to deal with a huge number of concurrent requests and process a significant amount of data. Standard, blocking code is no longer sufficient to fulfill these requirements.

The reactive design pattern is an event-based architectural approach for asynchronous handling of a large volume of concurrent service requests coming from single or multiple service handlers.

Project Reactor is based on this pattern and has a clear and ambitious goal: building non-blocking, reactive applications on the JVM.

2.2. Example Scenarios

Before we get started, here are a few interesting scenarios where leveraging the reactive architectural style would make sense, just to get an idea of where we might apply it:

  • Notification services for a large online shopping platform like Amazon
  • Huge transaction processing services for the banking sector
  • Stock trading businesses where many stock prices change simultaneously

3. Maven Dependencies

Let's start using Reactor Bus by adding the following dependency to our pom.xml:

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-bus</artifactId>
    <version>2.0.8.RELEASE</version>
</dependency>

We can check the latest version of reactor-bus in Maven Central.

4. Building a Demo Application

To better understand the benefits of the reactor-based approach, let's look at a practical example.

We'll build a simple application responsible for sending notifications to the users of an online shopping platform. For example, if a user places a new order, then the app sends an order confirmation via email or SMS.

A typical synchronous implementation would naturally be limited by the throughput of the email or SMS service. Therefore, traffic spikes, such as those around holidays, would generally be problematic.

With a reactive approach, we can design our system to be more flexible and to adapt better to failures or timeouts that may occur in the external systems, such as gateway servers.

Let's have a look at the application – starting with the more traditional aspects and moving on to the more reactive constructs.

4.1. Simple POJO

First, let's create a POJO class to represent the notification data:

public class NotificationData {
	
    private long id;
    private String name;
    private String email;
    private String mobile;
    
    // getter and setter methods
}

4.2. The Service Layer

Let's now define a simple service layer:

public interface NotificationService {

    void initiateNotification(NotificationData notificationData) 
      throws InterruptedException;

}

And the implementation, simulating a long-running operation:

@Service
public class NotificationServiceImpl implements NotificationService {

    @Override
    public void initiateNotification(NotificationData notificationData) 
      throws InterruptedException {

        System.out.println("Notification service started for "
          + "Notification ID: " + notificationData.getId());

        // Simulate a slow SMS or email gateway
        Thread.sleep(5000);

        System.out.println("Notification service ended for "
          + "Notification ID: " + notificationData.getId());
    }
}

Notice that, to simulate sending messages via an SMS or email gateway, we're intentionally introducing a five-second delay in the initiateNotification method with Thread.sleep(5000).

Consequently, when a thread hits the service, it'll be blocked for five seconds.
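To see what this blocking costs in isolation, here is a minimal sketch that uses only the JDK, not Reactor. The class BlockingCostDemo and its methods are hypothetical names for illustration, and the delay is shortened to 50 ms so the example runs quickly. It compares calling a slow operation one call after another with handing the same calls to a small thread pool:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingCostDemo {

    // Hypothetical stand-in for the slow gateway call (50 ms instead of 5 s).
    static void slowSend(long id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Runs every call on the calling thread, one after another.
    static long runSequentially(int count) {
        long start = System.nanoTime();
        for (int i = 0; i < count; i++) {
            slowSend(i);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Hands every call to a fixed pool and waits until all have finished.
    static long runOnPool(int count, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[count];
            for (int i = 0; i < count; i++) {
                final long id = i;
                pending[i] = CompletableFuture.runAsync(() -> slowSend(id), pool);
            }
            CompletableFuture.allOf(pending).join();
        } finally {
            pool.shutdown();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("sequential ms: " + runSequentially(4));
        System.out.println("pooled ms:     " + runOnPool(4, 4));
    }
}
```

The sequential total grows with the number of calls, while the pooled total stays close to the duration of a single call as long as there are enough worker threads.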

4.3. The Consumer

Let's now jump into the more reactive aspects of our application and implement a consumer – which we'll then map to the reactor event bus:

import reactor.bus.Event;
import reactor.fn.Consumer;

@Service
public class NotificationConsumer implements 
  Consumer<Event<NotificationData>> {

    @Autowired
    private NotificationService notificationService;

    @Override
    public void accept(Event<NotificationData> notificationDataEvent) {
        NotificationData notificationData = notificationDataEvent.getData();

        try {
            notificationService.initiateNotification(notificationData);
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }
}

As we can see, the consumer we created implements the Consumer<T> interface. The main logic resides in the accept method.

This approach is similar to what we'd find in a typical Spring event listener implementation.

4.4. The Controller

Finally, now that we're able to consume the events, let's also generate them.

We're going to do that in a simple controller:

// @RestController, so the void return produces an empty response
// instead of triggering a view lookup
@RestController
public class NotificationController {

    @Autowired
    private EventBus eventBus;

    @GetMapping("/startNotification/{param}")
    public void startNotification(@PathVariable Integer param) {
        for (int i = 0; i < param; i++) {
            NotificationData data = new NotificationData();
            data.setId(i);

            // the key must match the selector the consumer is registered with
            eventBus.notify("notificationConsumer", Event.wrap(data));

            System.out.println(
              "Notification " + i + ": notification task submitted successfully");
        }
    }
}

This is quite self-explanatory – we're emitting events through the EventBus here.

For example, if a client hits the URL with a param value of ten, then ten events will be sent through the event bus.
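The important property here is that submitting an event returns before the event is processed. The following is a minimal JDK-only sketch of that fire-and-forget behaviour. It doesn't use the Reactor API; FireAndForgetDemo is a hypothetical name, and a latch stands in for the slow gateway:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class FireAndForgetDemo {

    // Returns {tasks finished when the submit loop ended, tasks finished at the end}.
    static int[] submitAll(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CountDownLatch gate = new CountDownLatch(1);     // holds the workers back
        CountDownLatch done = new CountDownLatch(count); // counts completed tasks
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                pool.execute(() -> {
                    try {
                        gate.await();                    // simulated slow gateway
                        processed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            // The loop is over, but no task could finish while the gate was closed.
            int finishedAtSubmit = processed.get();
            gate.countDown();                            // let the workers proceed
            done.await();                                // wait for all tasks
            return new int[] {finishedAtSubmit, processed.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = submitAll(10);
        System.out.println("finished at submit time: " + result[0]); // prints 0
        System.out.println("finished in the end:     " + result[1]); // prints 10
    }
}
```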

4.5. The Java Config

Let's now put everything together and create a simple Spring Boot application.

First, we need to configure EventBus and Environment beans:

@Configuration
public class Config {

    @Bean
    public Environment env() {
        return Environment.initializeIfEmpty().assignErrorJournal();
    }

    @Bean
    public EventBus createEventBus(Environment env) {
        return EventBus.create(env, Environment.THREAD_POOL);
    }
}

In our case, we're instantiating the EventBus with a default thread pool available in the environment.

Alternatively, we can use a customized Dispatcher instance:

EventBus evBus = EventBus.create(
  env, 
  Environment.newDispatcher(
    REACTOR_CAPACITY, 
    REACTOR_CONSUMERS_COUNT, 
    DispatcherType.THREAD_POOL_EXECUTOR));
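To build some intuition for the two numeric settings, assuming REACTOR_CAPACITY bounds the backlog of pending tasks and REACTOR_CONSUMERS_COUNT sets the number of worker threads, here is a rough JDK-only analogue built on ThreadPoolExecutor. BoundedPoolDemo is a hypothetical name, and Reactor's own dispatchers may treat a full backlog differently than the JDK's default rejection policy shown here:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    // A fixed number of workers in front of a bounded queue of pending tasks.
    static ThreadPoolExecutor newPool(int capacity, int consumers) {
        return new ThreadPoolExecutor(
            consumers, consumers, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(capacity));
    }

    // Submits tasks that stay busy and counts how many the pool accepts.
    static int countAccepted(int capacity, int consumers, int submissions) {
        ThreadPoolExecutor pool = newPool(capacity, consumers);
        CountDownLatch gate = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            gate.await();   // keep the worker occupied
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // all workers are busy and the queue is full
                }
            }
        } finally {
            gate.countDown();               // release the workers
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // 2 running + 2 queued are accepted, the remaining 2 are rejected
        System.out.println(countAccepted(2, 2, 6)); // prints 4
    }
}
```

A larger capacity absorbs longer bursts at the cost of memory and latency, while more consumers raise throughput only as long as the downstream gateway can keep up.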

Now we're ready to write the main application class:

import static reactor.bus.selector.Selectors.$;

@SpringBootApplication
public class NotificationApplication implements CommandLineRunner {

    @Autowired
    private EventBus eventBus;

    @Autowired
    private NotificationConsumer notificationConsumer;

    @Override
    public void run(String... args) throws Exception {
        eventBus.on($("notificationConsumer"), notificationConsumer);
    }

    public static void main(String[] args) {
        SpringApplication.run(NotificationApplication.class, args);
    }
}

In our run method we're registering the notificationConsumer to be triggered when the notification matches a given selector.

Notice how we're using the statically imported $ method to create a Selector object.
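Conceptually, registering with a selector and notifying with a key boils down to a lookup from key to consumers plus a dispatcher that runs them. The sketch below illustrates that idea with plain JDK types. TinyEventBus, on, and publish are hypothetical names, not the reactor-bus API, and only exact key matches are supported:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class TinyEventBus {

    private final Map<Object, List<Consumer<Object>>> registry = new ConcurrentHashMap<>();
    private final Executor dispatcher;

    TinyEventBus(Executor dispatcher) {
        this.dispatcher = dispatcher;
    }

    // Registers a consumer under a key (the counterpart of a selector).
    void on(Object key, Consumer<Object> consumer) {
        registry.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(consumer);
    }

    // Hands the payload to every consumer registered under the key and
    // returns how many consumers were notified.
    int publish(Object key, Object payload) {
        List<Consumer<Object>> consumers =
            registry.getOrDefault(key, Collections.emptyList());
        for (Consumer<Object> consumer : consumers) {
            dispatcher.execute(() -> consumer.accept(payload));
        }
        return consumers.size();
    }

    public static void main(String[] args) {
        // Runnable::run dispatches on the calling thread to keep the demo simple.
        TinyEventBus bus = new TinyEventBus(Runnable::run);
        List<Object> received = new ArrayList<>();
        bus.on("notificationConsumer", received::add);

        bus.publish("notificationConsumer", "order-1"); // matches the key
        bus.publish("someOtherKey", "order-2");         // no consumer registered
        System.out.println(received);                   // prints [order-1]
    }
}
```

Swapping Runnable::run for a thread pool executor gives the asynchronous behaviour, without changing how consumers are registered.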

5. Test the Application

Let's now create a test to see our NotificationApplication in action:

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class NotificationApplicationIntegrationTest {

    @LocalServerPort
    private int port;

    @Test
    public void givenAppStarted_whenNotificationTasksSubmitted_thenProcessed() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.getForObject("http://localhost:" + port + "/startNotification/10", String.class);
    }
}

As we can see, as soon as the request is executed, all ten tasks are submitted immediately, without blocking the request thread. Once submitted, the notification events are processed in parallel:

Notification 0: notification task submitted successfully
Notification 1: notification task submitted successfully
Notification 2: notification task submitted successfully
Notification 3: notification task submitted successfully
Notification 4: notification task submitted successfully
Notification 5: notification task submitted successfully
Notification 6: notification task submitted successfully
Notification 7: notification task submitted successfully
Notification 8: notification task submitted successfully
Notification 9: notification task submitted successfully
Notification service started for Notification ID: 1
Notification service started for Notification ID: 2
Notification service started for Notification ID: 3
Notification service started for Notification ID: 0
Notification service ended for Notification ID: 1
Notification service ended for Notification ID: 0
Notification service started for Notification ID: 4
Notification service ended for Notification ID: 3
Notification service ended for Notification ID: 2
Notification service started for Notification ID: 6
Notification service started for Notification ID: 5
Notification service started for Notification ID: 7
Notification service ended for Notification ID: 4
Notification service started for Notification ID: 8
Notification service ended for Notification ID: 6
Notification service ended for Notification ID: 5
Notification service started for Notification ID: 9
Notification service ended for Notification ID: 7
Notification service ended for Notification ID: 8
Notification service ended for Notification ID: 9

It's important to keep in mind that in our scenario there's no need to process these events in any particular order.
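If the order did matter, one common option with plain JDK executors is to use a single worker thread, trading parallelism for ordering. The sketch below shows only that JDK behaviour. OrderedDispatchDemo is a hypothetical name, and it makes no claim about how Reactor's dispatchers order events:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedDispatchDemo {

    // Processes ids on a single worker thread and returns them in processing order.
    static List<Integer> processInOrder(int count) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int id = i;
            single.execute(() -> processed.add(id));
        }
        single.shutdown();
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(processInOrder(5)); // prints [0, 1, 2, 3, 4]
    }
}
```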

6. Conclusion

In this quick tutorial, we've created a simple event-driven application. We've also seen how to start writing more reactive, non-blocking code.

However, this scenario only scratches the surface of the subject; it's simply a good base from which to start experimenting with the reactive paradigm.

As always, the source code is available over on GitHub.

M Yank
3 years ago

really good article. one question here: on low end system, how could we assign thread pool limit?

Abhinab Kanrar
3 years ago
Reply to M Yank

Following way you could assign thread pool:

@Bean
EventBus createEventBus(Environment env) {
    EventBus evBus = EventBus.create(
      env, 
      Environment.newDispatcher(
        NUMBER_OF_THREAD_YOU_WANT_TO_ASSIGN, 
        NUMBER_OF_THREAD_YOU_WANT_TO_ASSIGN, 
        DispatcherType.THREAD_POOL_EXECUTOR));
    return evBus;
}

ideally you should assign available processor number as the thread pool size. So in the above case, NUMBER_OF_THREAD_YOU_WANT_TO_ASSIGN should be assigned in the following way:

NUMBER_OF_THREAD_YOU_WANT_TO_ASSIGN = Runtime.getRuntime().availableProcessors();
