profile image

L o a d i n g . . .

728x90

Spring Batch 애플리케이션을 작업하다보면 이 애플리케이션이 처리해야하는 데이터가 많아졌을 때 다음과 같은 고민을 어떻게 해결할 수 있을지 궁금했습니다. 

  • 기존 코드 변경을 최소화하며 단일 서버의 처리량을 최대한 높이는 방법이 없을까?
  • 동시성을 최소한으로 신경쓰고 처리량을 높일수는 없을까?

위 궁금증을 해소할 수 있는 방법을 찾던 중 spring batch의 partitioning 기능이 있다는 것을 알게됐습니다. 이번 포스팅에서는 위 궁금증을 spring batch에서 어떻게 해소해주는지 살펴보겠습니다. 코드는 아래 링크에서 확인해주세요. Main 브랜치에 partitioning을 적용하고 no-partitioning 브랜치는 일반적인 batch를 구현하였습니다. 

 

GitHub - seonwoo960000/spring-batch-partitioning

Contribute to seonwoo960000/spring-batch-partitioning development by creating an account on GitHub.

github.com

 

배치 Partitioning 이란 

스프링 배치에서 partitioning이란 하나의 배치 잡을 분할(partition)하여 다중 스레드에서 실행할 수 있도록하는  기능입니다. 예를 들어 2023년 1월 1일부터 2023년 12월 30일까지 일자별로 1건의 데이터가 존재한다고 가정하겠습니다. 배치 partitioning을 통해 1월, 2월, 3월 ... 12월로 partitioning을 적용해서 월별 집계를 병렬로 실행할 수 있습니다. 

 

기본 Batch Job 

DDL

사용할 데이터베이스 스키마는 위와 같습니다. 구현하고자하는 배치는 product를  월별로 집계하여 product_monthly를 만드는 것입니다. 스키마 및 더미 데이터 생성은 resources/sql/ddl.sql, resources/sql/dml.sql을 참고하세요. 

Entity 

@Entity
@Table(name = "product")
class Product(
    @Id
    @Column(name = "id", nullable = false, columnDefinition = "binary(16)")
    var id: UUID = UUID.randomUUID(),
    @Column(name = "date", nullable = false)
    val date: String,
    @Column(name = "price", nullable = false)
    val price: Long = 0,
) {
    override fun toString(): String {
        return "Product(id=$id, date='$date', price=$price)"
    }

    fun month(): String {
        return date.substring(0, 7)
    }
}

@Entity
@Table(name = "product_monthly")
class ProductMonthly(
    @Id
    @Column(name = "id", nullable = false, columnDefinition = "binary(16)")
    var id: UUID = UUID.randomUUID(),
    @Column(name = "`month`", nullable = false)
    val month: String,
    @Column(name = "price", nullable = false)
    var price: Long = 0,
): Serializable {
    companion object {
        fun default(month: String) = ProductMonthly(id = UUID.randomUUID(), month = month, price = 0L)
    }

    fun add(productMonthly: Product) {
        val month = productMonthly.date.substring(0, 7)
        if (this.month != month) {
            // @formatter:off
            throw IllegalArgumentException("Different month: ${this.month} != $month")
        }
        this.price += productMonthly.price
    }

    override fun toString(): String {
        return "ProductMonthly(id=$id, month='$month', price=$price)"
    }
}

 

Reader 

open class ProductMonthlyAggregationReaderFactory(
    private val entityManagerFactory: EntityManagerFactory,
    private val start: LocalDate,
    private val end: LocalDate,
    private val pageSize: Int = 1000
) {

    fun productMonthlyAggregationReader(): JpaPagingItemReader<Product> {
        val startDate = start.formatLocalDate();
        val endDate = end.formatLocalDate();
        println("Reader ----> startDate: $startDate, endDate: $endDate")

        return JpaPagingItemReaderBuilder<Product>()
            .name("productMonthlyAggregationReader")
            .entityManagerFactory(entityManagerFactory)
            .queryString("""
                SELECT p 
                FROM Product p 
                WHERE p.date BETWEEN :startDate AND :endDate
                ORDER BY p.id
                """)
            .parameterValues(mapOf("startDate" to startDate, "endDate" to endDate))
            .pageSize(pageSize)
            .build()
    }
}

Reader 구현체로는 JpaPagingItemReader을 활용하겠습니다. 자동으로 pagination을 적용하고 jpa를 사용할 수 있기 때문에 선택하였습니다. 위 코드를 간단히 설명드리면 product 테이블에서 pagination을 적용해 데이터를 조회합니다. WHERE 조건은 jobParameter으로 넘겨주는 startDate와 endDate를 사용할 예정입니다. 

 

