1. Overview

In this tutorial, we'll cover the Wire Tap Enterprise Integration Pattern (EIP), which helps us monitor messages flowing through the system.

This pattern allows us to intercept the messages without permanently consuming them off the channel.

2. Wire Tap Pattern

The Wire Tap inspects messages that travel on a Point-to-Point Channel. It receives the message, makes a copy, and sends it to the Tap Destination:

To understand this better, let's create a Spring Boot application with ActiveMQ and Camel.

3. Maven Dependencies

Let's add camel-spring-boot-dependencies:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.camel.springboot</groupId>
            <artifactId>camel-spring-boot-dependencies</artifactId>
            <version>${camel.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Now, we'll add camel-spring-boot-starter:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
</dependency>

To view the messages flowing through a route, we'll also need to include ActiveMQ:

<dependency>
    <groupId>org.apache.camel.springboot</groupId>
    <artifactId>camel-activemq-starter</artifactId>
</dependency>

4. Messaging Exchange

Let's create a message object:

public class MyPayload implements Serializable {
    private String value;
    ...
}

We will send this message to the direct:source to initiate the route:

try (CamelContext context = new DefaultCamelContext()) {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
    connectionFactory.setTrustAllPackages(true);
    context.addComponent("direct", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
    addRoute(context);

    try (ProducerTemplate template = context.createProducerTemplate()) {
        context.start();

        MyPayload payload = new MyPayload("One");
        template.sendBody("direct:source", payload);
        Thread.sleep(10000);
    } finally {
        context.stop();
    }
}

Next, we'll add a route and tap destination.

5. Tapping an Exchange

We will use the wireTap method to set the endpoint URI of the Tap Destination. Camel doesn't wait for a response from wireTap because it sets the Message Exchange Pattern to InOnly. The Wire Tap processor processes it on a separate thread:

wireTap("direct:tap").delay(1000)

Camel's Wire Tap node supports two flavors when tapping an exchange:

5.1. Traditional Wire Tap

Let's add a traditional Wire Tap route:

RoutesBuilder traditionalWireTapRoute() {
    return new RouteBuilder() {
        public void configure() {

            from("direct:source").wireTap("direct:tap")
                .delay(1000)
                .bean(MyBean.class, "addTwo")
                .to("direct:destination");

            from("direct:tap").log("Tap Wire route: received");

            from("direct:destination").log("Output at destination: '${body}'");
        }
    };
}

Here, Camel will only copy the Exchange – it won't do a deep cloneAll copies could share objects from the original exchange.

While processing multiple messages concurrently, there's a possibility of corrupting the final payload. We can create a deep clone of the payload before passing it to the Tap Destination to prevent this.

5.2. Sending a New Exchange

The Wire Tap EIP supports an Expression or Processor, pre-populated with a copy of the exchange. An Expression can only be used to set the message body.

The Processor variation gives full power over how the exchange is populated (setting properties, headers, etc).

Let's implement deep cloning in the payload:

public class MyPayload implements Serializable {

    private String value;
    ...
    public MyPayload deepClone() {
        MyPayload myPayload = new MyPayload(value);
        return myPayload;
   }
}

Now, let's implement the Processor class with a copy of the original exchange as input:

public class MyPayloadClonePrepare implements Processor {

    public void process(Exchange exchange) throws Exception {
        MyPayload myPayload = exchange.getIn().getBody(MyPayload.class);
        exchange.getIn().setBody(myPayload.deepClone());
        exchange.getIn().setHeader("date", new Date());
    }
}

We'll call it using onPrepare right after wireTap:

RoutesBuilder newExchangeRoute() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {

        from("direct:source").wireTap("direct:tap")
            .onPrepare(new MyPayloadClonePrepare())
            .end()
            .delay(1000);

        from("direct:tap").bean(MyBean.class, "addThree");
        }
     };
}

6. Conclusion

In this article, we implemented a Wire Tap pattern to monitor messages passing through certain message endpoints. Using Apache Camel's wireTap, we copy the message and send it to a different endpoint without altering the existing flow.

Camel supports two ways to tap an exchange. In the traditional Wire Tap, the original exchange is copied. In the second, we can create a new exchange. We can populate this new exchange with new values of message body using an Expression, or we can set headers – and optionally, the body – using a Processor.

The code sample is available over on GitHub.

Generic bottom

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE
guest
0 Comments
Inline Feedbacks
View all comments