Spring Integration with DB - Using Channel Adapters

In this article, we'll learn how integrate Spring Integration with MySQL database using Channel Adapter code

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.3.3.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.globomantics.jdbcexample</groupId>
	<artifactId>JdbcExample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>JdbcExample</name>
	<description>Demo project for Spring Boot</description>


	<properties>
		<java.version>1.8</java.version>
	</properties>


	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-jdbc</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-jdbc</artifactId>
		</dependency>


		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.21</version>
		</dependency>


		<dependency>
			<groupId>org.mariadb.jdbc</groupId>
			<artifactId>mariadb-java-client</artifactId>
		</dependency>
		


		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
	</dependencies>


	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>


</project>

Reservation - Model class holds the Reservation information

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class Reservation {
    private Long id;
    private String name;
}

ReservationListener.java

import com.globomantics.jdbcexample.model.Reservation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;


@Service
public class ReservationListener {
    private static final Logger logger = LogManager.getLogger(ReservationListener.class);


    @ServiceActivator(inputChannel = "newReservationChannel")
    public void handleReservation(Reservation reservation) {
        logger.info("Received message: {}", reservation);
    }
}

ReservationService.java

import com.globomantics.jdbcexample.config.JdbcOutboundConfig;
import com.globomantics.jdbcexample.model.Reservation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class ReservationService {
    private static final Logger logger = LogManager.getLogger(ReservationService.class);


    @Autowired
    private JdbcOutboundConfig.CreateReservationGateway reservationGateway;


    public void createReservation(Reservation reservation) {
        logger.info("Publishing reservation: {}", reservation);
        reservationGateway.createReservation(reservation);
    }
}

JdbcInboundConfig.java

import java.util.List;


import javax.sql.DataSource;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;


import com.globomantics.jdbcexample.model.Reservation;


@Configuration
public class JdbcInboundConfig {
	private static final String UPDATE = "update reservation set status = 1 where id in (:id)";
	private static final String SQL = "SELECT * FROM reservation where status = 0";


	@Bean
	public MessageChannel newReservationChannel() {
		return new DirectChannel();
	}


	@Bean
	public MessageChannel newReservationListChannel() {
		return new DirectChannel();
	}


	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Splitter(inputChannel = "newReservationListChannel", outputChannel = "newReservationChannel")
	public List<Reservation> splitter(Message message) {
		return (List) message.getPayload();
	}


	@Bean
	@InboundChannelAdapter(value = "newReservationListChannel", poller = @Poller(fixedDelay = "1000"))
	public MessageSource<?> checkDbForReservations(DataSource dataSource) {
		JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, SQL);
		adapter.setRowMapper((rs, index) -> new Reservation(rs.getLong("id"), rs.getString("name")));
		adapter.setUpdateSql(UPDATE);
		return adapter;
	}
}

JdbcOutboundConfig.java

import com.globomantics.jdbcexample.model.Reservation;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;


import javax.sql.DataSource;


@Configuration
public class JdbcOutboundConfig {
	private static final String SQL = "INSERT INTO reservation (id, name, status) VALUES (?, ?, 0)";


	@Bean
	public MessageChannel createReservationChannel() {
		return new DirectChannel();
	}


	@Bean
	@ServiceActivator(inputChannel = "createReservationChannel")
	public MessageHandler jdbcReservationMessageHandler(DataSource dataSource) {
		JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource, SQL);
		
		jdbcMessageHandler.setPreparedStatementSetter((ps, message) -> {
			Reservation reservation = (Reservation) message.getPayload();
			ps.setLong(1, reservation.getId());
			ps.setString(2, reservation.getName());
		});
		return jdbcMessageHandler;
	}


	@MessagingGateway(defaultRequestChannel = "createReservationChannel")
	public interface CreateReservationGateway {
		void createReservation(Reservation reservation);
	}
}

JdbcExampleApplication.java

import com.globomantics.jdbcexample.model.Reservation;
import com.globomantics.jdbcexample.service.ReservationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


import java.util.Arrays;
import java.util.List;


@SpringBootApplication
public class JdbcExampleApplication implements CommandLineRunner {


    @Autowired
    private ReservationService service;


    public static void main(String[] args) {
        SpringApplication.run(JdbcExampleApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        List<Reservation> reservations = Arrays.asList(
                new Reservation(1L, "User 1"),
                new Reservation(2L, "User 2"),
                new Reservation(3L, "User 3"),
                new Reservation(4L, "User 4"),
                new Reservation(5L, "User 5"));
        reservations.forEach(reservation -> {
        	service.createReservation(reservation);
        	try {
        		Thread.sleep(1000);
			} catch (InterruptedException e) {
				System.err.println("InterruptedException = "+e);
			}
		});
    }
}

application.properties

spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.initialization-mode=always

schema.sql

create database reservationdb;
use reservationdb;


-- Create the reservation table if it does not already exist
CREATE TABLE IF NOT EXISTS reservation (
    id   INTEGER      NOT NULL AUTO_INCREMENT,
    name VARCHAR(128) NOT NULL,
    status INTEGER,
    PRIMARY KEY (id)
);


-- Delete existing rows from the reservation table
DELETE FROM reservation;

Database output -

mysql> use test;
Database changed

mysql> show tables;
+----------------+
| Tables_in_test |
+----------------+
| reservation    |
+----------------+
1 row in set (0.00 sec)


mysql> select * from reservation;
+----+--------+--------+
| id | name   | status |
+----+--------+--------+
|  1 | User 1 |      1 |
|  2 | User 2 |      1 |
|  3 | User 3 |      1 |
|  4 | User 4 |      1 |
|  5 | User 5 |      1 |
+----+--------+--------+
5 rows in set (0.00 sec)


To view or add a comment, sign in

More articles by Prateek Ashtikar

Others also viewed

Explore content categories