Comprehensive Guide to Spring Batch Processing Part 2
Introduction to Spring Batch Partitioning
When dealing with large-scale batch processing, performance and scalability become critical considerations. Processing a single large dataset sequentially can be time-consuming and inefficient, especially in scenarios where faster results are required. This is where Spring Batch Partitioning comes into play.
Partitioning is a technique in Spring Batch that allows a single job to be divided into smaller chunks (partitions) that can be processed in parallel. By distributing the workload across multiple threads or even multiple machines, partitioning not only improves performance but also ensures the efficient utilization of computational resources.
In the previous article, we introduced Spring Batch and discussed its core concepts and architecture. While Spring Batch provides a robust foundation for batch processing, it processes data sequentially by default. For tasks that involve large datasets, sequential processing can become a bottleneck. To tackle this, Spring Batch offers built-in support for partitioning, which allows you to split the workload and process each partition concurrently.
This article is dedicated to exploring Spring Batch Partitioning in depth. We'll cover the core concepts (the master step, worker step, PartitionHandler, and Partitioner) and then walk through a complete working example, from project setup to triggering the job through a REST API.
Partitioning is particularly useful for tasks like the one I encountered recently, where I needed to process a large schedule to determine payment eligibility and call a repayment method. Using partitioning, I was able to divide the workload into smaller chunks and process them in parallel, significantly improving the overall efficiency of the system.
By the end of this article, you’ll have a solid understanding of how to implement partitioning in Spring Batch and how it can help you build scalable and performant batch processing solutions.
Let’s dive into the world of Spring Batch Partitioning!
Spring Batch Partitioning is a mechanism used to divide large datasets into smaller partitions and process them concurrently. Partitioning improves performance and scalability by leveraging parallel processing, either locally (using threads) or remotely (using distributed systems).
This guide walks you through a complete example of Spring Batch Partitioning using a TaskExecutorPartitionHandler. It includes all components, a REST API to trigger the job, and database interactions.
What Will We Build?
We will create a Spring Batch application that:
- Reads Person records from an H2 database
- Splits the dataset into partitions based on ID ranges
- Processes the partitions in parallel using a TaskExecutorPartitionHandler
- Writes the processed results back to the database
- Exposes a REST endpoint to trigger the job
Core Concepts in Spring Batch Partitioning
1. Master Step
The Master Step:
- Uses a Partitioner to split the dataset into partitions
- Delegates each partition to a worker step through the PartitionHandler
- Completes once all partitions have finished
2. Worker Step
The Worker Step handles:
- Reading the subset of data assigned to its partition
- Processing each item
- Writing the results to the target table
3. PartitionHandler
The PartitionHandler executes worker steps. It can:
- Run partitions locally in parallel threads (e.g., TaskExecutorPartitionHandler)
- Distribute partitions to remote worker nodes (e.g., MessageChannelPartitionHandler)
4. Partitioner
The Partitioner divides the dataset into partitions based on logic (e.g., ranges of IDs).
Complete Partitioning Example
1. Project Setup
Dependencies
Add the following dependencies in your pom.xml (spring-boot-starter-web is needed for the REST endpoint we build later):
<dependencies>
    <!-- Spring Batch -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <!-- Spring Data JPA -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <!-- Spring Web (for the REST controller that triggers the job) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- H2 Database -->
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <scope>runtime</scope>
    </dependency>
</dependencies>
2. Database Schema
data.sql
INSERT INTO person (id, first_name, last_name) VALUES
(1, 'John', 'Doe'),
(2, 'Jane', 'Smith'),
(3, 'James', 'Brown'),
(4, 'Emily', 'Davis'),
(5, 'Michael', 'Wilson');
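The data.sql above assumes Hibernate has already created the person table from the entities defined below. A minimal application.properties sketch for that setup (not shown in the original article; the property names are standard Spring Boot ones, and the values are illustrative defaults):

```properties
# Let Hibernate create the schema from the @Entity classes
spring.jpa.hibernate.ddl-auto=create-drop
# Run data.sql AFTER Hibernate has created the tables (needed on Spring Boot 2.5+)
spring.jpa.defer-datasource-initialization=true
# Optional: browse the in-memory database at /h2-console
spring.h2.console.enabled=true
```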
3. Model Classes
Person.java
package kia.example.springbatch.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

@Entity
public class Person {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String firstName;
    private String lastName;

    public Person() {
    }

    public Person(Long id, String firstName, String lastName) {
        this.id = id;
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}
PersonAfterProcess.java
package kia.example.springbatch.model;

import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;

@Entity
public class PersonAfterProcess {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;

    public PersonAfterProcess() {
    }

    public PersonAfterProcess(String name) {
        this.name = name;
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
4. Partitioner
ColumnRangePartitioner.java
The partitioner divides the dataset into partitions based on ID ranges, storing the start and end of each range in the partition's ExecutionContext.
package kia.example.springbatch.partitioning;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component // registered as a bean so it can be injected into the batch configuration
public class ColumnRangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        int min = 1; // minimum ID (could also be fetched from the database)
        int max = 5; // maximum ID (could also be fetched from the database)
        int range = (max - min) / gridSize;
        int start = min;
        int end = start + range;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putInt("start", start);
            context.putInt("end", end);
            partitions.put("partition" + i, context);
            start = end + 1;
            // the final partition is stretched to max so no IDs are skipped
            end = (i == gridSize - 2) ? max : end + range;
        }
        return partitions;
    }
}
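To see exactly which ID ranges the loop above produces, here is the same range-splitting logic extracted into a standalone sketch (no Spring context needed; `PartitionRangeDemo` and its `split` method are names introduced here for illustration). With min=1, max=5, and gridSize=3, it yields the ranges [1, 2], [3, 3], and [4, 5]:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionRangeDemo {

    // Same arithmetic as ColumnRangePartitioner.partition, but returning
    // plain int[] {start, end} pairs instead of ExecutionContexts.
    static Map<String, int[]> split(int min, int max, int gridSize) {
        Map<String, int[]> partitions = new LinkedHashMap<>();
        int range = (max - min) / gridSize;
        int start = min;
        int end = start + range;
        for (int i = 0; i < gridSize; i++) {
            partitions.put("partition" + i, new int[]{start, end});
            start = end + 1;
            // the final partition is stretched to max so no IDs are skipped
            end = (i == gridSize - 2) ? max : end + range;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // prints partition0 -> [1, 2], partition1 -> [3, 3], partition2 -> [4, 5]
        split(1, 5, 3).forEach((name, r) ->
                System.out.println(name + " -> [" + r[0] + ", " + r[1] + "]"));
    }
}
```

Every ID from 1 to 5 lands in exactly one partition, which is what lets the worker steps run without overlapping work.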
5. Processor
PersonProcessor.java
package kia.example.springbatch.processor;

import kia.example.springbatch.model.Person;
import kia.example.springbatch.model.PersonAfterProcess;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

@Component // registered as a bean so it can be injected into the batch configuration
public class PersonProcessor implements ItemProcessor<Person, PersonAfterProcess> {

    @Override
    public PersonAfterProcess process(Person person) {
        PersonAfterProcess processed = new PersonAfterProcess();
        processed.setName(person.getFirstName() + " " + person.getLastName());
        return processed;
    }
}
6. Batch Configuration
BatchConfigPartitioning.java
This configuration sets up the partitioning process.
package kia.example.springbatch.partitioning;
import kia.example.springbatch.model.Person;
import kia.example.springbatch.model.PersonAfterProcess;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import jakarta.persistence.EntityManagerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import java.util.Map;
@Configuration
@EnableBatchProcessing
public class BatchConfigPartitioning {

    private final EntityManagerFactory entityManagerFactory;
    private final ItemProcessor<Person, PersonAfterProcess> personProcessor;

    public BatchConfigPartitioning(EntityManagerFactory entityManagerFactory,
                                   ItemProcessor<Person, PersonAfterProcess> personProcessor) {
        this.entityManagerFactory = entityManagerFactory;
        this.personProcessor = personProcessor;
    }

    @Bean
    @StepScope
    public JpaPagingItemReader<Person> readerPartitioner(
            @Value("#{stepExecutionContext['start']}") Integer start,
            @Value("#{stepExecutionContext['end']}") Integer end) {
        return new JpaPagingItemReaderBuilder<Person>()
                .name("personItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("SELECT p FROM Person p WHERE p.id BETWEEN :start AND :end")
                .parameterValues(Map.of("start", start, "end", end)) // partition bounds from the step execution context
                .pageSize(3)
                .build();
    }

    @Bean
    public JpaItemWriter<PersonAfterProcess> writerPartitioner() {
        JpaItemWriter<PersonAfterProcess> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(entityManagerFactory);
        return writer;
    }

    @Bean
    @Qualifier("workerStep")
    public Step workerStep(JobRepository jobRepository,
                           PlatformTransactionManager transactionManager,
                           JpaPagingItemReader<Person> readerPartitioner) {
        return new StepBuilder("workerStep", jobRepository)
                .<Person, PersonAfterProcess>chunk(2, transactionManager)
                .reader(readerPartitioner)
                .processor(personProcessor)
                .writer(writerPartitioner())
                .build();
    }

    @Bean
    public PartitionHandler partitionHandler(@Qualifier("workerStep") Step workerStep) {
        TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
        handler.setTaskExecutor(new SimpleAsyncTaskExecutor()); // run partitions in parallel threads
        handler.setStep(workerStep);                            // the step executed for each partition
        handler.setGridSize(3);                                 // number of partitions
        return handler;
    }

    @Bean
    public Step masterStep(JobRepository jobRepository,
                           @Qualifier("workerStep") Step workerStep,
                           ColumnRangePartitioner columnRangePartitioner) {
        return new StepBuilder("masterStep", jobRepository)
                .partitioner("workerStep", columnRangePartitioner)
                .partitionHandler(partitionHandler(workerStep))
                .build();
    }

    @Bean
    public Job partitionedJob(Step masterStep, JobRepository jobRepository) {
        return new JobBuilder("partitionedJob", jobRepository)
                .start(masterStep)
                .build();
    }
}
7. REST Controller
JobController.java
package kia.example.springbatch.controller;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/jobs")
public class JobController {

    private final JobLauncher jobLauncher;
    private final Job partitionedJob;

    public JobController(JobLauncher jobLauncher, Job partitionedJob) {
        this.jobLauncher = jobLauncher;
        this.partitionedJob = partitionedJob;
    }

    @GetMapping("/run")
    public ResponseEntity<String> runPartitionedJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("timestamp", System.currentTimeMillis()) // unique parameter so the job can be re-run
                    .toJobParameters();
            JobExecution execution = jobLauncher.run(partitionedJob, jobParameters);
            return ResponseEntity.ok("Job started successfully with status: " + execution.getStatus());
        } catch (Exception e) {
            return ResponseEntity.status(500).body("Failed to start job: " + e.getMessage());
        }
    }
}
8. Running the Application
1. Start the application:
mvn spring-boot:run
2. Trigger the job via the API:
curl http://localhost:8080/api/jobs/run
3. Verify the processed data, for example by querying the target table:
SELECT * FROM person_after_process;
Conclusion
In this complete example, we:
- Divided a dataset into ID-range partitions with a custom Partitioner
- Configured a master step that delegates to a worker step through a TaskExecutorPartitionHandler
- Processed the partitions in parallel threads and wrote the results with JPA
- Exposed a REST endpoint to trigger the partitioned job
This approach improves performance and scalability for large datasets. For distributed environments, you can replace the TaskExecutorPartitionHandler with a MessageChannelPartitionHandler. Let me know if you have any questions!
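As a rough illustration of that swap, here is a sketch of a remote partition handler bean. This is not a complete setup: it assumes the spring-batch-integration dependency is on the classpath and that a Spring Integration MessagingTemplate, its request channel, and the worker-side listeners are configured separately; the bean and parameter names here are illustrative.

```java
// Sketch only: partition requests are sent over a message channel to
// remote worker nodes instead of local threads.
@Bean
public PartitionHandler remotePartitionHandler(MessagingTemplate messagingTemplate) {
    MessageChannelPartitionHandler handler = new MessageChannelPartitionHandler();
    handler.setMessagingOperations(messagingTemplate); // sends partition requests to workers
    handler.setStepName("workerStep");                 // step name executed on the worker nodes
    handler.setGridSize(3);
    return handler;
}
```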
Links:
Spring Batch Overview (in Arabic): https://www.youtube.com/playlist?list=PLF9mDJoMemaEtJ3RNOFsrNxYl1clgHdDY