Partner – Microsoft – NPI (cat=Java)
announcement - icon

Microsoft JDConf 2024 conference is getting closer, on March 27th and 28th. Simply put, it's a free virtual event to learn about the newest developments in Java, Cloud, and AI.

Josh Long and Mark Heckler are kicking things off in the keynote, so it's definitely going to be both highly useful and quite practical.

This year’s theme is focused on developer productivity and how these technologies transform how we work, build, integrate, and modernize applications.

For the full conference agenda and speaker lineup, you can explore JDConf.com:

>> RSVP Now

 

1. Overview

Simply put, MBassador is a high-performance event bus utilizing the publish-subscribe semantics.

Messages are broadcasted to one or more peers without the prior knowledge of how many subscribers there are, or how they use the message.

2. Maven Dependency

Before we can use the library, we need to add the mbassador dependency:

<dependency>
    <groupId>net.engio</groupId>
    <artifactId>mbassador</artifactId>
    <version>1.3.1</version>
</dependency>

3. Basic Event Handling

3.1. Simple Example

We’ll start with a simple example of publishing a message:

private MBassador<Object> dispatcher = new MBassador<>();
private String messageString;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenStringDispatched_thenHandleString() {
    dispatcher.post("TestString").now();
 
    assertNotNull(messageString);
    assertEquals("TestString", messageString);
}

@Handler
public void handleString(String message) {
    messageString = message;
}

At the top of this test class, we see the creation of a MBassador with its default constructor. Next, in the @Before method, we call subscribe() and pass in a reference to the class itself.

In subscribe(), the dispatcher inspects the subscriber for @Handler annotations.

And, in the first test, we call dispatcher.post(…).now() to dispatch the message – which results in handleString() being called.

This initial test demonstrates several important concepts. Any Object can be a subscriber, as long as it has one or more methods annotated with @Handler. A subscriber can have any number of handlers.

We’re using test objects that subscribe to themselves for the sake of simplicity, but in most production scenarios, message dispatchers will in different classes from consumers.

Handler methods have only one input parameter – the message, and can’t throw any checked exceptions.

Similar to the subscribe() method, the post method accepts any Object. This Object is delivered to subscribers.

When a message is posted, it is delivered to any listeners that have subscribed to the message type.

Let’s add another message handler and send a different message type:

private Integer messageInteger; 

@Test
public void whenIntegerDispatched_thenHandleInteger() {
    dispatcher.post(42).now();
 
    assertNull(messageString);
    assertNotNull(messageInteger);
    assertTrue(42 == messageInteger);
}

@Handler
public void handleInteger(Integer message) {
    messageInteger = message;
}

As expected, when we dispatch an Integer, handleInteger() is called, and handleString() is not. A single dispatcher can be used to send more than one message type.

3.2. Dead Messages

So where does a message go when there is no handler for it? Let’s add a new event handler and then send a third message type:

private Object deadEvent; 

@Test
public void whenLongDispatched_thenDeadEvent() {
    dispatcher.post(42L).now();
 
    assertNull(messageString);
    assertNull(messageInteger);
    assertNotNull(deadEvent);
    assertTrue(deadEvent instanceof Long);
    assertTrue(42L == (Long) deadEvent);
} 

@Handler
public void handleDeadEvent(DeadMessage message) {
    deadEvent = message.getMessage();
}

In this test, we dispatch a Long instead of an Integer. Neither handleInteger() nor handleString() are called, but handleDeadEvent() is.

When there are no handlers for a message, it gets wrapped in a DeadMessage object. Since we added a handler for Deadmessage, we capture it.

DeadMessage can be safely ignored; if an application does not need to track dead messages, they can be allowed to go nowhere.

4. Using an Event Hierarchy

Sending String and Integer events is limiting. Let’s create a few message classes:

public class Message {}

public class AckMessage extends Message {}

public class RejectMessage extends Message {
    int code;

    // setters and getters
}

We have a simple base class and two classes that extend it.

4.1. Sending a Base Class Message

We’ll start with Message events:

private MBassador<Message> dispatcher = new MBassador<>();

private Message message;
private AckMessage ackMessage;
private RejectMessage rejectMessage;

@Before
public void prepareTests() {
    dispatcher.subscribe(this);
}

@Test
public void whenMessageDispatched_thenMessageHandled() {
    dispatcher.post(new Message()).now();
    assertNotNull(message);
    assertNull(ackMessage);
    assertNull(rejectMessage);
}

@Handler
public void handleMessage(Message message) {
    this.message = message;
}

@Handler
public void handleRejectMessage(RejectMessage message) {
   rejectMessage = message;
}

@Handler
public void handleAckMessage(AckMessage message) {
    ackMessage = message;
}

Discover MBassador – a high-performance pub-sub event bus. This limits us to using Messages but adds an additional layer of type safety.

When we send a Message, handleMessage() receives it. The other two handlers do not.

4.2. Sending a Subclass Message

Let’s send a RejectMessage:

