Comprehensive Guide to Spring Batch Processing Part 2

Introduction to Spring Batch Partitioning

When dealing with large-scale batch processing, performance and scalability become critical considerations. Processing a single large dataset sequentially can be time-consuming and inefficient, especially in scenarios where faster results are required. This is where Spring Batch Partitioning comes into play.

Partitioning is a technique in Spring Batch that allows a single job to be divided into smaller chunks (partitions) that can be processed in parallel. By distributing the workload across multiple threads or even multiple machines, partitioning not only improves performance but also ensures the efficient utilization of computational resources.

In the previous article, we introduced Spring Batch and discussed its core concepts and architecture. While Spring Batch provides a robust foundation for batch processing, it processes data sequentially by default. For tasks that involve large datasets, sequential processing can become a bottleneck. To tackle this, Spring Batch offers built-in support for partitioning, which allows you to split the workload and process each partition concurrently.

This article is dedicated to exploring Spring Batch Partitioning in depth. We’ll cover:

  1. Why Partitioning? — The benefits of partitioning and the scenarios where it becomes indispensable.
  2. How Partitioning Works — The architecture of Spring Batch Partitioning, including the roles of the Master and Worker steps.
  3. Implementing Partitioning — A step-by-step guide to implementing partitioning in a Spring Batch job, with practical examples.
  4. Best Practices — Tips and considerations for optimizing partitioned jobs, including handling failures and resource management.

Partitioning is particularly useful for tasks like the one I encountered recently, where I needed to process a large schedule to determine payment eligibility and call a repayment method. Using partitioning, I was able to divide the workload into smaller chunks and process them in parallel, significantly improving the overall efficiency of the system.

By the end of this article, you’ll have a solid understanding of how to implement partitioning in Spring Batch and how it can help you build scalable and performant batch processing solutions.

Let’s dive into the world of Spring Batch Partitioning!

Spring Batch Partitioning is a mechanism used to divide large datasets into smaller partitions and process them concurrently. Partitioning improves performance and scalability by leveraging parallel processing, either locally (using threads) or remotely (using distributed systems).

This guide walks you through a complete example of Spring Batch Partitioning using a TaskExecutorPartitionHandler. It includes all components, a REST API to trigger the job, and database interactions.

What Will We Build?

We will create a Spring Batch application that:

  1. Reads data from a person table.
  2. Processes the data in parallel partitions by dividing it into id ranges.
  3. Writes the processed data to a second table mapped by the PersonAfterProcess entity.
  4. Allows the job to be triggered via a REST API.

Core Concepts in Spring Batch Partitioning

1. Master Step

The Master Step:

  • Divides the dataset into logical partitions.
  • Assigns each partition to a Worker Step for processing.
  • Uses a PartitionHandler to coordinate the worker steps.

2. Worker Step

The Worker Step handles:

  • Reading data for a specific partition.
  • Processing the data.
  • Writing the processed data.

3. PartitionHandler

The PartitionHandler executes worker steps. It can:

  • Run locally using threads (e.g., TaskExecutorPartitionHandler).
  • Run remotely using messaging systems (e.g., MessageChannelPartitionHandler).

4. Partitioner

The Partitioner divides the dataset into partitions based on logic (e.g., ranges of IDs).

Complete Partitioning Example

1. Project Setup

Dependencies

Add the following dependencies in your pom.xml:

<dependencies>
    <!-- Spring Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- Spring Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- H2 Database -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>        

2. Database Schema

data.sql

INSERT INTO person (id, first_name, last_name) VALUES
(1, 'John', 'Doe'),
(2, 'Jane', 'Smith'),
(3, 'James', 'Brown'),
(4, 'Emily', 'Davis'),
(5, 'Michael', 'Wilson');        
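For the seed data above to load after Hibernate creates the schema, the example assumes typical Spring Boot settings along these lines (not shown in the original repository; the property names are standard Spring Boot, but the exact values here are a minimal sketch):

```properties
spring.datasource.url=jdbc:h2:mem:testdb
spring.jpa.hibernate.ddl-auto=create
# In Spring Boot 2.5+, run data.sql only after Hibernate has created the tables
spring.jpa.defer-datasource-initialization=true
# Expose the H2 console used later to verify the processed rows
spring.h2.console.enabled=true
# Don't run the batch job automatically at startup; we trigger it via REST
spring.batch.job.enabled=false
```

Without `spring.batch.job.enabled=false`, Spring Boot would launch the job once on startup in addition to the REST-triggered runs.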

3. Model Classes

Person.java

package kia.example.springbatch.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

@Entity
public class Person {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;

    public Person(Long id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public Person() {

    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}        

PersonAfterProcess.java

package kia.example.springbatch.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

@Entity
public class PersonAfterProcess {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;

    public PersonAfterProcess() {
    }

    public PersonAfterProcess(String name) {
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}        

4. Partitioner

ColumnRangePartitioner.java

The partitioner divides the dataset into partitions based on id ranges.

package kia.example.springbatch.partitioning;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import java.util.HashMap;
import java.util.Map;

public class ColumnRangePartitioner implements Partitioner {
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        int min = 1; // minimum ID (could also be fetched from the database)
        int max = 5; // maximum ID (could also be fetched from the database)
        // Size of each range; the last partition absorbs any remainder, so the
        // logic also works when gridSize does not divide the ID range evenly.
        int range = Math.max(1, (max - min + 1) / gridSize);
        int start = min;
        for (int i = 0; i < gridSize && start <= max; i++) {
            int end = (i == gridSize - 1) ? max : Math.min(start + range - 1, max);
            ExecutionContext context = new ExecutionContext();
            context.putInt("start", start);
            context.putInt("end", end);
            partitions.put("partition" + i, context);
            start = end + 1;
        }
        return partitions;
    }
}
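To see concretely how IDs get divided, the range-splitting arithmetic can be illustrated in plain Java, independent of Spring. This sketch uses a scheme in which the last partition absorbs any remainder; the class and method names are illustrative, not part of the example project:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RangeSplitDemo {
    // Splits [min, max] into up to gridSize contiguous ranges;
    // the last range absorbs any remainder so every ID is covered.
    static Map<String, int[]> split(int min, int max, int gridSize) {
        Map<String, int[]> partitions = new LinkedHashMap<>();
        int range = Math.max(1, (max - min + 1) / gridSize);
        int start = min;
        for (int i = 0; i < gridSize && start <= max; i++) {
            int end = (i == gridSize - 1) ? max : Math.min(start + range - 1, max);
            partitions.put("partition" + i, new int[]{start, end});
            start = end + 1;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // With min=1, max=5, gridSize=3 this prints:
        // partition0 -> [1, 1]
        // partition1 -> [2, 2]
        // partition2 -> [3, 5]
        split(1, 5, 3).forEach((name, r) ->
                System.out.println(name + " -> [" + r[0] + ", " + r[1] + "]"));
    }
}
```

Each worker step then receives its own `start`/`end` pair through the step execution context, so no two partitions read the same rows.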

5. Processor

PersonProcessor.java

package kia.example.springbatch.processor;

import kia.example.springbatch.model.Person;
import kia.example.springbatch.model.PersonAfterProcess;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component
public class PersonProcessor implements ItemProcessor<Person, PersonAfterProcess> {
    @Override
    public PersonAfterProcess process(Person person) {
        PersonAfterProcess processedPerson = new PersonAfterProcess();
        processedPerson.setName(person.getFirstName() + " " + person.getLastName());
        return processedPerson;
    }
}

6. Batch Configuration

BatchConfig.java

This configuration sets up the partitioning process.

package kia.example.springbatch.partitioning;



import kia.example.springbatch.model.Person;
import kia.example.springbatch.model.PersonAfterProcess;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import jakarta.persistence.EntityManagerFactory;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Map;

@Configuration
@EnableBatchProcessing
public class BatchConfigPartitioning {
    private final EntityManagerFactory entityManagerFactory;
    private final ItemProcessor<Person, PersonAfterProcess> personProcessor;

    public BatchConfigPartitioning(EntityManagerFactory entityManagerFactory,
                                   ItemProcessor<Person, PersonAfterProcess> personProcessor) {
        this.entityManagerFactory = entityManagerFactory;
        this.personProcessor = personProcessor;
    }


    @Bean
    @StepScope
    public JpaPagingItemReader<Person> readerPartitioner(
            @Value("#{stepExecutionContext['start']}") Integer start,
            @Value("#{stepExecutionContext['end']}") Integer end) {
        return new JpaPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Person p WHERE p.id BETWEEN :start AND :end")
                .parameterValues(Map.of("start", start, "end", end)) // Pass dynamic parameters
                .pageSize(3)
                .build();
    }


    @Bean
    public JpaItemWriter<PersonAfterProcess> writerPartitioner() {
        JpaItemWriter<PersonAfterProcess> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }




    @Bean
    public ColumnRangePartitioner columnRangePartitioner() {
        return new ColumnRangePartitioner();
    }

    @Bean
    @Qualifier("workerStep")
    public Step workerStep(JobRepository jobRepository, PlatformTransactionManager transactionManager,
                           JpaPagingItemReader<Person> readerPartitioner) {
        return new StepBuilder("workerStep", jobRepository)
                .<Person, PersonAfterProcess>chunk(2, transactionManager)
                .reader(readerPartitioner)
                .processor(personProcessor)
                .writer(writerPartitioner())
                .build();
    }

    @Bean
    public PartitionHandler partitionHandler(Step workerStep) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor()); // Parallel execution
        handler.setStep(workerStep); // Worker step
        handler.setGridSize(3); // Number of partitions
        return handler;
    }

    @Bean
    public Step masterStep(JobRepository jobRepository, @Qualifier("workerStep") Step stepWorker,
                           ColumnRangePartitioner columnRangePartitioner) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", columnRangePartitioner)
                .partitionHandler(partitionHandler(stepWorker))
                .build();
    }

    @Bean
    public Job partitionedJob(Step masterStep, JobRepository jobRepository) {
        return new JobBuilder("partitionedJob", jobRepository)
                .start(masterStep)
                .build();
    }
}        

7. REST Controller

JobController.java

package kia.example.springbatch.controller;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/jobs")
public class JobController {
    private final JobLauncher jobLauncher;
    private final Job partitionedJob;

    public JobController(JobLauncher jobLauncher, Job partitionedJob) {
        this.jobLauncher = jobLauncher;
        this.partitionedJob = partitionedJob;
    }

    @GetMapping("/run")
    public ResponseEntity<String> runPartitionedJob() {
        try {
            // A unique parameter so the same job can be launched repeatedly.
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution execution = jobLauncher.run(partitionedJob, jobParameters);
            return ResponseEntity.ok("Job started successfully with status: " + execution.getStatus());
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to start job: " + e.getMessage());
        }
    }
}

8. Running the Application

  1. Start the application:

  • mvn spring-boot:run

  2. Trigger the job via the API:

  • curl -X GET http://localhost:8080/api/jobs/run

  3. Verify the processed data in the H2 console:

  • http://localhost:8080/h2-console

Conclusion

In this complete example, we:

  1. Configured a TaskExecutorPartitionHandler for local partitioning using threads.
  2. Implemented a Partitioner to divide the dataset into partitions.
  3. Integrated a REST API to dynamically trigger the job.

This approach improves performance and scalability for large datasets. For distributed environments, you can replace the TaskExecutorPartitionHandler with a MessageChannelPartitionHandler. Let me know if you have any questions!

Links :

Git: https://github.com/KiaShamaei/springBatchComprehensive

Part 1: https://medium.com/@kiarash.shamaii/comprehensive-guide-to-spring-batch-processing-part-1-227046c03027

Part 2: https://medium.com/@kiarash.shamaii/comprehensive-guide-to-spring-batch-processing-part-2-fb599fdcb119

Part 3: https://medium.com/@kiarash.shamaii/comprehensive-guide-to-spring-batch-processing-part-3-65e37ef04ec2
