This post walks through integrating Spring Boot with embedded Debezium. The full code is available on GitHub.

https://github.com/seonwoo960000/spring-boot-embedded-debezium

 


Setup

pom.xml 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.5</version>
    <relativePath /> <!-- lookup parent from repository -->
  </parent>
  <groupId>com.example</groupId>
  <artifactId>spring-boot-debezium-1</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-boot-debezium-1</name>
  <description>spring-boot-debezium-1</description>
  <properties>
    <java.version>11</java.version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.21</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
    </dependency>

    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-api</artifactId>
      <version>1.4.2.Final</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-embedded</artifactId>
      <version>1.4.2.Final</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-connector-mysql</artifactId>
      <version>1.4.2.Final</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.10</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <configuration>
          <excludes>
            <exclude>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
            </exclude>
          </excludes>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>
  • spring-boot-starter-data-jpa: used to talk to the database through JPA.
  • mysql-connector-java: the MySQL JDBC driver.
  • debezium-*: the dependencies needed to run embedded Debezium inside Spring Boot.

application.yml 

server:
  port: 8080

## Source Database Properties
customer:
  datasource:
    host: localhost
    port: 3307
    database: customerdb
    username: root
    password: root

## Primary/Target Database Properties
spring:
  datasource:
    url: jdbc:mysql://localhost:3308/customerdb
    username: root
    password: root
  jpa.hibernate.ddl-auto: create-drop
  jpa.show-sql: true

## Logging properties
logging:
  level:
    root: INFO
    io:
      debezium:
        connector:
          mysql:
            BinlogReader: INFO

The database is MySQL. The source database, the one Spring Boot connects to through embedded Debezium, listens on port 3307. The Spring Boot application then writes the captured changes to the target database on port 3308.

docker-compose.yml 

To run several MySQL instances on different ports locally, we use Docker. The file below can be started with docker-compose up -d (the Docker daemon must be running).

### command: docker-compose up -d

version: "3.9"
services:
  # MySQL instance that Spring Boot connects to through embedded Debezium
  mysql-1:
    container_name: source-database
    image: mysql
    ports:
      - 3307:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb

  # MySQL instance where Spring Boot stores the changes it captures
  mysql-2:
    container_name: target-database
    image: mysql
    ports:
      - 3308:3306
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_USER: user
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: customerdb
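
Before starting the application, it can help to confirm that both containers accept connections, since docker-compose up -d may return before MySQL has finished initializing. The following is a minimal sketch using only the JDK; the class and method names are hypothetical and not part of the original project, and an open TCP port only suggests, not guarantees, that MySQL is ready to serve queries.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the original project.
class MySqlPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // 3307 = source database, 3308 = target database (ports from docker-compose.yml)
        System.out.println("source (3307) reachable: " + isPortOpen("localhost", 3307, 1000));
        System.out.println("target (3308) reachable: " + isPortOpen("localhost", 3308, 1000));
    }
}
```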

Code

We start with the basic building blocks: the entity, repository, and service classes.

entity 

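import javax.persistence.Entity;
import javax.persistence.Id;

import lombok.Getter;
import lombok.Setter;

@Entity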
@Getter
@Setter
public class Customer {

    @Id
    private Long id;
    private String fullname;
    private String email;
}

repository 

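import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository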
public interface CustomerRepository extends JpaRepository<Customer, Long> {
}

service 

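import java.util.Map;

import org.springframework.stereotype.Service;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.data.Envelope.Operation;
import lombok.RequiredArgsConstructor;

@Service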
@RequiredArgsConstructor
public class CustomerService {

    private final CustomerRepository customerRepository;

    public void replicateData(Map<String, Object> customerData, Operation operation) {
        final ObjectMapper objectMapper = new ObjectMapper();
        final Customer customer = objectMapper.convertValue(customerData, Customer.class);

        if (Operation.DELETE == operation) {
            customerRepository.deleteById(customer.getId());
        } else {
            customerRepository.save(customer);
        }
    }
}

 

Next, we write the Debezium configuration and the code that tracks changes.

DebeziumConfig 

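import java.io.File;
import java.io.IOException;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration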
public class DebeziumConfig {

    /**
     * Database details.
     */
    @Value("${customer.datasource.host}")
    private String customerDbHost;

    @Value("${customer.datasource.database}")
    private String customerDbName;

    @Value("${customer.datasource.port}")
    private String customerDbPort;

    @Value("${customer.datasource.username}")
    private String customerDbUsername;

    @Value("${customer.datasource.password}")
    private String customerDbPassword;

    /**
     * Customer Database Connector Configuration
     */
    @Bean
    public io.debezium.config.Configuration customerConnector() throws IOException {
        File offsetStorageTempFile = File.createTempFile("offsets_", ".dat");
        File dbHistoryTempFile = File.createTempFile("dbhistory_", ".dat");
        return io.debezium.config.Configuration.create()
                                               .with("name", "customer-mysql-connector")
                                               .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                                               .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                                               .with("offset.storage.file.filename", offsetStorageTempFile.getAbsolutePath())
                                               .with("offset.flush.interval.ms", "60000")
                                               .with("database.hostname", customerDbHost)
                                               .with("database.port", customerDbPort)
                                               .with("database.user", customerDbUsername)
                                               .with("database.password", customerDbPassword)
                                               .with("database.dbname", customerDbName)
                                               .with("database.include.list", customerDbName)
                                               .with("include.schema.changes", "false")
                                               .with("database.allowPublicKeyRetrieval", "true")
                                               .with("database.server.id", "10181")
                                               .with("database.server.name", "customer-mysql-db-server")
                                               .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                                               .with("database.history.file.filename", dbHistoryTempFile.getAbsolutePath())
                                               .build();
    }
}

Details on each property are available at the following link.

https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-connector-properties
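
One detail of the configuration above is worth noting: File.createTempFile produces a new, empty file on every start, so offsets and schema history recorded in a previous run are not reused after a restart. If resuming from the last position matters, one option is to point the two filename properties at fixed paths instead. The sketch below assumes a local directory named debezium-data; that directory and the helper names are hypothetical, not something the original project defines.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of the original project.
class DebeziumStoragePaths {

    // Creates baseDir if it does not exist yet and returns the absolute path of fileName inside it.
    // The same arguments always yield the same path, unlike File.createTempFile.
    static String stableFile(Path baseDir, String fileName) throws IOException {
        Files.createDirectories(baseDir);
        return baseDir.resolve(fileName).toAbsolutePath().toString();
    }

    public static void main(String[] args) throws IOException {
        Path baseDir = Paths.get("debezium-data"); // assumed location
        // Candidate values for offset.storage.file.filename and database.history.file.filename
        System.out.println(stableFile(baseDir, "offsets.dat"));
        System.out.println(stableFile(baseDir, "dbhistory.dat"));
    }
}
```

For a throwaway local demo the temporary files are perfectly adequate; fixed paths only become relevant once the application is expected to pick up where it left off.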

 

DebeziumListener

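import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import io.debezium.config.Configuration;
import io.debezium.data.Envelope.Operation;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;

@Slf4j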
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final CustomerService customerService;
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration config, CustomerService customerService) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                                            .using(config.asProperties())
                                            .notifying(this::handleChangeEvent)
                                            .build();

        this.customerService = customerService;
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();

        log.info("Key = '" + sourceRecord.key() + "' value = '" + sourceRecord.value() + "'");
        Struct sourceRecordChangeValue = (Struct) sourceRecord.value();

        if (sourceRecordChangeValue != null) {
            Operation operation = Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

            if (operation != Operation.READ) {
                String record = operation == Operation.DELETE ? BEFORE : AFTER;
                Struct struct = (Struct) sourceRecordChangeValue.get(record);
                Map<String, Object> payload = struct.schema().fields().stream()
                                                    .map(Field::name)
                                                    .filter(fieldName -> struct.get(fieldName) != null)
                                                    .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                                                    .collect(toMap(Pair::getKey, Pair::getValue));

                this.customerService.replicateData(payload, operation);
                log.info("Updated Data: {} with Operation: {}", payload, operation.name());
            }
        }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (this.debeziumEngine != null) {
            this.debeziumEngine.close();
        }
    }
}

The handleChangeEvent method receives the data changes that Debezium tracks and, for every operation other than a read, delegates them to customerService.replicateData. The Operation enum tells us which type of change (READ, CREATE, UPDATE, DELETE) occurred in the database.
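
The branching inside handleChangeEvent can be tried out without a database by using plain Java collections. In this sketch, ChangeOp is a hypothetical stand-in for Debezium's Operation enum with its one-letter codes (the op=r value in the log is one of them), and a Map stands in for the Kafka Connect Struct. It illustrates two points: deletes read the "before" image while other operations read "after", and null columns are filtered out because Collectors.toMap throws a NullPointerException on null values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration; the real listener uses
// Debezium's Operation and Kafka Connect's Struct.
class ChangeEventSketch {

    enum ChangeOp {
        READ("r"), CREATE("c"), UPDATE("u"), DELETE("d");

        private final String code;

        ChangeOp(String code) {
            this.code = code;
        }

        // Maps the one-letter code from the change event to an enum constant.
        static ChangeOp forCode(String code) {
            for (ChangeOp op : values()) {
                if (op.code.equals(code)) {
                    return op;
                }
            }
            throw new IllegalArgumentException("Unknown operation code: " + code);
        }
    }

    // Picks the row image to replicate ("before" for deletes, "after" otherwise)
    // and drops null-valued columns so that toMap does not fail.
    static Map<String, Object> payload(ChangeOp op, Map<String, Object> before, Map<String, Object> after) {
        Map<String, Object> row = (op == ChangeOp.DELETE) ? before : after;
        return row.entrySet().stream()
                  .filter(entry -> entry.getValue() != null)
                  .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        Map<String, Object> after = new HashMap<>();
        after.put("id", 1L);
        after.put("fullname", "seonwoo");
        after.put("email", null); // a NULL column in the source row

        ChangeOp op = ChangeOp.forCode("c");
        System.out.println(op + " -> " + payload(op, null, after));
    }
}
```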

Running

Creating the tables in MySQL

If you ran the docker-compose file above, two MySQL Docker containers should now be running.

Figure: mysql-1 (source database) on the left, mysql-2 (target database) on the right

Connect to each database and create the customer table in customerdb with the following DDL.

Figure: creating the customer table

Running Spring Boot

Once the table exists in both databases, start Spring Boot (Debezium can only store changes from the source database into the target database while Spring Boot is running).

 

Making changes in the source database

We run INSERT, UPDATE, and DELETE against the source database and watch how the data is replicated.
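
The effect each operation should have on the target table can be previewed with an in-memory stand-in. The sketch below mirrors the save/deleteById branching of replicateData, with a map keyed by id playing the role of the target customer table. The class name, the boolean flag, and the updated name value are illustrative assumptions, not part of the original project.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory stand-in for the target table (hypothetical, for illustration only).
class ReplicationSketch {

    private final Map<Long, Map<String, Object>> targetTable = new LinkedHashMap<>();

    // isDelete == true removes the row by id; anything else inserts or overwrites it,
    // which is how a save on an entity with an assigned id behaves from the table's point of view.
    void replicate(Map<String, Object> row, boolean isDelete) {
        Long id = ((Number) row.get("id")).longValue();
        if (isDelete) {
            targetTable.remove(id);
        } else {
            targetTable.put(id, row);
        }
    }

    Map<Long, Map<String, Object>> rows() {
        return targetTable;
    }

    public static void main(String[] args) {
        ReplicationSketch target = new ReplicationSketch();

        // INSERT on the source, then UPDATE of the same row
        target.replicate(Map.of("id", 1L, "fullname", "seonwoo", "email", "seonwoo@naver.com"), false);
        target.replicate(Map.of("id", 1L, "fullname", "seonwoo kim", "email", "seonwoo@naver.com"), false);
        System.out.println("after insert + update: " + target.rows().size() + " row(s)"); // 1 row(s)

        // DELETE on the source: only the id from the "before" image is needed
        target.replicate(Map.of("id", 1L), true);
        System.out.println("after delete: " + target.rows().size() + " row(s)"); // 0 row(s)
    }
}
```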

Insert 

Figure: the data is replicated after the insert

Spring Boot produces the following log.

2022-11-07 08:10:25.956  INFO 39684 --- [pool-1-thread-1] c.e.s.listener.DebeziumListener          : Key = 'Struct{id=1}' value = 'Struct{after=Struct{id=1,fullname=seonwoo,email=seonwoo@naver.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=0,snapshot=true,db=customerdb,table=customer,server_id=0,file=binlog.000005,pos=157,row=0},op=r,ts_ms=1667776225742}'
2022-11-07 08:10:25.956  INFO 39684 --- [pool-1-thread-1] c.e.s.listener.DebeziumListener          : Key = 'Struct{id=2}' value = 'Struct{after=Struct{id=2,fullname=sinwoo,email=sinwoo@naver.com},source=Struct{version=1.4.2.Final,connector=mysql,name=customer-mysql-db-server,ts_ms=0,snapshot=last,db=customerdb,table=customer,server_id=0,file=binlog.000005,pos=157,row=0},op=r,ts_ms=1667776225743}'
2022-11-07 08:10:25.956  INFO 39684 --- [pool-1-thread-1] i.d.connector.mysql.ChainedReader        : Transitioning from the snapshot reader to the binlog reader
2022-11-07 08:10:25.989  INFO 39684 --- [pool-1-thread-1] io.debezium.util.Threads                 : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:25.991  INFO 39684 --- [-localhost:3307] io.debezium.util.Threads                 : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:26.006  INFO 39684 --- [-localhost:3307] c.g.shyiko.mysql.binlog.BinaryLogClient  : Connected to localhost:3307 at binlog.000005/157 (sid:10181, cid:10)
2022-11-07 08:10:26.006  INFO 39684 --- [-localhost:3307] i.debezium.connector.mysql.BinlogReader  : Connected to MySQL binlog at localhost:3307, starting at binlog file 'binlog.000005', pos=157, skipping 0 events plus 0 rows
2022-11-07 08:10:26.006  INFO 39684 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader  : Waiting for keepalive thread to start
2022-11-07 08:10:26.007  INFO 39684 --- [-localhost:3307] io.debezium.util.Threads                 : Creating thread debezium-mysqlconnector-customer-mysql-db-server-binlog-client
2022-11-07 08:10:26.111  INFO 39684 --- [pool-1-thread-1] i.debezium.connector.mysql.BinlogReader  : Keepalive thread is running

Update 

Figure: the change is propagated after the update

Delete 

Figure: the change is propagated after the delete

Wrapping up

This post showed how to integrate embedded Debezium with Spring Boot. While using Debezium, it struck me that, used well, it could make it easy to handle work that cannot be done within a single transaction (for example, sending a message to Kafka after a database commit).

Reference

Reference blog: https://www.baeldung.com/debezium-intro