Writer 

open class ProductMonthlyAggregationWriter : ItemWriter<Product> {
    private val productMonthlies = mutableMapOf<String, ProductMonthly>()

    override fun write(chunk: Chunk<out Product>) {
        chunk.groupBy { it.month() }
            .map { (yearMonth, products) ->
                // Uncomment below code to test spring batch job restarting failed step logic
                // if (yearMonth == "2023-01") {
                //     throw IllegalArgumentException("damn!!")
                // }

                val key = ProductMonthlyKeyUtils.productMonthlyKey(yearMonth)
                val productMonthly = productMonthlies.getOrPut(key) {
                        ProductMonthly.default(yearMonth)
                    }

                products.forEach { productMonthly.add(it) }
                println("Writer ----> yearMonth: $yearMonth")
            }
    }

    @AfterStep
    fun afterStep(stepExecution: StepExecution): ExitStatus {
        productMonthlies.keys.forEach {
            stepExecution.executionContext.put(it, productMonthlies[it])
        }
        return ExitStatus.COMPLETED
    }
}

class ProductMonthlyKeyUtils {
    companion object {
        const val PRODUCT_MONTHLY_KEY = "productMonthly-"

        fun productMonthlyKey(yearMonth: String): String {
            return PRODUCT_MONTHLY_KEY + yearMonth
        }

        fun productMonthlyKeysBetween(start: String, end: String): List<String> {
            var startMonth = YearMonth.parse(start)
            val endMonth = YearMonth.parse(end)

            val result = mutableListOf<String>()
            while (!startMonth.isAfter(endMonth)) {
                result.add(PRODUCT_MONTHLY_KEY + startMonth.toString())
                startMonth = startMonth.plusMonths(1)
            }

            return result
        }
    }
}

Writer는 월별로 product를 groupBy하여 집계합니다. 모든 step이 종료되면(@AfterStep) writer의 afterStep 메서드를 통해 stepExecution의 executionContext에 집계된 데이터(ProductMonthly)를 저장합니다. 

 

Listener 

open class ProductMonthlyAggregationListener(
    private val productMonthlyRepository: ProductMonthlyRepository
) : StepExecutionListener {
    private lateinit var startMonth: String
    private lateinit var endMonth: String

    override fun beforeStep(stepExecution: StepExecution) {
        val startDateParam = stepExecution.jobExecution.jobParameters.getLocalDate(Common.JOB_PARAMETERS_START_DATE)
        val endDateParam = stepExecution.jobExecution.jobParameters.getLocalDate(Common.JOB_PARAMETERS_END_DATE)
        if (startDateParam == null || endDateParam == null) {
            throw IllegalArgumentException("start date or end date is not provided in job parameters")
        }

        val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM")
        startMonth = startDateParam.format(dateTimeFormatter)
        endMonth = endDateParam.format(dateTimeFormatter)

        ProductMonthlyKeyUtils.productMonthlyKeysBetween(startMonth, endMonth)
            .forEach {
                stepExecution.executionContext.put(it, ProductMonthly.default(it))
            }
    }

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        if (stepExecution.exitStatus.equals(ExitStatus.COMPLETED)) {
            ProductMonthlyKeyUtils.productMonthlyKeysBetween(startMonth, endMonth)
                .forEach {
                    productMonthlyRepository.save(stepExecution.executionContext[it]!! as ProductMonthly)
                }
        } else {
            logger.error("Step execution failed")
        }
        return super.afterStep(stepExecution)
    }
}

Listener는 다음과 같이 동작합니다. 

  • beforeStep: jobParameter을 통해 전달받은 startDate와 endDate를 참조해서 startDate와 endDate 사이에 존재하는 모든 월을 확인합니다. 그리고 월별로 ProductMonthly의 기본값(ProductMonthly.defualt(it))을 생성해서 stepExecution의 executionContext에 저장합니. 
  • afterStep: stepExecutionContext에 저장된 ProductMonthly 값을 조회하여 데이터베이스에 저장합니다. ProductMonthly 데이터는 Writer에 의해 집계 후 업데이트됩니다. 

 

JobConfiguration 

월별 집계 job을 정의하는 코드는 아래와 같습니다. 

