I just announced the new Spring 5 modules in REST With Spring:

>> CHECK OUT THE COURSE

1. Introduction

In this article, we’ll demonstrate how to use the Spring Cloud App Starters, which provide bootstrapped, ready-to-go applications that can serve as starting points for future development.

Simply put, Task App Starters are dedicated to use cases like database migration and distributed testing, while Stream App Starters provide integrations with external systems.

Overall, there are over 55 starters; check out the official documentation for each of these two families for more information.

Next, we’ll build a small distributed Twitter application that will stream Twitter posts into the Hadoop Distributed File System (HDFS).

2. Getting Set Up

We’ll use a Twitter consumer key and access token to create a simple Twitter app.

Then, we’ll set up Hadoop so we can persist our Twitter Stream for future Big Data purposes.

Lastly, we have the option to either use the supplied Spring GitHub repositories to compile and assemble standalone components of the sources-processors-sinks architecture pattern using Maven, or combine sources, processors, and sinks through their Spring Stream binding interfaces.

We’ll take a look at both ways to do this.
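To make the sources-processors-sinks pattern concrete before looking at the Spring specifics, here is a conceptual sketch in plain Java. It uses only the JDK's Supplier, Function, and Consumer interfaces as stand-ins for the three roles; PipelineSketch and runPipeline are hypothetical names, and this is not Spring Cloud Stream's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Conceptual sketch: JDK functional interfaces standing in for the source,
// processor and sink roles. This is NOT the Spring Cloud Stream API.
public class PipelineSketch {

    // Pulls `count` messages from the source, transforms each one, and hands it to the sink.
    public static <A, B> void runPipeline(Supplier<A> source, Function<A, B> processor,
                                          Consumer<B> sink, int count) {
        for (int i = 0; i < count; i++) {
            sink.accept(processor.apply(source.get()));
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        int[] counter = {0};
        Supplier<String> source = () -> "tweet-" + (++counter[0]); // hypothetical source
        Function<String, String> processor = String::toUpperCase;   // transforms each payload
        Consumer<String> sink = received::add;                       // collects the results
        runPipeline(source, processor, sink, 3);
        System.out.println(received); // prints [TWEET-1, TWEET-2, TWEET-3]
    }
}
```

In the real starters, the stages are separate applications connected by message channels rather than direct method calls, which is what allows us to run them standalone or combine them later.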

It’s worth noting that all Stream App Starters were formerly collated into one large repo at github.com/spring-cloud/spring-cloud-stream-app-starters. Since then, each Starter has been simplified and isolated into its own repository.

3. Twitter Credentials

First, let’s set up our Twitter Developer credentials. To get them, follow the steps in the official Twitter developer documentation to set up an app and create an access token.

Specifically, we’ll need:

  1. Consumer Key
  2. Consumer Secret
  3. Access Token
  4. Access Token Secret

Make sure to keep that window open or jot these down, since we’ll be using them below!
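If we keep these four values in environment variables rather than in a file, a small fail-fast check can save us from a confusing startup failure later. This is a sketch under our own assumptions: the variable names (TWITTER_CONSUMER_KEY and so on) and the TwitterCredentialsCheck class are hypothetical, and nothing in the starters reads these variables directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: report which of the four Twitter credentials are missing.
// The environment variable names are hypothetical; pick whatever suits your setup.
public class TwitterCredentialsCheck {

    static final String[] REQUIRED = {
        "TWITTER_CONSUMER_KEY",
        "TWITTER_CONSUMER_SECRET",
        "TWITTER_ACCESS_TOKEN",
        "TWITTER_ACCESS_TOKEN_SECRET"
    };

    // Returns the required names that are absent or blank in the given map, in REQUIRED order.
    public static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> absent = missing(System.getenv());
        if (absent.isEmpty()) {
            System.out.println("All Twitter credentials are set.");
        } else {
            System.out.println("Missing: " + absent);
        }
    }
}
```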

4. Installing Hadoop

Next, let’s install Hadoop! We can either follow the official documentation or simply leverage Docker:

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
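To check that the container is up, we can list the HDFS root through the WebHDFS REST API, which Hadoop 2.x serves on the same NameNode HTTP port (50070) that the docker run command maps. The sketch below assumes WebHDFS is enabled and that the container is reachable on localhost; HdfsHealthCheck is a hypothetical helper, not part of the article's project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Sketch: list the HDFS root via WebHDFS on the NameNode HTTP port (50070 as mapped above).
// Assumes WebHDFS is enabled on the NameNode.
public class HdfsHealthCheck {

    // Builds a WebHDFS LISTSTATUS URL for an HDFS path such as "/" or "/tmp".
    public static String listStatusUrl(String host, int httpPort, String path) {
        String normalized = path.startsWith("/") ? path : "/" + path;
        return "http://" + host + ":" + httpPort + "/webhdfs/v1" + normalized + "?op=LISTSTATUS";
    }

    public static void main(String[] args) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)
                URI.create(listStatusUrl("localhost", 50070, "/")).toURL().openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        int status = conn.getResponseCode();
        System.out.println("HTTP " + status);
        if (status == 200) {
            try (InputStream in = conn.getInputStream()) {
                // The body is a JSON "FileStatuses" document describing the directory
                System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
            }
        }
        conn.disconnect();
    }
}
```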

5. Compiling Our App Starters

To use freestanding, fully individual components, we can download and compile desired Spring Cloud Stream App Starters individually from their GitHub repositories.

5.1. Twitter Spring Cloud Stream App Starter

Let’s add the Twitter Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.twitterstream.source) to our project:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

Then, we run Maven:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root.

Then we can run that compiled .jar and pass in the relevant application properties like so:

java -jar twitter_stream_source.jar --twitter.credentials.consumer-key=<CONSUMER_KEY> \
    --twitter.credentials.consumer-secret=<CONSUMER_SECRET> \
    --twitter.credentials.access-token=<ACCESS_TOKEN> \
    --twitter.credentials.access-token-secret=<ACCESS_TOKEN_SECRET>

We can also pass our credentials using the familiar Spring application.properties:

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
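Since Spring Boot treats --name=value command-line arguments as properties, the same keys shown in application.properties can also be passed on the command line. The following sketch (TwitterSourceArgs is a hypothetical helper) assembles such a command from a map of properties; the values are placeholders and the jar name is the one used above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: turn application.properties-style keys into "--key=value" arguments.
// Values are placeholders, not real credentials.
public class TwitterSourceArgs {

    // Preserves the map's iteration order, so a LinkedHashMap keeps insertion order.
    public static List<String> toCommandLineArgs(Map<String, String> properties) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            args.add("--" + e.getKey() + "=" + e.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("twitter.credentials.consumer-key", "<CONSUMER_KEY>");
        props.put("twitter.credentials.consumer-secret", "<CONSUMER_SECRET>");
        props.put("twitter.credentials.access-token", "<ACCESS_TOKEN>");
        props.put("twitter.credentials.access-token-secret", "<ACCESS_TOKEN_SECRET>");

        List<String> command = new ArrayList<>();
        command.add("java");
        command.add("-jar");
        command.add("twitter_stream_source.jar");
        command.addAll(toCommandLineArgs(props));
        // Print the command; the same list could be handed to new ProcessBuilder(command)
        System.out.println(String.join(" ", command));
    }
}
```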

5.2. HDFS Spring Cloud Stream App Starter

Now (with Hadoop already set up), let’s add the HDFS Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.hdfs.sink) dependency to our project.

First, clone the relevant repo:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

Then, run the Maven job:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root. We can then run that compiled .jar and pass in relevant application properties:

java -jar hdfs-sink.jar --hdfs.fs-uri=hdfs://127.0.0.1:50010/

Here, hdfs://127.0.0.1:50010/ is the URI used throughout this example, but your HDFS port may vary depending on how you configured your instance: the URI should match the NameNode address configured as fs.defaultFS in core-site.xml.

We can see the list of data nodes (and their current ports) at http://0.0.0.0:50070, given the port mapping we passed to Docker previously.

We can also set these properties in the familiar Spring application.properties before compilation, so we don’t always have to pass them in via the CLI.

Let’s configure our application.properties to use the same HDFS URI:

hdfs.fs-uri=hdfs://127.0.0.1:50010/
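Because a malformed hdfs.fs-uri may only surface once the sink tries to connect, it can be worth sanity-checking the value up front. This is a minimal sketch using java.net.URI; FsUriCheck and the rules it enforces (an hdfs scheme plus an explicit host and port) are our own assumptions rather than anything the HDFS starter requires.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Sketch: validate an hdfs.fs-uri value before handing it to the sink.
// The rules below are our own choice, not requirements of the HDFS starter.
public class FsUriCheck {

    public static URI parseFsUri(String value) {
        URI uri;
        try {
            uri = new URI(value);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Malformed URI: " + value, e);
        }
        if (!"hdfs".equals(uri.getScheme())) {
            throw new IllegalArgumentException("Expected an hdfs:// URI: " + value);
        }
        // getPort() returns -1 when no port is given
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("Expected an explicit host and port: " + value);
        }
        return uri;
    }

    public static void main(String[] args) {
        URI uri = parseFsUri("hdfs://127.0.0.1:50010/");
        System.out.println(uri.getHost() + " " + uri.getPort()); // prints 127.0.0.1 50010
    }
}
```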

6. Using AggregateApplicationBuilder

Alternatively, we can combine our Spring Stream Source, Processor, and Sink into a simple Spring Boot application through org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder!

First, we’ll add the two Stream App Starters to our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>1.3.1.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>1.3.1.BUILD-SNAPSHOT</version>
    </dependency>
</dependencies>

Then we’ll begin combining our two Stream App Starter dependencies by wrapping them into their respective sub-applications.

6.1. Building Our App Components

Our SourceApp specifies the Source to be transformed or consumed:

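import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.InboundChannelAdapter;

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}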

Note that we bind our SourceApp to org.springframework.cloud.stream.messaging.Source and inject the appropriate configuration class to pick up the needed settings from our environmental properties.
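Conceptually, an @InboundChannelAdapter method is invoked repeatedly by a poller, and each return value becomes a message on the bound output channel. The plain-Java sketch below mimics that behavior with a ScheduledExecutorService and a queue standing in for Source.OUTPUT; PolledSourceSketch is hypothetical and says nothing about how Spring Integration actually implements its pollers.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Conceptual sketch of a polled source (plain JDK, not Spring Integration's poller):
// the scheduler calls the supplier, waits fixedDelayMillis after each call completes,
// and puts each result on an in-memory "output channel".
public class PolledSourceSketch {

    public static ScheduledExecutorService startPolling(Supplier<String> source,
                                                        BlockingQueue<String> output,
                                                        long fixedDelayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> output.offer(source.get()),
                0, fixedDelayMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> output = new LinkedBlockingQueue<>();
        // Same idea as timerMessageSource(): each poll emits the current time as a String
        ScheduledExecutorService scheduler = startPolling(
                () -> new SimpleDateFormat().format(new Date()), output, 1000);
        for (int i = 0; i < 3; i++) {
            System.out.println(output.take());
        }
        scheduler.shutdownNow(); // stop polling so the JVM can exit
    }
}
```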

Next, we set up a simple org.springframework.cloud.stream.messaging.Processor binding:

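import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    private static final Logger log = LoggerFactory.getLogger(ProcessorApp.class);

    // Logs each message and passes the payload through unchanged
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}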

Then, we create our consumer (Sink):

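import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.hdfs.sink.HdfsSinkConfiguration;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(SinkApp.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}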

Here, we bind our SinkApp to org.springframework.cloud.stream.messaging.Sink and again inject the correct configuration class to use our specified Hadoop settings.

Lastly, we combine our SourceApp, ProcessorApp, and our SinkApp using the AggregateApplicationBuilder in our AggregateApp main method:

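import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
// SourceApp, ProcessorApp and SinkApp are imported from their own packages (omitted here)

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        // Wires SourceApp -> ProcessorApp -> SinkApp together in a single JVM
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}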
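The AggregateApplicationBuilder wires our three apps together inside a single JVM, connecting SourceApp's output to ProcessorApp's input and ProcessorApp's output to SinkApp's input. As a rough analogy only, the hypothetical MiniAggregate below mimics the same from/via/to shape in plain Java: each stage runs on its own thread and hands messages to the next through a queue acting as the channel. It is not how Spring Cloud Stream implements aggregation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical MiniAggregate: a plain-JDK analogy for chaining a source, processors and a
// sink in one JVM. Each stage has its own thread; queues between stages act as channels.
public class MiniAggregate {

    private Supplier<String> source;
    private final List<Function<String, String>> processors = new ArrayList<>();
    private Consumer<String> sink;

    public MiniAggregate from(Supplier<String> s) { this.source = s; return this; }
    public MiniAggregate via(Function<String, String> p) { processors.add(p); return this; }
    public MiniAggregate to(Consumer<String> c) { this.sink = c; return this; }

    // Pushes `count` messages through the chain and waits for every stage to finish.
    public void run(int count) throws InterruptedException {
        List<Thread> stages = new ArrayList<>();
        BlockingQueue<String> sourceOut = new LinkedBlockingQueue<>();
        stages.add(new Thread(() -> {
            for (int i = 0; i < count; i++) {
                sourceOut.add(source.get());
            }
        }));
        BlockingQueue<String> upstream = sourceOut;
        for (Function<String, String> p : processors) {
            BlockingQueue<String> in = upstream;
            BlockingQueue<String> out = new LinkedBlockingQueue<>();
            stages.add(new Thread(() -> relay(in, out, p, count)));
            upstream = out;
        }
        BlockingQueue<String> sinkIn = upstream;
        stages.add(new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sink.accept(sinkIn.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (Thread t : stages) t.start();
        for (Thread t : stages) t.join();
    }

    // Takes messages from `in`, applies the processor, and forwards the results to `out`.
    private static void relay(BlockingQueue<String> in, BlockingQueue<String> out,
                              Function<String, String> p, int count) {
        try {
            for (int i = 0; i < count; i++) {
                out.add(p.apply(in.take()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> received = new ArrayList<>();
        int[] n = {0};
        new MiniAggregate()
            .from(() -> "payload-" + (++n[0]))
            .via(payload -> payload.toUpperCase())
            .to(payload -> received.add("Received: " + payload))
            .run(2);
        System.out.println(received); // prints [Received: PAYLOAD-1, Received: PAYLOAD-2]
    }
}
```

Because each queue is FIFO and every stage has a single thread, messages reach the sink in the order the source produced them.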

As with any Spring Boot application, we can inject settings as environment properties, either through application.properties or programmatically.

Since we’re using the Spring Cloud Stream framework, we can also pass our arguments into the AggregateApplicationBuilder constructor.

6.2. Running the Completed App

We can then compile and run our application using the following command line instructions:

    $ mvn install
    $ java -jar twitterhdfs.jar

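Remember to keep each @SpringBootApplication class in a separate package! Since @SpringBootApplication component-scans its own package and sub-packages, apps that share a package end up picking up each other’s configuration and bindings, and several different binding exceptions will be thrown. For more information about how to use the AggregateApplicationBuilder, have a look at the official docs.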

After we compile and run our app, we should see something like the following in our console (naturally, the contents will vary by Tweet):

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] com.baeldung.twitterhdfs.sink.SinkApp    : Received: {"created_at":"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":"952761898239385601","text":"RT @mighty_jimin: 180114 ...

These lines demonstrate that our Processor and Sink correctly receive data from the Source! In this example, we haven’t configured our HDFS Sink to do much: the Processor simply logs “Payload received!”, and the Sink logs each message it receives.

7. Conclusion

In this tutorial, we’ve learned how to combine two awesome Spring Stream App Starters into one sweet Spring Boot example!

Here are some other great official articles on Spring Boot Starters and how to create a customized starter!

As always, the code used in the article can be found over on GitHub.
