1. Overview

In our previous introduction to Spring Batch, we introduced the framework as a batch-processing tool. We also explored the configuration details and the implementation for a single-threaded, single-process job execution.

Spring Batch provides a range of options for implementing a job with some parallel processing. At a high level, there are two modes of parallel processing:

  1. Single-Process, multi-threaded
  2. Multi-Process

In this quick article, we’ll discuss the partitioning of a Step, which can be implemented for both single-process and multi-process jobs.

2. Partitioning a Step

Partitioning in Spring Batch gives us a way to divide the execution of a Step:

[Figure: Partitioning Overview]

The above picture shows an implementation of a Job with a partitioned Step.

There's a Step called “Master”, whose execution is divided into a number of “Slave” steps. These slaves can take the place of a master, and the outcome will still be unchanged. Both the master and the slaves are instances of Step. Slaves can be remote services or just locally executing threads.

If required, we can pass data from the master to the slaves. The metadata (i.e., the JobRepository) makes sure that every slave executes only once in a single execution of the Job.

Here is the sequence diagram showing how it all works:

[Figure: Partitioning Step sequence diagram]

As shown, the PartitionStep drives the execution. The PartitionHandler is responsible for splitting the work of the “Master” into the “Slaves”. The rightmost Step is the slave.

3. The Maven POM

The Maven dependencies are the same as mentioned in our previous article: Spring Core, Spring Batch, and the dependency for the database (in our case, SQLite).
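For reference, here's a sketch of the relevant dependency section; the version numbers below are illustrative placeholders, not pinned recommendations:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>5.3.23</version>
</dependency>
<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-core</artifactId>
    <version>4.3.7</version>
</dependency>
<dependency>
    <groupId>org.xerial</groupId>
    <artifactId>sqlite-jdbc</artifactId>
    <version>3.36.0.3</version>
</dependency>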

4. Configuration

In our introductory article, we saw an example of converting some financial data from a CSV file to an XML file. Let’s extend that example here.

We’ll convert the financial information from five CSV files to corresponding XML files, using a multi-threaded implementation.

We can achieve this using a single Job and Step partitioning. We'll have five threads, one for each of the CSV files.
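Each input file has the same layout: a header row followed by transaction records. The rows below are made-up sample data that simply match the column names our reader expects:

username, userid, transactiondate, amount
devendra, 1234, 31/10/2015, 10000
john, 2134, 3/12/2015, 12321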

First of all, let’s create a Job:

@Bean(name = "partitionerJob")
public Job partitionerJob() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return jobs.get("partitioningJob")
      .start(partitionStep())
      .build();
}

As we can see, this Job starts with the partitionStep. This is our master step, which will be divided into various slave steps:

@Bean
public Step partitionStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("partitionStep")
      .partitioner("slaveStep", partitioner())
      .step(slaveStep())
      .taskExecutor(taskExecutor())
      .build();
}

Here, we create the partitionStep using the StepBuilderFactory. For that, we need to provide information about the slave step and the Partitioner.
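The step also references a taskExecutor() bean, which is what actually runs the slave steps in parallel. That bean isn't shown above; here's a minimal sketch, assuming a SimpleAsyncTaskExecutor is acceptable for this example:

@Bean
public TaskExecutor taskExecutor() {
    // Starts a new thread for each slave step; a ThreadPoolTaskExecutor
    // would give finer control over the number of concurrent threads
    return new SimpleAsyncTaskExecutor("partition-");
}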

Partitioner is an interface that lets us define a set of input values for each of the slaves. In other words, the logic for dividing the work among the threads goes here.

Let’s create an implementation of it, called CustomMultiResourcePartitioner, where we’ll put the input and output file names in the ExecutionContext to pass on to every slave step:

public class CustomMultiResourcePartitioner implements Partitioner {

    private static final String PARTITION_KEY = "partition";

    private Resource[] resources = new Resource[0];
    private String keyName = "fileName";

    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> map = new HashMap<>(gridSize);
        int i = 0, k = 1;
        for (Resource resource : resources) {
            ExecutionContext context = new ExecutionContext();
            Assert.state(resource.exists(), "Resource does not exist: " 
              + resource);
            // matches the stepExecutionContext[fileName] lookup in the reader
            context.putString(keyName, resource.getFilename());
            // output1.xml, output2.xml, ... consumed by the writer
            context.putString("opFileName", "output" + k++ + ".xml");
            map.put(PARTITION_KEY + i, context);
            i++;
        }
        return map;
    }
}
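With five input files, for example, the returned map will contain the keys partition0 through partition4, each ExecutionContext holding one input/output file name pair.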

We’ll also create a bean for this class, where we’ll specify the source directory for the input files:

@Bean
public CustomMultiResourcePartitioner partitioner() {
    CustomMultiResourcePartitioner partitioner 
      = new CustomMultiResourcePartitioner();
    Resource[] resources;
    try {
        resources = resourcePatternResolver
          .getResources("file:src/main/resources/input/*.csv");
    } catch (IOException e) {
        throw new RuntimeException("I/O problems when resolving"
          + " the input file pattern.", e);
    }
    partitioner.setResources(resources);
    return partitioner;
}
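Note that resourcePatternResolver is assumed here to be an injected ResourcePatternResolver; since the ApplicationContext itself implements that interface, it can simply be autowired into the configuration class:

@Autowired
private ResourcePatternResolver resourcePatternResolver;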

We'll define the slave step just like any other step, with a reader and a writer. The reader and writer will be the same as in our introductory article, except that they'll receive the file name parameter from the StepExecutionContext.

Note that these beans need to be step-scoped so that they can receive the stepExecutionContext params at every step. If they weren't step-scoped, their beans would be created up front and wouldn't accept the file names at the step level:

@StepScope
@Bean
public FlatFileItemReader<Transaction> itemReader(
  @Value("#{stepExecutionContext[fileName]}") String filename)
  throws UnexpectedInputException, ParseException {
 
    FlatFileItemReader<Transaction> reader 
      = new FlatFileItemReader<>();
    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    String[] tokens 
      = {"username", "userid", "transactiondate", "amount"};
    tokenizer.setNames(tokens);
    reader.setResource(new ClassPathResource("input/" + filename));
    DefaultLineMapper<Transaction> lineMapper 
      = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(new RecordFieldSetMapper());
    reader.setLinesToSkip(1);
    reader.setLineMapper(lineMapper);
    return reader;
}

@Bean
@StepScope
public ItemWriter<Transaction> itemWriter(Marshaller marshaller, 
  @Value("#{stepExecutionContext[opFileName]}") String filename)
  throws MalformedURLException {
    StaxEventItemWriter<Transaction> itemWriter 
      = new StaxEventItemWriter<>();
    itemWriter.setMarshaller(marshaller);
    itemWriter.setRootTagName("transactionRecord");
    itemWriter.setResource(new ClassPathResource("xml/" + filename));
    return itemWriter;
}

When wiring the reader and writer into the slave step, we can pass the arguments as null, because those file names won't be used; the beans will receive the file names from the stepExecutionContext instead:

@Bean
public Step slaveStep() 
  throws UnexpectedInputException, MalformedURLException, ParseException {
    return steps.get("slaveStep").<Transaction, Transaction>chunk(1)
      .reader(itemReader(null))
      .writer(itemWriter(marshaller(), null))
      .build();
}
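The slaveStep also calls a marshaller() bean that isn't shown above. Here's a minimal sketch using Spring OXM's Jaxb2Marshaller, assuming the Transaction class carries the appropriate JAXB annotations:

@Bean
public Marshaller marshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    // Transaction needs to be marshallable by JAXB,
    // e.g. annotated with @XmlRootElement
    marshaller.setClassesToBeBound(Transaction.class);
    return marshaller;
}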

5. Conclusion

In this tutorial, we discussed how to implement a job with parallel processing using partitioning in Spring Batch.

As always, the complete implementation for this example is available over on GitHub.