@Configuration
class ProductMonthlyAggregationJobConfiguration(
    private val jobRepository: JobRepository,
    private val platformTransactionManager: PlatformTransactionManager,
    private val entityManagerFactory: EntityManagerFactory,
    private val productMonthlyRepository: ProductMonthlyRepository,
) {

    @Value("\${spring.batch.job.chunk-size:1000}")
    lateinit var chunkSize: Integer

    @Bean("productMonthlyAggregationJob")
    fun productMonthlyAggregationJob(
        @Qualifier("productMonthlyAggregationStep") productMonthlyAggregationStep: Step
    ): Job {
        return JobBuilder("productMonthlyAggregationJob", jobRepository)
            .start(productMonthlyAggregationStep)
            // comment below code to prevent running jobs with same jobParameters multiple times
            .incrementer(UniqueRunIdIncrementer())
            .build()
    }

    @JobScope
    @Bean("productMonthlyAggregationStep")
    fun productMonthlyAggregationStep(
        @Qualifier("productMonthlyAggregationReader") productMonthlyAggregationReader: JpaPagingItemReader<Product>,
        @Qualifier("productMonthlyAggregationWriter") productMonthlyAggregationWriter: ProductMonthlyAggregationWriter,
        @Qualifier("productMonthlyAggregationListener") productMonthlyAggregationListener: ProductMonthlyAggregationListener,
    ): Step {
        return StepBuilder("productMonthlyAggregationStep", jobRepository)
            .chunk<Product, Product>(chunkSize.toInt(), platformTransactionManager)
            .reader(productMonthlyAggregationReader)
            .writer(productMonthlyAggregationWriter)
            .listener(productMonthlyAggregationListener)
            .build()
    }

    @StepScope
    @Bean("productMonthlyAggregationReader")
    fun productMonthlyAggregationReader(
        @Value("#{jobParameters[${Common.JOB_PARAMETERS_START_DATE}]}")
        startDate: LocalDate,
        @Value("#{jobParameters[${Common.JOB_PARAMETERS_END_DATE}]}")
        endDate: LocalDate
    ): JpaPagingItemReader<Product> {
        return ProductMonthlyAggregationReaderFactory(
            entityManagerFactory,
            startDate,
            endDate,
            chunkSize.toInt()
        )
            .productMonthlyAggregationReader()
    }

    @StepScope
    @Bean("productMonthlyAggregationWriter")
    fun productMonthlyAggregationWriter(): ProductMonthlyAggregationWriter {
        return ProductMonthlyAggregationWriter()
    }

    @StepScope
    @Bean("productMonthlyAggregationListener")
    fun productMonthlyAggregationListener(): ProductMonthlyAggregationListener {
        return ProductMonthlyAggregationListener(productMonthlyRepository)
    }
}

 

테스트

위에서 정의한 job이 정상적으로 동작하는지 테스트해보겠습니다. 1월 1일부터 12월 31일까지 product 데이터를 모두 넣고 job을 실행시켜 집계를 수행하겠습니다. 

@SpringBootTest
@SpringBatchTest
@TestPropertySource(properties = ["spring.batch.job.chunk-size=100", "spring.batch.job.pool-size=3"])
@Sql("classpath:/sql/dml.sql")
class ProductMonthlyAggregationJobTest(
    @Qualifier("productMonthlyAggregationJob")
    @Autowired val productMonthlyAggregationJob: Job,
    @Autowired val jobLauncher: JobLauncher,
    @Autowired val jobRepository: JobRepository,
    @Autowired val jobRepositoryTestUtils: JobRepositoryTestUtils,
    @Autowired val productRepository: ProductRepository,
    @Autowired val productMonthlyRepository: ProductMonthlyRepository,
) {

    lateinit var jobLauncherTestUtils: JobLauncherTestUtils

    @BeforeEach
    fun setUp() {
        jobLauncherTestUtils = JobLauncherTestUtils()
        jobLauncherTestUtils.job = productMonthlyAggregationJob
        jobLauncherTestUtils.jobLauncher = jobLauncher
        jobLauncherTestUtils.jobRepository = jobRepository
    }

    @Test
    fun `productMonthlyAggregationJob test with default settings`() {
        val jobParameters = JobParametersBuilder()
            .addString(Common.JOB_PARAMETERS_START_DATE, "2023-01-01")
            .addString(Common.JOB_PARAMETERS_END_DATE, "2023-12-31")
            .toJobParameters()
        val jobExecution = jobLauncherTestUtils.launchJob(jobParameters) // 1
        assertThat(jobExecution.exitStatus).isEqualTo(ExitStatus.COMPLETED)
 
        val productsGroupedByMonth = productRepository.findAll().groupBy { it.month() } // 2
        val productMonthly = productMonthlyRepository.findAll().groupBy { it.month } // 3
        assertThat(productsGroupedByMonth.size).isEqualTo(productMonthly.size)
        productsGroupedByMonth.forEach { (month, products) ->
            assertThat(products.sumOf { it.price }).isEqualTo(productMonthly[month]?.sumOf { it.price })
        } // 4
    }

    @AfterEach
    fun cleanUp() {
        jobRepositoryTestUtils.removeJobExecutions()
    }
}
  1. job을 실행시킵니다 
  2. product 테이블에 저장된 모든 데이터를 조회해 월별로 group by를 수행합니다. 
  3. productMonthly 테이블에 저장된 모든 데이터를 조회해 월별로 group by를 수행합니다(월별로 1건씩만 존재합니다) 
  4. 2번과 3번에서 월별로 집계된 결과를 바탕으로 총 합을 비교합니다. 월별로 묶인 product의 price 총합과 productMonthly의 price 총합은 동일해야합니다. 