@Test
public void whenRejectDispatched_thenMessageAndRejectHandled() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(message);
    assertNotNull(rejectMessage);
    assertNull(ackMessage);
}

When we send a RejectMessage both handleRejectMessage() and handleMessage() receive it.

Since RejectMessage extends Message, the Message handler received it, in addition to the RejectMessage handler.

Let’s verify this behavior with an AckMessage:

@Test
public void whenAckDispatched_thenMessageAndAckHandled() {
    dispatcher.post(new AckMessage()).now();
 
    assertNotNull(message);
    assertNotNull(ackMessage);
    assertNull(rejectMessage);
}

Just as we expected, when we send an AckMessage, both handleAckMessage() and handleMessage() receive it.

5. Filtering Messages

Organizing messages by type is already a powerful feature, but we can filter them even more.

5.1. Filter on Class and Subclass

When we posted a RejectMessage or AckMessage, we received the event in both the event handler for the particular type and in the base class.

We can solve this type hierarchy issue by making Message abstract and creating a class such as GenericMessage. But what if we don’t have this luxury?

We can use message filters:

private Message baseMessage;
private Message subMessage;

@Test
public void whenMessageDispatched_thenMessageFiltered() {
    dispatcher.post(new Message()).now();
 
    assertNotNull(baseMessage);
    assertNull(subMessage);
}

@Test
public void whenRejectDispatched_thenRejectFiltered() {
    dispatcher.post(new RejectMessage()).now();
 
    assertNotNull(subMessage);
    assertNull(baseMessage);
}

@Handler(filters = { @Filter(Filters.RejectSubtypes.class) })
public void handleBaseMessage(Message message) {
    this.baseMessage = message;
}

@Handler(filters = { @Filter(Filters.SubtypesOnly.class) })
public void handleSubMessage(Message message) {
    this.subMessage = message;
}

The filters parameter for the @Handler annotation accepts a Class that implements IMessageFilter. The library offers two examples:

The Filters.RejectSubtypes does as its name suggests: it will filter out any subtypes. In this case, we see that RejectMessage is not handled by handleBaseMessage().

The Filters.SubtypesOnly also does as its name suggests: it will filter out any base types. In this case, we see that Message is not handled by handleSubMessage().

5.2. IMessageFilter

The Filters.RejectSubtypes and the Filters.SubtypesOnly both implement IMessageFilter.

RejectSubTypes compares the class of the message to its defined message types and will only allow through messages that equal one of its types, as opposed to any subclasses.

5.3. Filter With Conditions

Fortunately, there is an easier way of filtering messages. MBassador supports a subset of Java EL expressions as conditions for filtering messages.

Let’s filter a String message based on its length:

private String testString;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

The “foobar!” message is seven characters long and is filtered. Let’s send a shorter String:


@Test
public void whenShortStringDispatched_thenStringHandled() {
    dispatcher.post("foobar").now();
 
    assertNotNull(testString);
}

Now, the “foobar” is only six characters long and is passed through.

Our RejectMessage contains a field with an accessor. Let’s write a filter for that:

private RejectMessage rejectMessage;

@Test
public void whenWrongRejectDispatched_thenRejectFiltered() {

    RejectMessage testReject = new RejectMessage();
    testReject.setCode(-1);

    dispatcher.post(testReject).now();
 
    assertNull(rejectMessage);
    assertNotNull(subMessage);
    assertEquals(-1, ((RejectMessage) subMessage).getCode());
}

@Handler(condition = "msg.getCode() != -1")
public void handleRejectMessage(RejectMessage rejectMessage) {
    this.rejectMessage = rejectMessage;
}

Here again, we can query a method on an object and either filter the message or not.

5.4. Capture Filtered Messages

Similar to DeadEvents, we may want to capture and process filtered messages. There is a dedicated mechanism for capturing filtered events too. Filtered events are treated differently from “dead” events.

Let’s write a test that illustrates this:

private String testString;
private FilteredMessage filteredMessage;
private DeadMessage deadMessage;

@Test
public void whenLongStringDispatched_thenStringFiltered() {
    dispatcher.post("foobar!").now();
 
    assertNull(testString);
    assertNotNull(filteredMessage);
    assertTrue(filteredMessage.getMessage() instanceof String);
    assertNull(deadMessage);
}

@Handler(condition = "msg.length() < 7")
public void handleStringMessage(String message) {
    this.testString = message;
}

@Handler
public void handleFilterMessage(FilteredMessage message) {
    this.filteredMessage = message;
}

@Handler
public void handleDeadMessage(DeadMessage deadMessage) {
    this.deadMessage = deadMessage;
}

With the addition of a FilteredMessage handler, we can track Strings that are filtered because of their length. The filterMessage contains our too-long String while deadMessage remains null.

6. Asynchronous Message Dispatch and Handling

So far all of our examples have used synchronous message dispatch; when we called post.now() the messages were delivered to each handler in the same thread we called post() from.

6.1. Asynchronous Dispatch

The MBassador.post() returns a SyncAsyncPostCommand. This class offers several methods, including:

  • now() – dispatch messages synchronously; the call will block until all messages have been delivered
  • asynchronously() – executes the message publication asynchronously

Let’s use asynchronous dispatch in a sample class. We’ll use Awaitility in these tests to simplify the code:

private MBassador<Message> dispatcher = new MBassador<>();
private String testString;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenAsyncDispatched_thenMessageReceived() {
    dispatcher.post("foobar").asynchronously();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testString);
}

@Handler
public void handleStringMessage(String message) {
    this.testString = message;
    ready.set(true);
}

We call asynchronously() in this test, and use an AtomicBoolean as a flag with await() to wait for the delivery thread to deliver the message.

If we comment out the call to await(), we risk the test failing, because we check testString before the delivery thread completes.

6.2. Asynchronous Handler Invocation

Asynchronous dispatch allows the message provider to return to message processing before the messages are delivered to each handler, but it still calls each handler in order, and each handler has to wait for the previous to finish.

This can lead to problems if one handler performs an expensive operation.

MBassador provides a mechanism for asynchronous handler invocation. Handlers configured for this receive messages in their thread:

private Integer testInteger;
private String invocationThreadName;
private AtomicBoolean ready = new AtomicBoolean(false);

@Test
public void whenHandlerAsync_thenHandled() {
    dispatcher.post(42).now();
 
    await().untilAtomic(ready, equalTo(true));
    assertNotNull(testInteger);
    assertFalse(Thread.currentThread().getName().equals(invocationThreadName));
}

@Handler(delivery = Invoke.Asynchronously)
public void handleIntegerMessage(Integer message) {
 
    this.invocationThreadName = Thread.currentThread().getName();
    this.testInteger = message;
    ready.set(true);
}

Handlers can request asynchronous invocation with the delivery = Invoke.Asynchronously property on the Handler annotation. We verify this in our test by comparing the Thread names in the dispatching method and the handler.

7. Customizing MBassador

So far we’ve been using an instance of MBassador with its default configuration. The dispatcher’s behavior can be modified with annotations, similar to those we have seen so far; we’ll cover a few more to finish this tutorial.

7.1. Exception Handling

Handlers cannot define checked exceptions. Instead, the dispatcher can be provided with an IPublicationErrorHandler as an argument to its constructor:

public class MBassadorConfigurationTest
  implements IPublicationErrorHandler {

    private MBassador dispatcher;
    private String messageString;
    private Throwable errorCause;

    @Before
    public void prepareTests() {
        dispatcher = new MBassador<String>(this);
        dispatcher.subscribe(this);
    }

    @Test
    public void whenErrorOccurs_thenErrorHandler() {
        dispatcher.post("Error").now();
 
        assertNull(messageString);
        assertNotNull(errorCause);
    }

    @Test
    public void whenNoErrorOccurs_thenStringHandler() {
        dispatcher.post("Error").now();
 
        assertNull(errorCause);
        assertNotNull(messageString);
    }

    @Handler
    public void handleString(String message) {
        if ("Error".equals(message)) {
            throw new Error("BOOM");
        }
        messageString = message;
    }

    @Override
    public void handleError(PublicationError error) {
        errorCause = error.getCause().getCause();
    }
}

When handleString() throws an Error, it is saved to errorCause.

7.2. Handler Priority

Handlers are called in reverse order of how they are added, but this isn’t behavior we want to rely on. Even with the ability to call handlers in their threads, we may still need to know what order they will be called in.

We can set handler priority explicitly:

private LinkedList<Integer> list = new LinkedList<>();

@Test
public void whenRejectDispatched_thenPriorityHandled() {
    dispatcher.post(new RejectMessage()).now();

    // Items should pop() off in reverse priority order
    assertTrue(1 == list.pop());
    assertTrue(3 == list.pop());
    assertTrue(5 == list.pop());
}

@Handler(priority = 5)
public void handleRejectMessage5(RejectMessage rejectMessage) {
    list.push(5);
}

@Handler(priority = 3)
public void handleRejectMessage3(RejectMessage rejectMessage) {
    list.push(3);
}

@Handler(priority = 2, rejectSubtypes = true)
public void handleMessage(Message rejectMessage) 
    logger.error("Reject handler #3");
    list.push(3);
}

@Handler(priority = 0)
public void handleRejectMessage0(RejectMessage rejectMessage) {
    list.push(1);
}

Handlers are called from highest priority to lowest. Handlers with the default priority, which is zero, are called last. We see that the handler numbers pop() off in reverse order.

7.3. Reject Subtypes, the Easy Way

What happened to handleMessage() in the test above? We don’t have to use RejectSubTypes.class to filter our sub types.

RejectSubTypes is a boolean flag that provides the same filtering as the class, but with better performance than the IMessageFilter implementation.

We still need to use the filter-based implementation for accepting subtypes only, though.

8. Conclusion

MBassador is a simple and straightforward library for passing messages between objects. Messages can be organized in a variety of ways and can be dispatched synchronously or asynchronously.

And, as always, the example is available in this GitHub project.

Course – LS (cat=Java)

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are closed on this article!