1. Overview

In this tutorial, we'll introduce Apache Beam and explore its fundamental concepts.

We'll start by demonstrating the use case and benefits of using Apache Beam, and then we'll cover foundational concepts and terminologies. Afterward, we'll walk through a simple example that illustrates all the important aspects of Apache Beam.

2. What is Apache Beam?

Apache Beam (Batch + strEAM) is a unified programming model for batch and streaming data processing jobs. It provides a software development kit to define and construct data processing pipelines as well as runners to execute them.

Apache Beam is designed to provide a portable programming layer. In fact, the Beam Pipeline Runners translate the data processing pipeline into the API compatible with the backend of the user's choice. Currently, these distributed processing backends are supported:

  • Apache Apex
  • Apache Flink
  • Apache Gearpump (incubating)
  • Apache Samza
  • Apache Spark
  • Google Cloud Dataflow
  • Hazelcast Jet

3. Why Apache Beam?

Apache Beam unifies batch and streaming data processing, whereas other frameworks often handle them through separate APIs. Consequently, it's easy to change a streaming process to a batch process and vice versa, say, as requirements change.

Apache Beam improves portability and flexibility. We focus on our logic rather than the underlying details. Moreover, we can change the data processing backend at any time.

There are Java, Python, and Go SDKs available for Apache Beam, and Scala is supported through Spotify's Scio API. As a result, team members can work in the language they prefer.

4. Fundamental Concepts

With Apache Beam, we can construct workflow graphs (pipelines) and execute them. The key concepts in the programming model are:

  • PCollection – represents a data set which can be a fixed batch or a stream of data
  • PTransform – a data processing operation that takes one or more PCollections and outputs zero or more PCollections
  • Pipeline – represents a directed acyclic graph of PCollection and PTransform, and hence, encapsulates the entire data processing job
  • PipelineRunner – executes a Pipeline on a specified distributed processing backend

Simply put, a PipelineRunner executes a Pipeline, and a Pipeline consists of PCollection and PTransform.

5. Word Count Example

Now that we've learned the basic concepts of Apache Beam, let's design and test a word count task.

5.1. Constructing a Beam Pipeline

Designing the workflow graph is the first step in every Apache Beam job. Let's define the steps of a word count task:

  1. Read the text from a source.
  2. Split the text into a list of words.
  3. Lowercase all words.
  4. Trim punctuation.
  5. Filter stopwords.
  6. Count each unique word.

To achieve this, we'll need to convert the above steps into a single Pipeline using PCollection and PTransform abstractions.

5.2. Dependencies

Before we can implement our workflow graph, we should add Apache Beam's core dependency to our project:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>

Beam Pipeline Runners rely on a distributed processing backend to perform tasks. Let's add DirectRunner as a runtime dependency:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
</dependency>

Unlike other Pipeline Runners, DirectRunner executes the pipeline on the local machine and doesn't need any additional setup, which makes it a good choice for getting started.

5.3. Implementation

Apache Beam builds on the MapReduce programming paradigm, much like Java Streams. It's a good idea to have a basic understanding of reduce(), filter(), count(), map(), and flatMap() before we continue.
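As a refresher on these operations, here is a minimal sketch of a word count written with plain Java Streams on an in-memory list. It isn't Beam code, and the class name, the stopword list, and the punctuation regex are assumptions made only for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

class StreamsWordCount {

    // Hypothetical stopword list, for illustration only.
    private static final Set<String> STOPWORDS = Set.of("is", "by", "a", "the");

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
            // flatMap(): one line becomes many words
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            // map(): one word becomes one transformed word
            .map(String::toLowerCase)
            .map(word -> word.replaceAll("^\\p{Punct}+|\\p{Punct}+$", ""))
            // filter(): drop empty tokens and stopwords
            .filter(word -> !word.isEmpty() && !STOPWORDS.contains(word))
            // counting reduction: group equal words and count each group
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("Beam is great!", "beam rocks")));
    }
}
```

The Beam pipeline we're about to build has the same shape, except that each step is a PTransform and the data lives in a PCollection rather than in a local Stream.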

Creating a Pipeline is the first thing we do:

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Now we apply our six-step word count task:

PCollection<KV<String, Long>> wordCount = p
    .apply("(1) Read all lines", 
      TextIO.read().from(inputFilePath))
    .apply("(2) Flatmap to a list of words", 
      FlatMapElements.into(TypeDescriptors.strings())
      .via(line -> Arrays.asList(line.split("\\s+"))))
    .apply("(3) Lowercase all", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> word.toLowerCase()))
    .apply("(4) Trim punctuations", 
      MapElements.into(TypeDescriptors.strings())
      .via(word -> trim(word)))
    .apply("(5) Filter stopwords", 
      Filter.by(word -> !isStopWord(word)))
    .apply("(6) Count words", 
      Count.perElement());

The first (optional) argument of apply() is a String that names the step. It makes the code more readable, and runners can use it to label the step in logs and monitoring tools. Here is what each apply() does in the above code:

  1. First, we read an input text file line by line using TextIO.
  2. Splitting each line on whitespace, we flat-map it to a list of words.
  3. Word count is case-insensitive, so we lowercase all words.
  4. Earlier, we split lines on whitespace, ending up with words like “word!” and “word?”, so we remove punctuation.
  5. Stopwords such as “is” and “by” are frequent in almost every English text, so we remove them.
  6. Finally, we count unique words using the built-in function Count.perElement().
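The pipeline calls trim() and isStopWord(), which aren't shown in this article. A minimal sketch of what they might look like follows; the class name, the stopword list, and the choice to strip only leading and trailing ASCII punctuation are all assumptions, not the original implementation:

```java
import java.util.Set;

class WordCountHelpers {

    // Hypothetical stopword list; a real job would likely load a fuller one.
    private static final Set<String> STOPWORDS =
        Set.of("a", "an", "and", "by", "in", "is", "of", "the", "to");

    // Removes leading and trailing ASCII punctuation: "word!" becomes "word".
    // Punctuation inside a word, as in "don't", is left alone.
    static String trim(String word) {
        return word.replaceAll("^\\p{Punct}+|\\p{Punct}+$", "");
    }

    // Treats empty tokens as stopwords too, so they never reach the count step.
    static boolean isStopWord(String word) {
        return word.isEmpty() || STOPWORDS.contains(word);
    }

    public static void main(String[] args) {
        System.out.println(trim("word!"));      // prints "word"
        System.out.println(isStopWord("by"));   // prints "true"
    }
}
```

Keeping the helpers static means the lambdas passed to via() and Filter.by() don't capture an enclosing object, which matters because Beam needs to serialize those functions before shipping them to workers.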

As mentioned earlier, pipelines are processed on a distributed backend. We can't simply iterate over a PCollection in memory, since its elements may be spread across multiple workers. Instead, we write the results to an external database or file.

First, we convert each element of our PCollection to a String. Then, we use TextIO to write the output:

wordCount.apply(MapElements.into(TypeDescriptors.strings())
    .via(count -> count.getKey() + " --> " + count.getValue()))
    .apply(TextIO.write().to(outputFilePath));

Now that our Pipeline definition is complete, we can run and test it.

5.4. Running and Testing

So far, we've defined a Pipeline for the word count task. At this point, let's run the Pipeline:

p.run().waitUntilFinish();

This line hands our Pipeline to DirectRunner, which executes it locally and blocks until it finishes. Because TextIO.write() shards its output by default, several output files will be generated at the end. They'll contain things like:

...
apache --> 3
beam --> 5
rocks --> 2
...
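Since the results are spread over several files, it can be handy to merge them back into one map when checking a run. The following is a rough sketch in plain Java, not part of the Beam API: the class name is hypothetical, and it assumes every output file name starts with the prefix we passed as outputFilePath and that every line has the "word --> count" shape shown above:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

class OutputShardReader {

    // Merges every file in dir whose name starts with prefix into one sorted map.
    static Map<String, Long> readCounts(Path dir, String prefix) throws IOException {
        Map<String, Long> counts = new TreeMap<>();
        try (DirectoryStream<Path> shards = Files.newDirectoryStream(dir, prefix + "*")) {
            for (Path shard : shards) {
                for (String line : Files.readAllLines(shard)) {
                    // Each line is expected to look like "apache --> 3".
                    int sep = line.lastIndexOf(" --> ");
                    if (sep < 0) {
                        continue; // skip anything that doesn't match
                    }
                    String word = line.substring(0, sep);
                    long count = Long.parseLong(line.substring(sep + 5).trim());
                    counts.merge(word, count, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // The directory and the "wordcount" prefix are placeholder values.
        Path dir = Path.of(args.length > 0 ? args[0] : ".");
        System.out.println(readCounts(dir, "wordcount"));
    }
}
```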

Defining and running a distributed job in Apache Beam is as simple and expressive as this. For comparison, word count implementations are also available for Apache Spark, Apache Flink, and Hazelcast Jet.

6. Where Do We Go From Here?

We successfully counted each word from our input file, but we don't have a report of the most frequent words yet. Certainly, sorting a PCollection is a good problem to solve as our next step.
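Before tackling this inside a pipeline, it may help to pin down the ranking logic itself. The sketch below is plain Java working on an in-memory map, not a Beam transform, and the class and method names are hypothetical. Within Beam, the SDK's Top transform is probably the place to start looking.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TopWords {

    // Returns the n most frequent entries, highest count first.
    // Words with equal counts are ordered alphabetically.
    static List<Map.Entry<String, Long>> top(Map<String, Long> counts, int n) {
        return counts.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
                .thenComparing(Map.Entry.comparingByKey()))
            .limit(n)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Long> counts = Map.of("apache", 3L, "beam", 5L, "rocks", 2L);
        // Prints "beam --> 5" and then "apache --> 3".
        top(counts, 2).forEach(e -> System.out.println(e.getKey() + " --> " + e.getValue()));
    }
}
```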

Later, we can learn more about Windowing, Triggers, Metrics, and more sophisticated Transforms. The Apache Beam documentation provides in-depth information and reference material.

7. Conclusion

In this tutorial, we learned what Apache Beam is and why we might prefer it over alternatives. We also demonstrated basic concepts of Apache Beam with a word count example.

The code for this tutorial is available over on GitHub.