테스트를 실행하면 테스트가 정상적으로 완료된 것을 확인할 수 있습니다. 

Partitioning 

다음으로는 위 job에 spring batch partitioning을 적용해보겠습니다. Step을 어떻게 partitioning을 수행할지 결정하는 Partitioner을 다음과 같이 정의하겠습니다. 

open class ProductMonthlyAggregationPartitioner(
    private val jobParametersStartDate: LocalDate,
    private val jobParametersEndDate: LocalDate
) : Partitioner {
    override fun partition(gridSize: Int): MutableMap<String, ExecutionContext> {
        val partitionedExecutionContext = mutableMapOf<String, ExecutionContext>()
        partitionDates().forEachIndexed { index, partition ->
            val executionContext = ExecutionContext()
            executionContext.putString(STEP_EXECUTION_START_DATE, partition.first)
            executionContext.putString(STEP_EXECUTION_END_DATE, partition.second)
            partitionedExecutionContext["partition-$index"] = executionContext
        }
        return partitionedExecutionContext
    }

    fun partitionDates(): List<Pair<String, String>> {
        val start = jobParametersStartDate
        val end = jobParametersEndDate

        var current = start
        var partitionedDates = mutableListOf<Pair<String, String>>()
        while (current.isBefore(end) || current == end) {
            val yearMonth = YearMonth.from(current)
            val partitionStartDate = maxOf(start, yearMonth.atDay(1))
            val partitionEndDate = yearMonth.atEndOfMonth()

            partitionedDates.add(Pair(partitionStartDate.toString(), partitionEndDate.toString()))
            current = current.plusMonths(1)
        }

        return partitionedDates
    }
}

ProductMonthlyAggregationPartitioner는 jobParameter을 통해 전달받은 startDate와 endDate를 활용하여 월별로 처리할 데이터를 분할합니다. 예를들어 startDate가 2023-01-01이고 endDate가 2023-12-31이면 아래와 같이 분할됩니다. 분할된 범위는 각각 ExecutionContext에 담기며 최종적으로 partitionedExecutionContext에 저장됩니다. Spring batch는 partitioner에 의해 분할된 범위를 각각의 step에 전달하게됩니다. 

Reader, Writer 

Reader와 Writer의 코드는 변경이 필요하지 않습니다. 

Listener 

open class ProductMonthlyAggregationListener(
    private val productMonthlyRepository: ProductMonthlyRepository
) : StepExecutionListener {
    private lateinit var startMonth: String
    private lateinit var endMonth: String

    override fun beforeStep(stepExecution: StepExecution) {
        if (!stepExecution.executionContext.containsKey(Common.STEP_EXECUTION_START_DATE) ||
            !stepExecution.executionContext.containsKey(Common.STEP_EXECUTION_END_DATE)) {
            throw IllegalArgumentException("start date or end date is not provided in job parameters")
        }

        startMonth = stepExecution.executionContext[Common.STEP_EXECUTION_START_DATE].toString().substring(0, 7)
        endMonth = stepExecution.executionContext[Common.STEP_EXECUTION_END_DATE].toString().substring(0, 7)

        ProductMonthlyKeyUtils.productMonthlyKeysBetween(startMonth, endMonth)
            .forEach {
                stepExecution.executionContext.put(it, ProductMonthly.default(startMonth))
            }
    }

    override fun afterStep(stepExecution: StepExecution): ExitStatus? {
        if (stepExecution.exitStatus.equals(ExitStatus.COMPLETED)) {
            ProductMonthlyKeyUtils.productMonthlyKeysBetween(startMonth, endMonth)
                .forEach {
                    productMonthlyRepository.save(stepExecution.executionContext[it]!! as ProductMonthly)
                }
        } else {
            logger.error("Step execution failed")
        }
        return super.afterStep(stepExecution)
    }
}

