In this post, we'll look at how to integrate Spring Boot with embedded Debezium. The code is available on GitHub.
https://github.com/seonwoo960000/spring-boot-embedded-debezium
Setup
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring-boot-debezium-1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-debezium-1</name>
<description>spring-boot-debezium-1</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.21</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>1.4.2.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.4.2.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.4.2.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
- spring-boot-starter-data-jpa: communicates with the database through JPA.
- mysql-connector-java: the MySQL JDBC connector.
- debezium-*: the dependencies needed to integrate Spring Boot with embedded Debezium.
application.yml
server:
  port: 8080
## Source Database Properties
customer:
  datasource:
    host: localhost
    port: 3307
    database: customerdb
    username: root
    password: root
## Primary/Target Database Properties
spring:
  datasource:
    url: jdbc:mysql://localhost:3308/customerdb
    username: root
    password: root
  jpa.hibernate.ddl-auto: create-drop
  jpa.show-sql: true
## Logging properties
logging:
  level:
    root: INFO
    io:
      debezium:
        connector:
          mysql:
            BinlogReader: INFO
Both databases are MySQL. The source database, which the Spring Boot application connects to through embedded Debezium, listens on port 3307. The Spring Boot application then writes the captured changes to the database listening on port 3308.
docker-compose.yml
We'll use Docker to run MySQL on several ports in the local environment. The script can be started with the docker-compose up -d command (the Docker daemon must be running for it to work).
### command: docker-compose up -d
version: "3.9"
services:
  # The MySQL instance that Spring Boot connects to through embedded Debezium
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb
  # The MySQL instance where Spring Boot stores the changes it has captured
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3308:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb
Code
First, we'll write the basic entity, repository, and service classes.
entity
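import javax.persistence.Entity;
import javax.persistence.Id;
import lombok.Getter;
import lombok.Setter;

// Maps to the customer table; field names match the column names.
@Entity
@Getter
@Setter
public class Customer {
    @Id
    private Long id;
    private String fullname;
    private String email;
}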
repository
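import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface CustomerRepository extends JpaRepository<Customer, Long> {
}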
service
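import java.util.Map;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.debezium.data.Envelope.Operation;
import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class CustomerService {
    private final CustomerRepository customerRepository;

    // Applies a single change event to the target database.
    public void replicateData(Map<String, Object> customerData, Operation operation) {
        final ObjectMapper objectMapper = new ObjectMapper();
        // Map the column-name/value pairs onto the Customer entity.
        final Customer customer = objectMapper.convertValue(customerData, Customer.class);
        if (Operation.DELETE == operation) {
            customerRepository.deleteById(customer.getId());
        } else {
            // CREATE and UPDATE both end up as a save (insert or update by id).
            customerRepository.save(customer);
        }
    }
}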
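To make the branching in replicateData easier to follow, here is a minimal sketch that replaces the JPA repository with a plain in-memory map. InMemoryCustomerReplica and its methods are hypothetical names used only for illustration, and a boolean flag stands in for the Debezium Operation enum. It shows that saving by id behaves as an upsert, so applying the same create or update event twice leaves a single row.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the target table, keyed by primary key.
class InMemoryCustomerReplica {
    private final Map<Long, Map<String, Object>> rows = new HashMap<>();

    // Mirrors replicateData: delete by id, otherwise insert-or-overwrite by id.
    void replicate(Map<String, Object> customerData, boolean isDelete) {
        Long id = ((Number) customerData.get("id")).longValue();
        if (isDelete) {
            rows.remove(id);
        } else {
            rows.put(id, new HashMap<>(customerData));
        }
    }

    int size() {
        return rows.size();
    }

    Map<String, Object> find(long id) {
        return rows.get(id);
    }
}
```

One difference worth keeping in mind: Map.remove silently ignores a missing key, whereas in the Spring Data JPA 2.x line used with Spring Boot 2.7, deleteById throws EmptyResultDataAccessException when no row with that id exists, so a delete event that arrives twice may need extra handling in the real service.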
Next, we'll write the Debezium configuration and the code that tracks changes.
DebeziumConfig
import java.io.File;
import java.io.IOException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DebeziumConfig {
    /**
     * Database details.
     */
    @Value("${customer.datasource.host}")
    private String customerDbHost;
    @Value("${customer.datasource.database}")
    private String customerDbName;
    @Value("${customer.datasource.port}")
    private String customerDbPort;
    @Value("${customer.datasource.username}")
    private String customerDbUsername;
    @Value("${customer.datasource.password}")
    private String customerDbPassword;

    /**
     * Customer Database Connector Configuration
     */
    @Bean
    public io.debezium.config.Configuration customerConnector() throws IOException {
        // Offsets and schema history are kept in temp files, so they do not survive a restart.
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        return io.debezium.config.Configuration.create()
                .with("name", "customer-mysql-connector")
                .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                .with("offset.flush.interval.ms", "60000")
                .with("database.hostname", customerDbHost)
                .with("database.port", customerDbPort)
                .with("database.user", customerDbUsername)
                .with("database.password", customerDbPassword)
                .with("database.dbname", customerDbName)
                .with("database.include.list", customerDbName)
                .with("include.schema.changes", "false")
                .with("database.allowPublicKeyRetrieval", "true")
                .with("database.server.id", "10181")
                .with("database.server.name", "customer-mysql-db-server")
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                .build();
    }
}
Details on each setting are available at the following link.
https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties
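Because DebeziumConfig writes offsets and schema history to temporary files created at startup, the connector has no saved position after a restart and will most likely run the initial snapshot again. The sketch below is one possible variation, not the code from the repository: it builds the same property keys into a java.util.Properties object but points both files at a fixed directory. ConnectorProperties, build, and the dataDir parameter are hypothetical names introduced for illustration.

```java
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical helper that mirrors the keys used in DebeziumConfig.
class ConnectorProperties {
    // dataDir is an assumed directory that survives application restarts.
    static Properties build(String host, String port, String user, String password,
                            String database, Path dataDir) {
        Properties props = new Properties();
        props.setProperty("name", "customer-mysql-connector");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // Stable file names instead of File.createTempFile, so offsets can be reused.
        props.setProperty("offset.storage.file.filename",
                dataDir.resolve("offsets.dat").toAbsolutePath().toString());
        props.setProperty("offset.flush.interval.ms", "60000");
        props.setProperty("database.hostname", host);
        props.setProperty("database.port", port);
        props.setProperty("database.user", user);
        props.setProperty("database.password", password);
        props.setProperty("database.include.list", database);
        props.setProperty("include.schema.changes", "false");
        props.setProperty("database.allowPublicKeyRetrieval", "true");
        props.setProperty("database.server.id", "10181");
        props.setProperty("database.server.name", "customer-mysql-db-server");
        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename",
                dataDir.resolve("dbhistory.dat").toAbsolutePath().toString());
        return props;
    }
}
```

The resulting Properties object could be passed to the engine builder's using(...) method, which is where DebeziumListener passes config.asProperties(). Whether persistent offsets are desirable depends on the use case; for a throwaway local demo, temporary files keep every run starting from a clean snapshot.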
DebeziumListener
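import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;
import io.debezium.config.Configuration;
import io.debezium.data.Envelope.Operation;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class DebeziumListener {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CustomerService customerService;
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration config, CustomerService customerService) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(config.asProperties())
                .notifying(this::handleChangeEvent)
                .build();
        this.customerService = customerService;
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();
        log.info("Key = '{}' value = '{}'", sourceRecord.key(), sourceRecord.value());
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
            // Snapshot reads are skipped; only create, update, and delete are replicated.
            if (operation != Operation.READ) {
                // A delete carries the row state in "before"; create and update carry it in "after".
                String record = operation == Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                Map<String, Object> payload = struct.schema().fields().stream()
                        .map(Field::name)
                        .filter(fieldName -> struct.get(fieldName) != null)
                        .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                        .collect(toMap(Pair::getKey, Pair::getValue));
                this.customerService.replicateData(payload, operation);
                log.info("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        // The engine is a Runnable; run it on a dedicated thread.
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
        // Release the worker thread so it does not keep the JVM alive.
        this.executor.shutdown();
    }
}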
The handleChangeEvent method receives the data changes that Debezium captures and, for every operation other than a read, delegates the change to the customerService.replicateData method. The Operation enum tells us which kind of change (READ, CREATE, UPDATE, DELETE) occurred in the database.
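The selection logic inside handleChangeEvent can be sketched without any Debezium classes. In the example below, plain maps stand in for the Kafka Connect Struct, and ChangeOp is a simplified, hypothetical stand-in for Debezium's Operation enum using the same codes that appear in the logs (r, c, u, d). ChangeEventSelector and selectPayload are also names made up for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-in for io.debezium.data.Envelope.Operation.
enum ChangeOp {
    READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

    private final String code;

    ChangeOp(String code) {
        this.code = code;
    }

    static ChangeOp forCode(String code) {
        for (ChangeOp op : values()) {
            if (op.code.equals(code)) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown op code: " + code);
    }
}

class ChangeEventSelector {
    // Returns the row state to replicate, or empty for snapshot reads.
    @SuppressWarnings("unchecked")
    static Optional<Map<String, Object>> selectPayload(Map<String, Object> envelope) {
        ChangeOp op = ChangeOp.forCode((String) envelope.get("op"));
        if (op == ChangeOp.READ) {
            return Optional.empty();
        }
        // Deletes only have a "before" state; creates and updates use "after".
        String stateField = (op == ChangeOp.DELETE) ? "before" : "after";
        Map<String, Object> state = (Map<String, Object>) envelope.get(stateField);
        Map<String, Object> payload = new HashMap<>();
        for (Map.Entry<String, Object> entry : state.entrySet()) {
            // Null columns are dropped, as in the listener's filter step.
            if (entry.getValue() != null) {
                payload.put(entry.getKey(), entry.getValue());
            }
        }
        return Optional.of(payload);
    }
}
```

For a delete event, the payload is built from the before state because after is null at that point; this is why the listener switches between BEFORE and AFTER depending on the operation.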
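Running
Creating the tables in MySQL
If you ran the script above with docker-compose, two MySQL Docker containers should now be running.
Connect to each database and create a customer table in customerdb whose columns match the Customer entity (id, fullname, email).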
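The original DDL is not reproduced in this text, so the following is only a sketch of what it could look like. The column names follow the Customer entity, while the types and lengths are assumptions. CustomerTableSetup and createTable are hypothetical names; the method uses plain JDBC and needs the MySQL driver from the pom.xml on the classpath at run time.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper; the DDL is inferred from the Customer entity, not taken from the post.
class CustomerTableSetup {
    // Assumed DDL: column types and lengths are guesses.
    static final String CREATE_CUSTOMER_TABLE =
            "CREATE TABLE IF NOT EXISTS customer ("
            + "id BIGINT NOT NULL PRIMARY KEY, "
            + "fullname VARCHAR(255), "
            + "email VARCHAR(255))";

    // Runs the DDL against the database behind the given JDBC URL.
    static void createTable(String jdbcUrl, String user, String password) throws SQLException {
        try (Connection connection = DriverManager.getConnection(jdbcUrl, user, password);
             Statement statement = connection.createStatement()) {
            statement.executeUpdate(CREATE_CUSTOMER_TABLE);
        }
    }
}
```

With the containers from docker-compose.yml, this would be called once per database, for example with jdbc:mysql://localhost:3307/customerdb for the source and jdbc:mysql://localhost:3308/customerdb for the target, using root/root as the credentials. The same statement can of course be run from any MySQL client instead.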
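Running Spring Boot
Once the table exists in both databases, start the Spring Boot application (changes in the source database are written to the target database through Debezium only while Spring Boot is running).
Making changes on the source database
Let's run INSERT, UPDATE, and DELETE statements against the source database and see how the data is replicated.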
Insert
Spring Boot produces the following log.
2022-11-07 08:10:25.956 INFO 39684 --- [pool-1-thread-1] c.e.s.listener.DebeziumListener : Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=seonwoo,email=seonwoo@naver.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=0,snapshot=true,db=customerdb,table=customer,server_id=0,file=binlog.000005,pos=157,row=0},op=r,ts_ms=1667776225742}'
2022-11-07 08:10:25.956 INFO 39684 --- [pool-1-thread-1] c.e.s.listener.DebeziumListener : Key = 'Struct{id=2}' value = 'Struct{after=Struct{id=2,fullname=sinwoo,email=sinwoo@naver.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=0,snapshot=last,db=customerdb,table=customer,server_id=0,file=binlog.000005,pos=157,row=0},op=r,ts_ms=1667776225743}'
2022-11-07 08:10:25.956 INFO 39684 --- [pool-1-thread-1] i.d.connector.mysql.ChainedReader : Transitioning from the snapshot reader to the binlog reader
2022-11-07 08:10:25.989 INFO 39684 --- [pool-1-thread-1] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:25.991 INFO 39684 --- [-localhost:3307] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:26.006 INFO 39684 --- [-localhost:3307] c.g.shyiko.mysql.binlog.BinaryLogClient : Connected to localhost:3307 at binlog.000005/157 (sid:10181, cid:10)
2022-11-07 08:10:26.006 INFO 39684 --- [-localhost:3307] i.debezium.connector.mysql.BinlogReader : Connected to MySQL binlog at localhost:3307, starting at binlog file 'binlog.000005', pos=157, skipping 0 events plus 0 rows
2022-11-07 08:10:26.006 INFO 39684 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader : Waiting for keepalive thread to start
2022-11-07 08:10:26.007 INFO 39684 --- [-localhost:3307] io.debezium.util.Threads : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:26.111 INFO 39684 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader : Keepalive thread is running
Update
Delete
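Wrap-up
In this post, we looked at how to integrate embedded Debezium with Spring Boot. While trying Debezium out, it occurred to me that, used well, it could make work that cannot be handled within a single transaction (for example, sending a message to Kafka after a database commit) easy to carry out.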