Spring Integration with DB - Using Channel Adapters
In this article, we'll learn how integrate Spring Integration with MySQL database using Channel Adapter code
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.globomantics.jdbcexample</groupId> <artifactId>JdbcExample</artifactId> <version>0.0.1-SNAPSHOT</version> <name>JdbcExample</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.21</version> </dependency> <dependency> <groupId>org.mariadb.jdbc</groupId> <artifactId>mariadb-java-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
Reservation - Model class holds the Reservation information
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Reservation {
private Long id;
private String name;
}
ReservationListener.java
import com.globomantics.jdbcexample.model.Reservation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;
@Service
public class ReservationListener {
private static final Logger logger = LogManager.getLogger(ReservationListener.class);
@ServiceActivator(inputChannel = "newReservationChannel")
public void handleReservation(Reservation reservation) {
logger.info("Received message: {}", reservation);
}
}
ReservationService.java
import com.globomantics.jdbcexample.config.JdbcOutboundConfig;
import com.globomantics.jdbcexample.model.Reservation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ReservationService {
private static final Logger logger = LogManager.getLogger(ReservationService.class);
@Autowired
private JdbcOutboundConfig.CreateReservationGateway reservationGateway;
public void createReservation(Reservation reservation) {
logger.info("Publishing reservation: {}", reservation);
reservationGateway.createReservation(reservation);
}
}
JdbcInboundConfig.java
import java.util.List;
import javax.sql.DataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import com.globomantics.jdbcexample.model.Reservation;
@Configuration
public class JdbcInboundConfig {
private static final String UPDATE = "update reservation set status = 1 where id in (:id)";
private static final String SQL = "SELECT * FROM reservation where status = 0";
@Bean
public MessageChannel newReservationChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel newReservationListChannel() {
return new DirectChannel();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Splitter(inputChannel = "newReservationListChannel", outputChannel = "newReservationChannel")
public List<Reservation> splitter(Message message) {
return (List) message.getPayload();
}
@Bean
@InboundChannelAdapter(value = "newReservationListChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<?> checkDbForReservations(DataSource dataSource) {
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, SQL);
adapter.setRowMapper((rs, index) -> new Reservation(rs.getLong("id"), rs.getString("name")));
adapter.setUpdateSql(UPDATE);
return adapter;
}
}
JdbcOutboundConfig.java
import com.globomantics.jdbcexample.model.Reservation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import javax.sql.DataSource;
@Configuration
public class JdbcOutboundConfig {
private static final String SQL = "INSERT INTO reservation (id, name, status) VALUES (?, ?, 0)";
@Bean
public MessageChannel createReservationChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "createReservationChannel")
public MessageHandler jdbcReservationMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource, SQL);
jdbcMessageHandler.setPreparedStatementSetter((ps, message) -> {
Reservation reservation = (Reservation) message.getPayload();
ps.setLong(1, reservation.getId());
ps.setString(2, reservation.getName());
});
return jdbcMessageHandler;
}
@MessagingGateway(defaultRequestChannel = "createReservationChannel")
public interface CreateReservationGateway {
void createReservation(Reservation reservation);
}
}
JdbcExampleApplication.java
import com.globomantics.jdbcexample.model.Reservation;
import com.globomantics.jdbcexample.service.ReservationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Arrays;
import java.util.List;
@SpringBootApplication
public class JdbcExampleApplication implements CommandLineRunner {
@Autowired
private ReservationService service;
public static void main(String[] args) {
SpringApplication.run(JdbcExampleApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
List<Reservation> reservations = Arrays.asList(
new Reservation(1L, "User 1"),
new Reservation(2L, "User 2"),
new Reservation(3L, "User 3"),
new Reservation(4L, "User 4"),
new Reservation(5L, "User 5"));
reservations.forEach(reservation -> {
service.createReservation(reservation);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.err.println("InterruptedException = "+e);
}
});
}
}
application.properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/test spring.datasource.username=root spring.datasource.password=root
spring.datasource.initialization-mode=always
schema.sql
create database reservationdb; use reservationdb; -- Create the reservation table if it does not already exist CREATE TABLE IF NOT EXISTS reservation ( id INTEGER NOT NULL AUTO_INCREMENT, name VARCHAR(128) NOT NULL, status INTEGER, PRIMARY KEY (id) ); -- Delete existing rows from the reservation table DELETE FROM reservation;
Database output -
mysql> use test; Database changed mysql> show tables; +----------------+ | Tables_in_test | +----------------+ | reservation | +----------------+ 1 row in set (0.00 sec) mysql> select * from reservation; +----+--------+--------+ | id | name | status | +----+--------+--------+ | 1 | User 1 | 1 | | 2 | User 2 | 1 | | 3 | User 3 | 1 | | 4 | User 4 | 1 | | 5 | User 5 | 1 | +----+--------+--------+ 5 rows in set (0.00 sec)
awesome article! Thank you!