1. Overview

This tutorial will be an introduction to Apache Storm, a distributed real-time computation system.

We’ll cover:

  • What exactly Apache Storm is and what problems it solves
  • Its architecture, and
  • How to use it in a project

2. What Is Apache Storm?

Apache Storm is a free and open-source distributed system for real-time computation.

It is fault-tolerant and scalable, guarantees that data will be processed, and is especially good at processing unbounded streams of data.

Good use cases for Storm include processing credit card operations for fraud detection or processing data from smart homes to detect faulty sensors.

Storm allows integration with various databases and queuing systems available on the market.

3. Maven Dependency

Before we use Apache Storm, we need to include the storm-core dependency in our project:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.2</version>
    <scope>provided</scope>
</dependency>

We should only use the provided scope if we intend to run our application on the Storm cluster.

To run the application locally, we can use the so-called local mode, which simulates the Storm cluster in a local process. In that case, we should remove the provided scope.

4. Data Model

Apache Storm’s data model consists of two elements: tuples and streams.

4.1. Tuple

A tuple is an ordered list of named fields with dynamically typed values. This means that we don’t need to explicitly declare the types of the fields.
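
For instance, field names and the values paired with them live in two simple classes, Fields and Values (a tiny sketch with made-up field names):

// field names for a stream are declared once
Fields fields = new Fields("username", "loginCount");

// each emitted tuple simply lists matching values; no types are declared
Values values = new Values("alice", 42);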

Storm needs to know how to serialize all values that are used in a tuple. By default, it can already serialize primitive types, Strings and byte arrays.

Since Storm uses Kryo for serialization, we need to register any custom types using Config. We can do this in one of two ways:

First, we can register the class to serialize using its full name:

Config config = new Config();
config.registerSerialization(User.class);

In such a case, Kryo will serialize the class using FieldSerializer. By default, this will serialize all non-transient fields of the class, both private and public.

Or instead, we can provide both the class to serialize and the serializer we want Storm to use for that class:

Config config = new Config();
config.registerSerialization(User.class, UserSerializer.class);

To create a custom serializer, we need to extend the generic Serializer class and implement its two methods, write and read.
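
For example, a serializer for the User class registered above might look like this (a minimal sketch, assuming the hypothetical User exposes a name and an age through its constructor and getters):

public class UserSerializer extends Serializer<User> {

    @Override
    public void write(Kryo kryo, Output output, User user) {
        // write the fields in a fixed order
        output.writeString(user.getName());
        output.writeInt(user.getAge());
    }

    @Override
    public User read(Kryo kryo, Input input, Class<User> type) {
        // read them back in the same order
        return new User(input.readString(), input.readInt());
    }
}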

4.2. Stream

The stream is the core abstraction in the Storm ecosystem. A stream is an unbounded sequence of tuples.

Storm allows processing multiple streams in parallel.

Every stream has an id that is assigned when the stream is declared.
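
If we want to work with more than one stream, we can declare each one with an explicit id and emit to it by that id (a short sketch with an illustrative stream name):

// in declareOutputFields(): declare a stream with an explicit id
outputFieldsDeclarer.declareStream("temperatures", new Fields("value", "timestamp"));

// when emitting, target that stream by its id
collector.emit("temperatures", new Values(21, System.currentTimeMillis()));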

5. Topology

The logic of a real-time Storm application is packaged into a topology. The topology consists of spouts and bolts.

5.1. Spout

Spouts are the sources of the streams. They emit tuples to the topology.

Tuples can be read from various external systems like Kafka, Kestrel or ActiveMQ.

Spouts can be reliable or unreliable. Reliable means that the spout can replay a tuple that Storm has failed to process. Unreliable means that the spout doesn’t replay anything, since it uses a fire-and-forget mechanism to emit tuples.

To create a custom spout, we need to implement the IRichSpout interface or extend a class that already implements it, for example, the abstract BaseRichSpout class.

Let’s create an unreliable spout:

public class RandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}

Our custom RandomIntSpout will generate a random integer and a timestamp every second.
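
Had we wanted a reliable spout instead, we would emit each tuple with a message id and override ack and fail to react to the processing outcome. A rough sketch (the replay logic is only hinted at):

public class ReliableRandomIntSpout extends BaseRichSpout {

    private Random random;
    private SpoutOutputCollector outputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext,
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        outputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        // emitting with a message id makes the tuple trackable, so Storm can ack or fail it
        long messageId = System.nanoTime();
        outputCollector.emit(new Values(random.nextInt(), System.currentTimeMillis()), messageId);
    }

    @Override
    public void ack(Object msgId) {
        // the tuple was fully processed; nothing more to do
    }

    @Override
    public void fail(Object msgId) {
        // the tuple failed downstream; a real spout would re-emit it here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("randomInt", "timestamp"));
    }
}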

5.2. Bolt

Bolts process tuples in the stream. They can perform various operations like filtering, aggregations or custom functions.

Some operations require multiple steps, and thus we will need to use multiple bolts in such cases.

To create a custom bolt, we need to implement the IRichBolt interface or, for simpler operations, the IBasicBolt interface.

There are also multiple helper classes available for implementing bolts. In this case, we’ll use BaseBasicBolt:

public class PrintingBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }
}

This custom PrintingBolt will simply print all tuples to the console.

6. Creating a Simple Topology

Let’s put these ideas together into a simple topology. Our topology will have one spout and three bolts.

6.1. RandomNumberSpout

First, we’ll create an unreliable spout. It will generate a random integer from the range [0, 100] every second:

public class RandomNumberSpout extends BaseRichSpout {
    private Random random;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, 
      SpoutOutputCollector spoutOutputCollector) {
        random = new Random();
        collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        int operation = random.nextInt(101);
        long timestamp = System.currentTimeMillis();

        Values values = new Values(operation, timestamp);
        collector.emit(values);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.2. FilteringBolt

Next, we’ll create a bolt that will filter out all elements with operation equal to 0:

public class FilteringBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        int operation = tuple.getIntegerByField("operation");
        if (operation > 0) {
            basicOutputCollector.emit(tuple.getValues());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("operation", "timestamp"));
    }
}

6.3. AggregatingBolt

Next, let’s create a more complicated bolt that will aggregate all positive operations from each time window.

For this purpose, we’ll use a specific class created especially for implementing bolts that operate on windows instead of operating on single tuples: BaseWindowedBolt.

Windows are an essential concept in stream processing, splitting the infinite streams into finite chunks. We can then apply computations to each chunk. There are generally two types of windows:

Time windows group elements from a given time period using timestamps. Different time windows may contain different numbers of elements.

Count windows group elements by their count. In such a case, all windows have the same size, and a window will not be emitted if there are fewer elements than the defined size.
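
Both kinds are configured through the fluent methods of BaseWindowedBolt. For instance (a quick sketch, using the AggregatingBolt we build below):

// a sliding time window: 10 seconds long, advancing every 5 seconds
new AggregatingBolt()
  .withWindow(BaseWindowedBolt.Duration.seconds(10), BaseWindowedBolt.Duration.seconds(5));

// a tumbling count window, emitted once for every 100 received tuples
new AggregatingBolt()
  .withTumblingWindow(BaseWindowedBolt.Count.of(100));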

Our AggregatingBolt will generate the sum of all positive operations from a time window along with its beginning and end timestamps:

public class AggregatingBolt extends BaseWindowedBolt {
    private OutputCollector outputCollector;
    
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.outputCollector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sumOfOperations", "beginningTimestamp", "endTimestamp"));
    }

    @Override
    public void execute(TupleWindow tupleWindow) {
        List<Tuple> tuples = tupleWindow.get();
        tuples.sort(Comparator.comparing(this::getTimestamp));

        int sumOfOperations = tuples.stream()
          .mapToInt(tuple -> tuple.getIntegerByField("operation"))
          .sum();
        Long beginningTimestamp = getTimestamp(tuples.get(0));
        Long endTimestamp = getTimestamp(tuples.get(tuples.size() - 1));

        Values values = new Values(sumOfOperations, beginningTimestamp, endTimestamp);
        outputCollector.emit(values);
    }

    private Long getTimestamp(Tuple tuple) {
        return tuple.getLongByField("timestamp");
    }
}

Note that, in this case, getting the first element of the list directly is safe. That’s because each window is calculated using the timestamp field of the Tuple, so there has to be at least one element in each window.

6.4. FileWritingBolt

Finally, we’ll create a bolt that will take all elements with sumOfOperations greater than 2000, serialize them and write them to a file:

public class FileWritingBolt extends BaseRichBolt {
    public static Logger logger = LoggerFactory.getLogger(FileWritingBolt.class);
    private BufferedWriter writer;
    private String filePath;
    private ObjectMapper objectMapper;

    @Override
    public void cleanup() {
        try {
            writer.close();
        } catch (IOException e) {
            logger.error("Failed to close writer!");
        }
    }

    @Override
    public void prepare(Map map, TopologyContext topologyContext, 
      OutputCollector outputCollector) {
        objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
        } catch (IOException e) {
            logger.error("Failed to open a file for writing.", e);
        }
    }

    @Override
    public void execute(Tuple tuple) {
        int sumOfOperations = tuple.getIntegerByField("sumOfOperations");
        long beginningTimestamp = tuple.getLongByField("beginningTimestamp");
        long endTimestamp = tuple.getLongByField("endTimestamp");

        if (sumOfOperations > 2000) {
            AggregatedWindow aggregatedWindow = new AggregatedWindow(
                sumOfOperations, beginningTimestamp, endTimestamp);
            try {
                writer.write(objectMapper.writeValueAsString(aggregatedWindow));
                writer.newLine();
                writer.flush();
            } catch (IOException e) {
                logger.error("Failed to write data to file.", e);
            }
        }
    }
    
    // public constructor and other methods
}

Note that we don’t need to declare any output fields, as this will be the last bolt in our topology.

6.5. Running the Topology

Finally, we can pull everything together and run our topology:

public static void runTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    RandomNumberSpout random = new RandomNumberSpout();
    builder.setSpout("randomNumberSpout", random);

    FilteringBolt filtering = new FilteringBolt();
    builder.setBolt("filteringBolt", filtering)
      .shuffleGrouping("randomNumberSpout");

    BaseWindowedBolt aggregating = new AggregatingBolt()
      .withTimestampField("timestamp")
      .withLag(BaseWindowedBolt.Duration.seconds(1))
      .withWindow(BaseWindowedBolt.Duration.seconds(5));
    builder.setBolt("aggregatingBolt", aggregating)
      .shuffleGrouping("filteringBolt"); 
      
    String filePath = "./src/main/resources/data.txt";
    FileWritingBolt file = new FileWritingBolt(filePath);
    builder.setBolt("fileBolt", file)
      .shuffleGrouping("aggregatingBolt");

    Config config = new Config();
    config.setDebug(false);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Test", config, builder.createTopology());
}

To make the data flow through each piece in the topology, we need to indicate how to connect them. shuffleGrouping allows us to state that the data for filteringBolt will come from randomNumberSpout.

For each bolt, we need to add shuffleGrouping, which defines the source of elements for that bolt. The source may be a spout or another bolt. If we set the same source for more than one bolt, the source will emit all elements to each of them.
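
For example, we could attach our earlier PrintingBolt to the same spout as the FilteringBolt; each of them would then receive every tuple the spout emits:

// both bolts read from randomNumberSpout, so each receives every emitted tuple
builder.setBolt("filteringBolt", new FilteringBolt())
  .shuffleGrouping("randomNumberSpout");
builder.setBolt("printingBolt", new PrintingBolt())
  .shuffleGrouping("randomNumberSpout");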

In this case, our topology will use the LocalCluster to run the job locally.
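
To run the same topology on an actual cluster, we would keep the provided scope from section 3, package the application into a jar and submit it with StormSubmitter instead (a minimal sketch; the method declares a few checked exceptions that we’d need to handle or propagate):

// deploy the topology to the configured Storm cluster
StormSubmitter.submitTopology("test-topology", config, builder.createTopology());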

7. Conclusion

In this tutorial, we introduced Apache Storm, a distributed real-time computation system. We created a spout, some bolts, and pulled them together into a complete topology.

And, as always, all the code samples can be found over on GitHub.