기존 코드에서 Listener는 jobParameter에서 startDate와 endDate를 추출하였습니다. 하지만 partitioner에의해 step이 분할되고 우리는 stepExecution의 executionContext에 startDate와 endDate를 저장했기 때문에 listener의 코드 변경이 필요합니다. 수정된 listener는 stepExecution의 executionContext를 활용해서 startDate와 endDate를 조회합니다. 나머지 로직은 동일합니다. 

 

JobConfiguration 

@Configuration
class ProductMonthlyAggregationJobConfiguration(
    private val jobRepository: JobRepository,
    private val platformTransactionManager: PlatformTransactionManager,
    private val entityManagerFactory: EntityManagerFactory,
    private val productMonthlyRepository: ProductMonthlyRepository,
) {

    @Value("\${spring.batch.job.chunk-size:1000}")
    lateinit var chunkSize: Integer

    @Value("\${spring.batch.job.pool-size:6}")
    lateinit var poolSize: Integer

    @Bean("productMonthlyAggregationJob")
    fun productMonthlyAggregationJob(
        @Qualifier("productMonthlyAggregationStep.manager") productMonthlyAggregationStepManager: Step
    ): Job {
        return JobBuilder("productMonthlyAggregationJob", jobRepository)
            .start(productMonthlyAggregationStepManager)
            // comment below code to prevent running jobs with same jobParameters multiple times
            .incrementer(UniqueRunIdIncrementer())
            .build()
    }

    // (1) 
    @Bean("productMonthlyAggregationTaskPool")
    fun executor(): TaskExecutor { 
        val executor = ThreadPoolTaskExecutor()
        executor.corePoolSize = poolSize.toInt()
        executor.maxPoolSize = poolSize.toInt()
        executor.setThreadNamePrefix("partition-thread")
        executor.setWaitForTasksToCompleteOnShutdown(Boolean.TRUE)
        executor.initialize()
        return executor
    }

    // (2) 
    @JobScope
    @Bean("productMonthlyAggregationStep.manager")
    fun productMonthlyAggregationStepManager(
        @Qualifier("productMonthlyAggregationPartitioner") partitioner: ProductMonthlyAggregationPartitioner,
        @Qualifier("productMonthlyAggregationReader") productMonthlyAggregationReader: JpaPagingItemReader<Product>,
        @Qualifier("productMonthlyAggregationWriter") productMonthlyAggregationWriter: ProductMonthlyAggregationWriter,
        @Qualifier("productMonthlyAggregationListener") productMonthlyAggregationListener: ProductMonthlyAggregationListener,
        @Qualifier("productMonthlyAggregationTaskPool") executor: TaskExecutor
    ): Step {
        val productMonthlyAggregationStep = productMonthlyAggregationStep(
            productMonthlyAggregationReader,
            productMonthlyAggregationWriter,
            productMonthlyAggregationListener
        )
        val partitionHandler = productMonthlyAggregationPartitionHandler(productMonthlyAggregationStep)
        return StepBuilder("productMonthlyAggregationStep.manager", jobRepository)
            .partitioner("productMonthlyAggregationStep", partitioner)
            .step(productMonthlyAggregationStep)
            .partitionHandler(partitionHandler)
            .build()
    }

    // (3) 
    fun productMonthlyAggregationPartitionHandler(
        productMonthlyAggregationStep: Step,
    ): TaskExecutorPartitionHandler {
        val partitionHandler = TaskExecutorPartitionHandler()
        partitionHandler.setTaskExecutor(executor())
        partitionHandler.step = productMonthlyAggregationStep
        partitionHandler.gridSize = poolSize.toInt()
        return partitionHandler
    }

    fun productMonthlyAggregationStep(
        productMonthlyAggregationReader: JpaPagingItemReader<Product>,
        productMonthlyAggregationWriter: ProductMonthlyAggregationWriter,
        productMonthlyAggregationListener: ProductMonthlyAggregationListener,
    ): Step {
        return StepBuilder("productMonthlyAggregationStep", jobRepository)
            .chunk<Product, Product>(chunkSize.toInt(), platformTransactionManager)
            .reader(productMonthlyAggregationReader)
            .writer(productMonthlyAggregationWriter)
            .listener(productMonthlyAggregationListener)
            .build()
    }

    // (4) 
    @StepScope
    @Bean("productMonthlyAggregationReader")
    fun productMonthlyAggregationReader(
        @Value("#{stepExecutionContext[${Common.STEP_EXECUTION_START_DATE}]}")
        stepExecutionStartDate: LocalDate,
        @Value("#{stepExecutionContext[${Common.STEP_EXECUTION_END_DATE}]}")
        stepExecutionEndDate: LocalDate
    ): JpaPagingItemReader<Product> {
        return ProductMonthlyAggregationReaderFactory(
            entityManagerFactory,
            stepExecutionStartDate,
            stepExecutionEndDate,
            chunkSize.toInt()
        )
            .productMonthlyAggregationReader()
    }

    @StepScope
    @Bean("productMonthlyAggregationWriter")
    fun productMonthlyAggregationWriter(): ProductMonthlyAggregationWriter {
        return ProductMonthlyAggregationWriter()
    }

    @StepScope
    @Bean("productMonthlyAggregationListener")
    fun productMonthlyAggregationListener(): ProductMonthlyAggregationListener {
        return ProductMonthlyAggregationListener(productMonthlyRepository)
    }

    // (5) 
    @JobScope
    @Bean("productMonthlyAggregationPartitioner")
    fun productMonthlyAggregationPartitioner(
        @Value("#{jobParameters[${Common.JOB_PARAMETERS_START_DATE}]}")
        startDate: LocalDate,
        @Value("#{jobParameters[${Common.JOB_PARAMETERS_END_DATE}]}")
        endDate: LocalDate
    ): ProductMonthlyAggregationPartitioner {
        return ProductMonthlyAggregationPartitioner(startDate, endDate)
    }
}

(1) taskExecutor 

Partitioning된 step을 병렬로 실행할 때 사용할 taskExecutor을 정의합니다. 

 

(2) productMonthlyAggregationStepManager 

Partitioning을 수행할 step manager을 정의합니다. 

 

(3) productMonthlyAggregationPartitionHandler 

Partitioning을 수행할 step과 사용할 taskExecutor 및 gridSize를 지정합니다. gridSize란 동시에 실행하고자하는 StepExecution의 수를 지정하는데 사용됩니다. TaskExecutor와 poolSize를 동일하게 설정해서 사용하도록 하겠습니다. 

 

(4) productMonthlyAggregationReader 

각 reader에서 사용할 분할된 startDate와 endDate는 ProductMonthlyAggregationPartitioner에 의해 stepExecution의 executionContext에 저장했습니다. 따라서 jobParameters가 아닌 stepExecutionContext에서 startDate와 endDate를 조회해야합니다. 

 

(5) productMonthlyAggregationPartitioner 

JobParameter으로 전달받은 startDate와 endDate을 참조해 partitioning을 수행할 ProductMonthlyAggregationPartitioner을 생성합니다. 

 

변경점 

  • Partition을 담당할 partitioner을 정의하고 이를 job 설정에 추가 
  • Partitioner에 의해 stepExecution의 executionContext에 저장되는 데이터를 참조해야한다면 `@Value("#{stepExecutionContext[...]}")`를 활용

테스트 

Partitioning을 적용하지 않았을 때 사용한 테스트코드를 사용하겠습니다. 

gridSize == 3 

gridSize == 3인 경우 3개의 stepExecution이 동시에 실행됩니다(threadPoolSize도 3으로 gridSize와 동일하므로 3개의 stepExecution이 병렬적으로 실행될 수 있습니다) 

gridSize == 6 

6개의 stepExecution이 동시에 실행되는 것을 확인할 수 있습니다. 

 

결론 

Spring batch의 partitioning기능을 사용하면 단일 스레드에서 실행하는 것보다 더 빠르게 결과를 얻을 수 있습니다(병렬적으로 실행되기 때문입니다). Spring batch의 기능 중 partitioning 이외에도 다중 스레드를 활용해서 step을 병렬적으로 실행하는 방법이 있습니다. 하지만 이는 스레드간 공유하는 자원이 있을 때 동기화를 수행해야하기 때문에 partitioning에 비해 실행속도가 느릴 수 있습니다(partitioning은 step에서 처리할 데이터를 적절히 분할해서 step간 간섭없이 데이터를 처리할 수 있기 때문에 스레드간 동기화를 신경쓸 필요가 비교적 적습니다). 

728x90
복사했습니다!