Java Top

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

If you have a few years of experience in the Java ecosystem, and you're interested in sharing that experience with the community (and getting paid for your work of course), have a look at the "Write for Us" page. Cheers. Eugen

1. Introduction

Spring Cloud Data Flow is a toolkit for building data integration and real-time data processing pipelines.  

Pipelines, in this case, are Spring Boot applications that are built with the use of Spring Cloud Stream or Spring Cloud Task frameworks.

In this tutorial, we’ll show how to use Spring Cloud Data Flow with Apache Spark.

2. Data Flow Local Server

First, we need to run the Data Flow Server to be able to deploy our jobs.

To run the Data Flow Server locally, we need to create a new project with the spring-cloud-starter-dataflow-server-local dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

After that, we need to annotate the main class in the server with @EnableDataFlowServer:

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

Once we run this application, we’ll have a local Data Flow server on port 9393.

3. Creating a Project

We’ll create a Spark Job as a standalone local application so that we won’t need any cluster to run it.

3.1. Dependencies

First, we’ll add the Spark dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2. Creating a Job

And for our job, let’s approximate pi:

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. Data Flow Shell

Data Flow Shell is an application that’ll enable us to interact with the server. Shell uses the DSL commands to describe data flows.

To use the Data Flow Shell we need to create a project that’ll allow us to run it. First, we need the spring-cloud-dataflow-shell dependency:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

After adding the dependency, we can create the class that’ll run our Data Flow shell:

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
     
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. Deploying the project

To deploy our project, we’ll use the so-called task runner that is available for Apache Spark in three versions: cluster, yarn, and client. We’re going to proceed with the local client version.

The task runner is what runs our Spark job.

To do that, we first need to register our task using Data Flow Shell:

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

The task allows us to specify multiple different parameters some of them are optional, but some of the parameters are necessary to deploy the Spark job properly:

  • spark.app-class, the main class of our submitted job
  • spark.app-jar, a path to the fat jar containing our job
  • spark.app-name, the name that’ll be used for our job
  • spark.app-args, the arguments that’ll be passed to the job

We can use the registered task spark-client to submit our job, remembering to provide the required parameters:

task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

Note that spark.app-jar is the path to the fat-jar with our job.

After successful creation of the task, we can proceed to run it with the following command:

task launch spark1

This will invoke the execution of our task.

6. Summary

In this tutorial, we have shown how to use the Spring Cloud Data Flow framework to process data with Apache Spark. More information on the Spring Cloud Data Flow framework can be found in the documentation.

All code samples can be found on GitHub.

Java bottom

I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE