Spring Batch - Apache Kafka Integration
In this example, we'll learn how to write data to and read data from Apache Kafka using Spring Batch. Below are a few commands to help you get familiar with Apache Kafka.
# Spring Batch and Kafka Writer
# List All Topics
kafka-topics.bat --list --zookeeper localhost:2181
# View Messages
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic customers --from-beginning
# Delete Topic
kafka-topics.bat --zookeeper localhost:2181 --delete --topic test
# First N messages
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning --max-messages 10
# Next N messages
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --max-messages 10
kafka-topics.bat --zookeeper localhost:2181 --alter --topic customers --delete-config retention.ms
WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
Going forward, please use kafka-configs.sh for this functionality
Updated config for topic customers.
```
C:\Users\pc>kafka-topics.bat --list --zookeeper localhost:2181
C:\Users\pc>kafka-topics.bat --list --zookeeper localhost:2181
customers
C:\Users\pc>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic customers --from-beginning
{"id":1,"firstName":" John","lastName":" Doe","birthdate":{"month":"OCTOBER","year":1952,"dayOfYear":284,"dayOfMonth":10,"dayOfWeek":"FRIDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":2,"firstName":" Amy","lastName":" Eugene","birthdate":{"month":"JULY","year":1985,"dayOfYear":186,"dayOfMonth":5,"dayOfWeek":"FRIDAY","hour":17,"minute":10,"monthValue":7,"nano":0,"second":0,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":3,"firstName":" Laverne","lastName":" Mann","birthdate":{"month":"DECEMBER","year":1988,"dayOfYear":346,"dayOfMonth":11,"dayOfWeek":"SUNDAY","hour":10,"minute":10,"monthValue":12,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":4,"firstName":" Janice","lastName":" Preston","birthdate":{"month":"FEBRUARY","year":1960,"dayOfYear":50,"dayOfMonth":19,"dayOfWeek":"FRIDAY","hour":10,"minute":10,"monthValue":2,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":5,"firstName":" Pauline","lastName":" Rios","birthdate":{"month":"AUGUST","year":1977,"dayOfYear":241,"dayOfMonth":29,"dayOfWeek":"MONDAY","hour":10,"minute":10,"monthValue":8,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":6,"firstName":" Perry","lastName":" Burnside","birthdate":{"month":"MARCH","year":1981,"dayOfYear":69,"dayOfMonth":10,"dayOfWeek":"TUESDAY","hour":10,"minute":10,"monthValue":3,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":7,"firstName":" Todd","lastName":" Kinsey","birthdate":{"month":"DECEMBER","year":1998,"dayOfYear":348,"dayOfMonth":14,"dayOfWeek":"MONDAY","hour":10,"minute":10,"monthValue":12,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":8,"firstName":" Jacqueline","lastName":" Hyde","birthdate":{"month":"MARCH","year":1983,"dayOfYear":79,"dayOfMonth":20,"dayOfWeek":"SUNDAY","hour":10,"minute":10,"monthValue":3,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":9,"firstName":" Rico","lastName":" Hale","birthdate":{"month":"OCTOBER","year":2000,"dayOfYear":284,"dayOfMonth":10,"dayOfWeek":"TUESDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":10,"firstName":" Samuel","lastName":" Lamm","birthdate":{"month":"NOVEMBER","year":1999,"dayOfYear":315,"dayOfMonth":11,"dayOfWeek":"THURSDAY","hour":10,"minute":10,"monthValue":11,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":11,"firstName":" Robert","lastName":" Coster","birthdate":{"month":"OCTOBER","year":1972,"dayOfYear":284,"dayOfMonth":10,"dayOfWeek":"TUESDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":12,"firstName":" Tamara","lastName":" Soler","birthdate":{"month":"JANUARY","year":1978,"dayOfYear":2,"dayOfMonth":2,"dayOfWeek":"MONDAY","hour":10,"minute":10,"monthValue":1,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":13,"firstName":" Justin","lastName":" Kramer","birthdate":{"month":"NOVEMBER","year":1951,"dayOfYear":323,"dayOfMonth":19,"dayOfWeek":"MONDAY","hour":10,"minute":10,"monthValue":11,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":14,"firstName":" Andrea","lastName":" Law","birthdate":{"month":"OCTOBER","year":1959,"dayOfYear":287,"dayOfMonth":14,"dayOfWeek":"WEDNESDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":15,"firstName":" Laura","lastName":" Porter","birthdate":{"month":"DECEMBER","year":2010,"dayOfYear":346,"dayOfMonth":12,"dayOfWeek":"SUNDAY","hour":10,"minute":10,"monthValue":12,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":16,"firstName":" Michael","lastName":" Cantu","birthdate":{"month":"APRIL","year":1999,"dayOfYear":101,"dayOfMonth":11,"dayOfWeek":"SUNDAY","hour":10,"minute":10,"monthValue":4,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":17,"firstName":" Andrew","lastName":" Thomas","birthdate":{"month":"MAY","year":1967,"dayOfYear":124,"dayOfMonth":4,"dayOfWeek":"THURSDAY","hour":10,"minute":10,"monthValue":5,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":18,"firstName":" Jose","lastName":" Hannah","birthdate":{"month":"SEPTEMBER","year":1950,"dayOfYear":259,"dayOfMonth":16,"dayOfWeek":"SATURDAY","hour":10,"minute":10,"monthValue":9,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":19,"firstName":" Valerie","lastName":" Hilbert","birthdate":{"month":"JUNE","year":1966,"dayOfYear":164,"dayOfMonth":13,"dayOfWeek":"MONDAY","hour":10,"minute":10,"monthValue":6,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
{"id":20,"firstName":" Patrick","lastName":" Durham","birthdate":{"month":"OCTOBER","year":1978,"dayOfYear":285,"dayOfMonth":12,"dayOfWeek":"THURSDAY","hour":10,"minute":10,"monthValue":10,"nano":0,"second":10,"chronology":{"id":"ISO","calendarType":"iso8601"}}}
```
pom.xml - This file maintains the dependencies needed for the application.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring-batch-kafka-writer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-kafka-writer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR3</spring-cloud.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Customer.java - Domain class to hold Customer info.
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}
CustomerFieldSetMapper.java - FieldSetMapper implementation that maps the data obtained from a FieldSet into a Customer object.
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
@Override
public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
return Customer.builder()
.id(fieldSet.readLong("id"))
.firstName(fieldSet.readRawString("firstName"))
.lastName(fieldSet.readRawString("lastName"))
.birthdate(fieldSet.readRawString("birthdate"))
.build();
}
}
JobConfig.java - This is main configuration class needed to configure all Spring Batch platform beans
KafkaItemWriter - An ItemWriter implementation for Apache Kafka using a KafkaTemplate with default topic configured.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.kafka.KafkaItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.KafkaTemplate;
import com.example.demo.mapper.CustomerFieldSetMapper;
import com.example.demo.model.Customer;
@Configuration
public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private KafkaTemplate<Long, Customer> kafkaTemplate;
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/customer.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
@Bean
public KafkaItemWriter<Long, Customer> kafkaItemWriter() throws Exception{
KafkaItemWriter<Long, Customer> writer = new KafkaItemWriter<>();
writer.setKafkaTemplate(kafkaTemplate);
writer.setItemKeyMapper(Customer::getId);
writer.setDelete(false);
writer.afterPropertiesSet();
return writer;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.writer(kafkaItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
SpringBatchKafkaWriterApplication.java - This is the main App class.
import java.util.Date;
import java.util.UUID;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchKafkaWriterApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public static void main(String[] args) {
SpringApplication.run(SpringBatchKafkaWriterApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("jobId", UUID.randomUUID().toString())
.addDate("date", new Date())
.addLong("time",System.currentTimeMillis()).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}
application.properties - Configuration needed to talk to the apache kafka.
## Consumer settings
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=customers-group
## Producer settings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=customers-client
## Trust all packages when deserializing JSON payloads
spring.kafka.consumer.properties.spring.json.trusted.packages=*
## Default topic used by the KafkaTemplate
spring.kafka.template.default-topic=customers
# Do not execute Spring Batch jobs in the context on startup; the job is launched from the CommandLineRunner
spring.batch.job.enabled=false
customer.csv
id,firstName,lastName,birthdate
1, Kavita, Khapre,10-10-1952 10:10:10
2, Ashu, Dhole,05-07-1985 17:10:00
3, Meenal, Das,11-12-1988 10:10:10
4, Surajeet, Sen,19-02-1960 10:10:10
5, Prateek, Bhagwat,29-08-1977 10:10:10
6, Sagar, Karande,10-03-1981 10:10:10
7, Neha, Parate,14-12-1998 10:10:10
8, Deepak, Kale,20-03-1983 10:10:10
9, Rahul, Mane,10-10-2000 10:10:10
10, Karan, Date,11-11-1999 10:10:10
11, Savani, Natu,10-10-1972 10:10:10
12, Ashish, Swant,02-01-1978 10:10:10
13, Sachin, Meghe,19-11-1951 10:10:10
14, Dinesh, Pawar,14-10-1959 10:10:10
15, Deepti, Dongare,12-12-2010 10:10:10
16, Ravi, Rahangdale,11-04-1999 10:10:10
17, Megha, Telkhade,04-05-1967 10:10:10
18, Pawan, Bokade,16-09-1950 10:10:10
19, Prabhakar, Dhakite,13-06-1966 10:10:10
20, Kirti, Ranade,12-10-1978 10:10:10
============ Reader example
CustomerLineAggregator.java
import org.springframework.batch.item.file.transform.LineAggregator;
import com.example.demo.model.Customer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomerLineAggregator implements LineAggregator<Customer>{
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public String aggregate(Customer item) {
try {
return objectMapper.writeValueAsString(item);
} catch (Exception e) {
throw new RuntimeException("Unable to Serialized Customer", e);
}
}
}
JobConfiguration.java
import java.io.File;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import com.example.demo.lineAggregator.CustomerLineAggregator;
import com.example.demo.model.Customer;
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private KafkaProperties properties;
@Bean
public KafkaItemReader<Long, Customer> kafkaItemReader() {
Properties props = new Properties();
props.putAll(this.properties.buildConsumerProperties());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaItemReaderBuilder<Long, Customer>()
.partitions(0)
.consumerProperties(props)
.name("customers-reader")
.saveState(true)
.topic("customers")
.build();
}
@Bean
public FlatFileItemWriter<Customer> customerItemWriter() throws Exception{
String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
System.out.println(">> Output Path = "+customerOutputPath);
FlatFileItemWriter<Customer> itemWriter = new FlatFileItemWriter<>();
//A LineAggregator implementation that simply calls Object.toString() on the given object
//itemWriter.setLineAggregator(new PassThroughLineAggregator<>());
//Alternate ways
itemWriter.setLineAggregator(new CustomerLineAggregator());
itemWriter.setResource(new FileSystemResource(customerOutputPath));
itemWriter.afterPropertiesSet();
return itemWriter;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(100)
.reader(kafkaItemReader())
.writer(customerItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job").incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
application.properties
## Consumer settings
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=customers-group
## Producer settings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.client-id=customers-client
## Trust all packages when deserializing JSON payloads
spring.kafka.consumer.properties.spring.json.trusted.packages=*
## Default topic used by the KafkaTemplate
spring.kafka.template.default-topic=customers1
# Do not execute Spring Batch jobs in the context on startup; the job is launched from the CommandLineRunner
spring.batch.job.enabled=false
SpringBatchKafkaReaderApplication.java
import java.util.Date;
import java.util.UUID;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class SpringBatchKafkaReaderApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public static void main(String[] args) {
SpringApplication.run(SpringBatchKafkaReaderApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobId", UUID.randomUUID().toString())
.addDate("date", new Date())
.addLong("time",System.currentTimeMillis()).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}