Skip to content

Latest commit

ย 

History

History

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

README.md

๐Ÿ“” ๋ชฉ์ฐจ



๐Ÿค” Batch ๋ž€?

์ •ํ•ด์ง„ ์‹œ๊ฐ„์— ์ผ๊ด„์ ์œผ๋กœ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•ด์ฃผ ํ”„๋กœ๊ทธ๋žจ์œผ๋กœ ์ฃผ๋กœ ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฌ๋‹ค.
ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ชจ์•„์„œ ์ฒ˜๋ฆฌํ•˜๊ฑฐ๋‚˜, ์ผ์ • ์‹œ๊ฐ„ ๋’ค์— ์ฒ˜๋ฆฌํ•˜๊ณ ์ž ํ•  ๋•Œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๊ณ ,๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ๋•Œ ํŠธ๋ž˜ํ”ฝ์ด ์ ์€ ์‹œ๊ฐ„๋Œ€์— ์„œ๋ฒ„ ๋ฆฌ์†Œ์Šค๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•œ๋‹ค.
(์ฃผ๋กœ ETL:Extract-Transform-Load, ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค์— ์ €์žฅ.)

๐Ÿค” Spring Batch?

์ž๋ฐ” ๊ธฐ๋ฐ˜ ํ‘œ์ค€ ๋ฐฐ์น˜ ๊ธฐ์ˆ ์˜ ๋ถ€์žฌ๋กœ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ์—์„œ ์š”๊ตฌํ•˜๋Š” ์žฌ์‚ฌ์šฉ ๊ฐ€๋Šฅํ•œ ์ž๋ฐ” ๊ธฐ๋ฐ˜ ๋ฐฐ์น˜ ์•„ํ‚คํ…์ฒ˜ ํ‘œ์ค€์˜ ํ•„์š”์„ฑ์ด ๋Œ€๋‘๋˜์—ˆ๊ณ ,
Accenture์—์„œ ์†Œ์œ ํ•˜๊ณ  ์žˆ๋˜ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ ์•„ํ‚คํ…์ฒ˜ ํ”„๋ ˆ์ž„์›ค๋ฅด๋ฅผ ์Šคํ”„๋ง ๋ฐฐ์น˜ ํ”„๋กœ์ ํŠธ์— ๊ธฐ์ฆํ•˜์˜€๋‹ค.

๊ฐ€๋ณ๊ณ  ๋‹ค์–‘ํ•œ ๊ธฐ๋Šฅ์„ ๊ฐ€์ง„ ๋ฐฐ์น˜ ํ”„๋ ˆ์ž„์›Œํฌ๋กœ, ๊ฒฌ๊ณ ํ•œ ๋ฐฐ์น˜ ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ฐœ๋ฐœ์ด ๊ฐ€๋Šฅํ•˜๋„๋ก ๋””์ž์ธ ๋˜์–ด์žˆ๋‹ค.
์ตœ๊ทผ ๊ธฐ์—… ์‹œ์Šคํ…œ ์šด์˜์— ํ•„์ˆ˜์ ์ด๋ผ๊ณ  ํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ธฐ์กด Spring ํ”„๋กœ์ ํŠธ์˜ ๋ชจ๋“ˆ์„ ํ™œ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์žฅ์ ์„ ๊ฐ€์ง„๋‹ค.(์ƒˆ๋กœ์šด ์–ธ์–ด๋กœ ์ฒ˜๋ฆฌ๋ฅผ ์ƒˆ๋กœ ๊ตฌํ˜„ํ•˜์ง€ ์•Š์•„๋„ ๋œ๋‹ค.)
๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ๋กœ์ง€์„ ์ƒˆ๋กœ ๋งŒ๋“ค์ง€ ์•Š๊ณ  ์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ ์ œ๊ณตํ•˜๋Š” ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ๐Ÿ‘† ๋ฐฐ์น˜์˜ ํ•ต์‹ฌ ํŒจํ„ด

    • Read: DB, ํŒŒ์ผ, ํ ๋“ฑ์—์„œ ๋‹ค๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๋Š”๋‹ค.
    • Process: ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๊ณตํ•œ๋‹ค.
    • Write: ๊ฐ€๊ณต๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ์ €์žฅํ•œ๋‹ค.
  • ๐Ÿ‘† ๋ฐฐ์น˜ ์‹œ๋‚˜๋ฆฌ์˜ค

    • ๋ฐฐ์น˜ ํ”„๋กœ์„ธ์Šค๋ฅผ ์ฃผ๊ธฐ์ ์œผ๋กœ ์ปค๋ฐ‹ํ•œ๋‹ค.(ํšจ์œจ์ ์ธ ์ปค๋ฐ‹ ์ „๋žต.)
    • ๋™์‹œ ๋‹ค๋ฐœ์ ์ธ Job ์˜ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ, ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ.
    • ์‹คํŒจ ํ›„ ์Šค์ผ€์ค„๋ง์— ์˜ํ•ด ์žฌ์‹œ์ž‘๋œ๋‹ค.
    • ์˜์กด๊ด€๊ณ„๊ฐ€ ์žˆ๋Š” step๋“ค์„ ์ˆœ์ฐจ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.
    • ์กฐ๊ฑด์— ๋”ฐ๋ผ ํ๋ฆ„์„ ๊ตฌ์„ฑํ•˜๋Š” ๋“ฑ ์ฒด๊ณ„์ ์ด๊ณ  ์œ ์—ฐํ•œ ๋ฐฐ์น˜ ๋ชจ๋ธ์„ ๊ตฌ์„ฑํ•œ๋‹ค.
    • ๋ฐ˜๋ณตํ•˜๊ฑฐ๋‚˜, ์žฌ์‹œ๋„, Skip ์ฒ˜๋ฆฌ(์ค‘์š”ํ•˜์ง€ ์•Š์€ ์˜ˆ์™ธ๋ฅผ ์Šคํ‚ต, ๊ณ„์† ์‹คํ–‰๋  ์ˆ˜ ์žˆ๋„๋ก)๋“ฑ..

๐Ÿ“Œ Spring Batch ์•„ํ‚คํ…์ณ

img_1.png

  • JobLauncher: Job์„ ์‹คํ–‰์‹œํ‚ค๋Š” ์ปดํฌ๋„ŒํŠธ
  • Job: ๋ฐฐ์น˜ ์ž‘์—….
  • JobRepository: Job์˜ ์‹คํ–‰๊ณผ Job, Step์„ ์ €์žฅ.
  • Step: ๋ฐฐ์น˜ ์ž‘์—…์˜ ๋‹จ๊ณ„. ItemReader, ItemProcessor, ItemWriter๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ , ์ฒ˜๋ฆฌํ•˜๊ณ , ์“ฐ๋Š” ๊ตฌ์„ฑ์„ ํ•˜๋‚˜์”ฉ ๊ฐ€์ง„๋‹ค.

img_2.png

  • Application

    ๋น„์ฆˆ๋‹ˆ์Šค, ์„œ๋น„์Šค ๋กœ์ง, Core, Infrastructure์„ ์ด์šฉํ•˜์—ฌ ๋ฐฐ์น˜ ๊ธฐ๋Šฅ์„ ๋งŒ๋“ ๋‹ค.
    ๊ฐœ๋ฐœ์ž๋Š” ์—…๋ฌด ๋กœ์ง์˜ ๊ตฌํ˜„์—๋งŒ ์ง‘์ค‘ํ•˜๊ณ  ๊ณตํ†ต์ ์ธ ๊ธฐ์ˆ  ๊ธฐ๋ฐ˜์€ ํ”„๋ ˆ์ž„ ์›Œํฌ๊ฐ€ ๋‹ด๋‹นํ•˜๋„๋ก ํ•œ๋‹ค.

  • Core

    ๋ฐฐ์น˜ ์ž‘์—…์„ ์‹œ์ž‘ํ•˜๊ณ  ์ œ์–ดํ•˜๋Š” ํ•„์ˆ˜ ํด๋ž˜์Šค(Job, Step, JobLauncher, Flow)
    Job์„ ์‹คํ–‰ํ•˜๊ณ  ๋ชจ๋‹ˆํ„ฐ๋ง, ๊ด€๋ฆฌํ•˜๋Š” API๋กœ ๊ตฌ์„ฑ๋˜์–ด ์žˆ๋‹ค.

  • Infrastructure

    ์™ธ๋ถ€์™€ ์ƒํ˜ธ์ž‘์šฉํ•˜๋Š” ๋ ˆ์ด์–ด, (ItemReader, ItemWriter, RetryTemplate, Skip)
    Application, Core ๋ชจ๋‘ ๊ณตํ†ต Infrastructure ์œ„์—์„œ ๋นŒ๋“œํ•œ๋‹ค. Job ์‹คํ–‰์˜ ํ๋ฆ„๊ณผ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ํ‹€์„ ์ œ๊ณตํ•œ๋‹ค.

    ์‹ค์ œ๋กœ ํŒจํ‚ค์ง€ ๊ตฌ์กฐ๋ฅผ ์—ด์–ด ํ™•์ธํ•ด ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

  • ๐Ÿง Job

    img_3.png

    • ์ „์ฒด ๋ฐฐ์น˜ ํ”„๋กœ์„ธ์Šค๋ฅผ ์บก์Аํ™”ํ•œ ๋„๋ฉ”์ธ์œผ๋กœ, Step์˜ ์ˆœ์„œ๋ฅผ ์ •์˜ํ•œ๋‹ค.
    • JobParameters ๋ฅผ ๋ฐ›๋Š”๋‹ค.
    • JobParameters๋ฅผ ๋ฐ›์•„ JobInstance๊ฐ€ ์ƒ์„ฑ๋˜๊ณ , JobExecution์œผ๋กœ ๋‚˜๋ˆ„์–ด์ ธ ์‹คํ–‰๋œ๋‹ค.
  • ๐Ÿง Step

    img_4.png

    • ์ž‘์—…์˜ ์ฒ˜๋ฆฌ ๋‹จ์œ„.
    • Chunk | Tasklet ๊ธฐ๋ฐ˜์œผ๋กœ ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค.
    • commitInterval ๋งŒํผ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ๊ณ , ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•œ ๋’ค, ChunkSize ๋งŒํผ ํ•œ๋ฒˆ์— Write ํ•œ๋‹ค.

๐Ÿ“Œ Meta Data Schema

img_5.png

์Šคํ”„๋ง ๋ฐฐ์น˜๊ฐ€ ์‹คํ–‰ ๋ฐ ๊ด€๋ฆฌ๋ฅผ ์œ„ํ•œ ๋ชฉ์ ์œผ๋กœ ์—ฌ๋Ÿฌ ๋„๋ฉ”์ธ(Job, Step, Execution, Instance JobParams ...) ์˜ ์ •๋ณด๋ฅผ ์ €์žฅํ•  ์ˆ˜ ์žˆ๋Š” ์Šคํ‚ค๋งˆ๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
Job ์˜ ์ด๋ ฅ(์„ฑ๊ณต, ์‹คํŒจ), ํŒŒ๋ผ๋ฏธํ„ฐ ๋“ฑ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ ์กฐํšŒํ•  ์ˆ˜ ์žˆ๋‹ค. -> ๋ฆฌ์Šคํฌ ๋ฐœ์ƒ์‹œ ๋น ๋ฅธ ๋Œ€์ฒ˜ ๊ฐ€๋Šฅ.

DB์™€ ์—ฐ๋™ํ•  ๊ฒฝ์šฐ ํ•„์ˆ˜์ ์œผ๋กœ ๋ฉ”ํƒ€ ํ…Œ์ด๋ธ”์ด ์ƒ์„ฑ๋˜์–ด์•ผ ํ•˜๋ฉฐ ์Šคํ‚ค๋งˆ ํŒŒ์ผ์˜ ์œ„์น˜๋Š” /org/springframework/batch/core/schema-*.sql ์ด๋‹ค.(DB ์œ ํ˜•๋ณ„๋กœ ์ œ๊ณต)

  • ๐Ÿง ํ…Œ์ด๋ธ”

    • BATCH_JOB_INSTANCE

      Job ์ด ์‹คํ–‰๋  ๋•Œ JobInstance ์ •๋ณด๊ฐ€ ์ €์žฅ๋˜๋ฉฐ, job_name๊ณผ job_key๋กœ ํ•˜์—ฌ ํ•˜๋‚˜์˜ ๋ฐ์ดํ„ฐ๊ฐ€ ์ €์žฅ๋œ๋‹ค (์ธ์Šคํ„ด์Šค๋Š” ์œ ์ผ)

      • version: ์—…๋ฐ์ดํŠธ ๋งˆ๋‹ค 1์”ฉ ์ฆ๊ฐ€ํ•˜๋Š” ๊ฐ’
      • job_name: job์„ ๊ตฌ์„ฑํ•  ๋•Œ ๋ถ€์—ฌํ•œ ์ด๋ฆ„.
      • job_key: name ๊ณผ parmas๋ฅผ ํ•ฉ์ณ ํ•ด์‹ฑํ•œ ๊ฐ’
    • BATCH_JOB_EXECUTION

      Job์˜ ์‹คํ–‰ ์ •๋ณด(์ƒ์„ฑ, ์‹œ์ž‘, ์ข…๋ฃŒ ์‹œ๊ฐ„, ์‹คํ–‰ ์ƒํƒœ, ์ข…๋ฃŒ ์ฝ”๋“œ, ์‹คํŒจ ์›์ธ ๋ฉ”์‹œ์ง€, ๋งˆ์ง€๋ง‰ ์‹คํ–‰ ์‹œ์  ๋“ฑ)

    • BATCH_JOB_EXECUTION_PARAMS

      Job๊ณผ ํ•จ๊ป˜ ์‹คํ–‰๋˜๋Š” JobParams ์ •๋ณด๋ฅผ ์ €์žฅ.

      • type_cd : String, Long, Date ๋“ฑ์˜ ํƒ€์ž… ์ •๋ณด
      • key_name: ํŒŒ๋ผ๋ฏธํ„ฐ ํ‚ค ๊ฐ’.
      • string_val: ํŒŒ๋ผ๋ฏธํ„ฐ ๋ฌธ์ž ๊ฐ’
      • data_val: ํŒŒ๋ผ๋ฏธํ„ฐ ๋‚ ์งœ ๊ฐ’.
      • long_val
      • double_val
      • identifying: ์‹๋ณ„ ์—ฌ๋ถ€ (boolean)
    • BATCH_JOB_EXECUTION_C์œ ONTEXT

      Job์˜ ์‹คํ–‰๋™์•ˆ ์—ฌ๋Ÿฌ๊ฐ€์ง€ ์ƒํƒœ์ •๋ณด, ๊ณต์œ  ๋ฐ์ดํ„ฐ๋ฅผ JSON ํ˜•์‹์œผ๋กœ ์ง๋ ฌํ™”ํ•˜์—ฌ ์ €์žฅํ•œ๋‹ค. Step๊ฐ„์˜ ๊ณต์œ ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.

      • short_context: job์˜ ์‹คํ–‰ ์ƒํƒœ์ •๋ณด, ๊ณต์œ ๋ฐ์ดํ„ฐ ๋“ฑ์˜ ์ •๋ณด๋ฅผ ๋ฌธ์ž์—ด๋กœ ์ €์žฅ
      • serialized_context: ์ง๋ ฌํ™” ๋œ ์ „์ฒด ์ปจํ…์ŠคํŠธ
    • BATCH_STEP_EXECUTION

      • Step์˜ ์‹คํ–‰ ์ •๋ณด(์ƒ์„ฑ, ์‹œ์ž‘, ์ข…๋ฃŒ ์‹œ๊ฐ„, ์‹คํ–‰ ์ƒํƒœ, ์ข…๋ฃŒ ์ฝ”๋“œ, ์‹คํŒจ ์›์ธ ๋ฉ”์‹œ์ง€, ๋งˆ์ง€๋ง‰ ์‹คํ–‰ ์‹œ์  ๋“ฑ)
      • ๋ถ€๋ชจ(Job)์˜ ID
      • ํŠธ๋žœ์žญ์…˜๋‹น Commit, Read, Write, Filter, Read skip, Write skip, ProcessSkip, Rollback ์ˆ˜
    • BATCH_STEP_EXECUTION_CONTEXT

      Job์˜ ๊ฒฝ์šฐ์™€ ๋™์ผํ•˜์ง€๋งŒ, Step ๋ณ„๋กœ ์ €์žฅ๋˜๋ฉฐ Step๊ฐ„ ๊ณต์œ ํ•  ์ˆ˜ ์—†๋‹ค.

    ํ…Œ์ด๋ธ”๊ฐ„์˜ ๊ด€๊ณ„(1:N) ์— ์ฃผ์˜ํ•˜์—ฌ ์‚ดํŽด๋ณด์ž.

  • ๐Ÿง ์Šคํ‚ค๋งˆ ์ƒ์„ฑ ์„ค์ •

    • ์ˆ˜๋™ ์ƒ์„ฑ: ์ฟผ๋ฆฌ ๋ณต์‚ฌ ํ›„ ์ง์ ‘ ์ƒ์„ฑ.
    • ์ž๋™ ์ƒ์„ฑ: properties ์—์„œ spring.batch.jdbc.initialize-schema ์„ค์ •.
      • ALWAYS

        ์Šคํฌ๋ฆฝํŠธ ํ•ญ์ƒ ์‹คํ–‰, RDBMS ์„ค์ •์ด ๋˜์–ด์žˆ์„ ๊ฒฝ์šฐ ๋‚ด์žฅ DB๋ณด๋‹ค ์šฐ์„ ์ ์œผ๋กœ ์‹คํ–‰ํ•œ๋‹ค.

      • EMBEDDED

        ๋‚ด์žฅ DB ์ผ๋•Œ๋งŒ ์‹คํ–‰๋œ๋‹ค. (๊ธฐ๋ณธ๊ฐ’)

      • NEVER
        • ์Šคํฌ๋ฆฝํŠธ๋ฅผ ํ•ญ์ƒ ์‹คํ–‰ํ•˜์ง€ ์•Š๋Š”๋‹ค. ํ…Œ์ด๋ธ”์ด ์—†๋‹ค๊ฑฐ๋‚˜ ๋‚ด์žฅ DB ๋ผ๋ฉด ์˜ค๋ฅ˜ ๋ฐœ์ƒ.
        • ์šด์˜์—์„œ ์ˆ˜๋™์œผ๋กœ ์Šคํฌ๋ฆฝํŠธ ์ƒ์„ฑ ํ›„ ์„ค์ •ํ•˜๋Š” ๊ฒƒ์„ ๊ถŒ์žฅํ•œ๋‹ค.

๐Ÿ“Œ Spring Boot๋กœ Spring Batch ์‹œ์ž‘ํ•˜๊ธฐ

  • ์˜์กด์„ฑ ์ถ”๊ฐ€

    implementation 'org.springframework.boot:spring-boot-starter-batch'
    testImplementation 'org.springframework.batch:spring-batch-test'
  • @EnableBatchProcessing

    @EnableBatchProcessing
    @SpringBootApplication
    public class SpringBatchApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBatchApplication.class, args);
        }
    
    }
    • ์Šคํ”„๋ง ๋ฐฐ์น˜๋ฅผ ์ž‘๋™์‹œํ‚ค๊ธฐ ์œ„ํ•ด ์„ ์–ธํ•˜๋Š” ์• ๋…ธํ…Œ์ด์…˜์œผ๋กœ, ์ด 4๊ฐœ์˜ ์„ค์ • ํด๋ž˜์Šค๋ฅผ ์‹คํ–‰์‹œํ‚ค๋ฉฐ ์Šคํ”„๋ง ๋ฐฐ์น˜์˜ ๋ชจ๋“  ์ดˆ๊ธฐํ™” ๋ฐ ์‹คํ–‰ ๊ตฌ์„ฑ์ด ์ด๋ฃจ์–ด์ง„๋‹ค.

    • ์Šคํ”„๋ง ๋ถ€ํŠธ ๋ฐฐ์น˜์˜ ์ž๋™์„ค์ • ํด๋ž˜์Šค๊ฐ€ ์‹คํ–‰๋˜์–ด ๋“ฑ๋ก๋œ ๋ชจ๋“  Job์„ ๊ฒ€์ƒ‰ํ•˜์—ฌ ์ดˆ๊ธฐํ™”ํ•˜๊ณ  ๋™์‹œ์— Job ์„ ์ˆ˜ํ–‰ํ•˜๋„๋ก ๊ตฌ์„ฑํ•œ๋‹ค.

    • ๐Ÿง ์Šคํ”„๋ง ๋ฐฐ์น˜ ์„ค์ • ํด๋ž˜์Šค

      • BatchAutoConfiguration

        ์Šคํ”„๋ง ๋ฐฐ์น˜๊ฐ€ ์ดˆ๊ธฐํ™” ๋  ๋•Œ ์ž๋™์œผ๋กœ ์‹คํ–‰, Job์„ ์ˆ˜ํ–‰ํ•˜๋Š” JobLauncherApplicationRunner ๋นˆ์„ ์ƒ์„ฑํ•œ๋‹ค.(ApplicationRunner๋ฅผ ๊ตฌํ˜„ํ–ˆ๊ธฐ ๋–„๋ฌธ์— ์Šคํ”„๋ง์ด ์‹คํ–‰์‹œํ‚จ๋‹ค.)

      • SimpleBatchConfiguration

        • JobBuilderFactory ์™€ StepBuilderFactory๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
        • ์Šคํ”„๋ง ๋ฐฐ์น˜์˜ ์ฃผ์š” ๊ตฌ์„ฑ ์š”์†Œ๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.(ํ”„๋ก์‹œ ๊ฐ์ฒด๋กœ ์ƒ์„ฑ๋œ๋‹ค.) - jobRepository, jobLauncher, hobRegistry, jobExplorer
      • BatchConfigurerConfiguration

        • BasicBatchConfigurer

          SimpleBatchConfiguration ์—์„œ ์ƒ์„ฑํ•œ ํ”„๋ก์‹œ ๊ฐ์ฒด์˜ ์‹ค์ œ ํƒ€๊ฒŸ์„ ์ƒ์„ฑํ•˜๋Š” ์„ค์ • ํด๋ž˜์Šค.

        • JpaBatchConfigurer

          JPA ๊ด€๋ จ ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์„ค์ • ํด๋ž˜์Šค.

  • ๐Ÿง Tasklet ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•œ ๊ฐ„๋‹จํ•œ ๋ฐฐ์น˜ ํ”„๋กœ๊ทธ๋žจ

    @RequiredArgsConstructor
    @Configuration
    public class JobConfig {
    
        // #1
        private final JobBuilderFactory jobBuilderFactory; 
        private final StepBuilderFactory stepBuilderFactory;
    
        @Bean
        public Job myJob() {
            return jobBuilderFactory.get("myJob") // #2
                .start(myStep())
                .next(myStep2())
                .build();
        }
    
        @Bean
        public Step myStep() {
            return stepBuilderFactory.get("myStep1") // #2
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("================ My Step1 =============");
                    return RepeatStatus.FINISHED; // #3
                })
                .build();
        }
    
        @Bean
        public Step myStep2() {
            return stepBuilderFactory.get("muStep2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("================ My Step2 =============");
                    return RepeatStatus.FINISHED;
                })
                .build();
        }
    }
    • ๋ชจ๋“  Job๊ณผ Step์€ ๋นˆ์œผ๋กœ ๋“ฑ๋ก๋˜์–ด์•ผ ํ•œ๋‹ค.

    • (#1): Job, Step์„ ์ƒ์„ฑํ•˜๋Š” ๋นŒ๋” ํŒฉํ†น๋ฆฌ

    • (#2): Job, Step์˜ ์ด๋ฆ„์„ ์ง€์ •ํ•ด์ค€๋‹ค.

    • (#3): tasklet์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ๋ฌดํ•œ๋ฐ˜๋ณตํ•œ๋‹ค. ๋•Œ๋ฌธ์— ์ด์™€ ๊ฐ™์€ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜์—ฌ ํ•œ๋ฒˆ ์‹คํ–‰ ํ›„ ์ข…๋ฃŒํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค.(๋ฐ˜๋ณต false)

    • ๊ฒฐ๊ณผ img.png


๐Ÿ“Œ ๋„๋ฉ”์ธ์˜ ์ดํ•ด

๐Ÿง Job

Job Configuration์— ์˜ํ•ด ์ƒ์„ฑ๋˜๋Š” ๊ฐ์ฒด ๋‹จ์œ„๋กœ, ๋ฐฐ์น˜ ๊ณ„์ธต ๊ตฌ์กฐ์—์„œ ๊ฐ€์žฅ ์ƒ์œ„์— ์žˆ๋Š” ๊ฐœ๋…์ด๋ฉฐ ํ•˜๋‚˜์˜ ๋ฐฐ์น˜์ž‘์—… ์ž์ฒด์— ํ•ด๋‹นํ•œ๋‹ค.(์ตœ์ƒ์œ„ ์ธํ„ฐํŽ˜์ด์Šค)
๋ฐฐ์น˜ ์ž‘์—…์„ ์–ด๋–ป๊ฒŒ ๊ตฌ์„ฑํ•˜๊ณ  ์‹คํ–‰ํ• ์ง€๋ฅผ ์„ค์ •ํ•˜๊ณ  ๋ช…์„ธํ•ด ๋†“์€ ๊ฐ์ฒด๋กœ ์—ฌ๋Ÿฌ step์„ ํฌํ•จํ•˜๋Š” ์ปจํ…Œ์ด๋„ˆ ๋กœ์„œ์˜ ์—ญํ• ์„ ํ•œ๋‹ค. (1๊ฐœ ์ด์ƒ์˜ Step)

  • ๐Ÿ‘† ๊ตฌํ˜„์ฒด (AbstractJab์„ ๊ตฌํ˜„)

    - name : Job ์ด๋ฆ„
    - restartable: ์žฌ์‹œ์ž‘ ์—ฌ๋ถ€ ๊ธฐ๋ณธ๊ฐ’ true
    - JobRepository: ๋ฉ”ํƒ€๋ฐ์ด๋” ์ €์žฅ์†Œ
    - JobExecutionListener: Job ์ด๋ฒคํŠธ ๋ฆฌ์Šค๋„ˆ
    - JobParametersIncrementer: JobParameter ์ฆ๊ฐ€๊ธฐ
    - JobParametersValidator: JobParameter ๊ฒ€์ฆ๊ธฐ
    - SimpleStepHandler: Step์„ ์‹คํ–‰ํ•˜๋Š” ํ•ธ๋“ค๋Ÿฌ.
    
    • SimpleJob
      • ์ˆœ์ฐจ์ ์œผ๋กœ Step์„ ์‹คํ–‰์‹œํ‚ค๋Š” Job์œผ๋กœ, ํ‘œ์ค€ ๊ธฐ๋Šฅ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค.(steps๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์Œ)
    • FlowJob
      • ํŠน์ • ์กฐ๊ฑด๊ณผ ํ๋ฆ„์— ๋”ฐ๋ผ Step์„ ๊ตฌ์„ฑํ•˜๋Š” Job์œผ๋กœ, Flow ๊ฐ์ฒด๋ฅผ ์‹คํ–‰์‹œ์ผœ ์ž‘์—…์„ ์ง„ํ–‰ํ•œ๋‹ค.

    JobLauncher์˜ run(job, jobParameters) ๋ฉ”์„œ๋“œ์—์„œ job์„ ๋ฐ›์•„ ์‹คํ–‰์‹œํ‚ค๊ฒŒ ๋˜๋Š”๋ฐ, job.execute(execution)๋กœ step์„ ํ•˜๋‚˜ํ•˜๋‚˜ ์‹คํ–‰์‹œํ‚จ๋‹ค.
    ๊ตฌํ˜„์ฒด์ธ SimpleJobLauncher ์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด jobRepository์—์„œ ํ•ด๋‹น ์žก์˜ ๋งˆ์ง€๋ง‰ Execution์„ ๊ฐ€์ ธ์™€ ์ƒํƒœ๋ฅผ ํ™•์ธํ•œ ํ›„ ์ƒˆ๋กœ์šด JobExecution์„ ์ƒ์„ฑํ•˜๊ณ ,์ƒ์„ฑ๋œ JobExecution์œผ๋กœ Job์„ ์‹คํ–‰ํ•œ๋‹ค. Job์˜ execute(AbstractJob ์˜) ์—์„œ๋Š” ๊ตฌํ˜„์ฒด์˜ doExecute()๋ฅผ ํ˜ธ์ถœํ•˜๊ณ , ํ•ด๋‹น ๋ฉ”์„œ๋“œ์—์„œ handleStep(step, jobExecution)์„ ์‹คํ–‰์‹œํ‚จ๋‹ค.

    handleStep ์—์„œ๋„ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ AbstractStep ์˜ execute ๋ฅผ ํ˜ธ์ถœํ•˜๊ณ , ๊ตฌํ˜„์ฒด์˜ doExecute๊ฐ€ ํ˜ธ์ถœ๋œ๋‹ค.

๐Ÿง JobInstance

img_3.png

Job์ด ์‹คํ–‰๋  ๋•Œ ์ƒ์„ฑ๋˜๋Š” ๋…ผ๋ฆฌ์  ์‹คํ–‰ ๋‹จ์œ„ ๊ฐ์ฒด๋กœ ๊ณ ์œ ํ•˜๊ฒŒ ์‹๋ณ„ ๊ฐ€๋Šฅํ•œ ์ž‘์—… ์‹คํ–‰์„ ๋‚˜ํƒ€๋‚ธ๋‹ค.
๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค(BATCH_JOB_INSTANCE)์— ์ €์žฅํ•˜๊ธฐ ์œ„ํ•ด ์ƒ์„ฑ๋˜๋Š” ์ธ์Šคํ„ด์Šค์ด๋‹ค.

์ฒ˜์Œ ์‹œ์ž‘ํ•˜๋Š” Job + JobParameter์˜ ๊ตฌ์„ฑ์ผ ๊ฒฝ์šฐ ์ƒˆ๋กœ์šด JobInstance๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ด์ „๊ณผ ๋™์ผํ•œ ๊ตฌ์„ฑ์ด๋ผ๋ฉด ์ด๋ฏธ ์กด์žฌํ•˜๋Š” JobInstance๋ฅผ ๋ฆฌํ„ดํ•œ๋‹ค.
(๋™์ผํ•œ ๊ตฌ์„ฑ์œผ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์—†์–ด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ณ  Job์˜ ์‹คํ–‰์„ ์ค‘๋‹จํ•œ๋‹ค ) A job instance already exists and is complete for parameters={ ... }
์‹คํ–‰๋œ ํŒŒ๋ผ๋ฏธํ„ฐ๋Š” BATCH_JOB_EXECUTION_PARAMS์—์„œ ํ™•์ธํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ๋‚ด๋ถ€์ ์œผ๋กœ๋Š” job_name + params_key ์˜ ํ•ด์‹œ๊ฐ’์„ ๊ฐ€์ง€๊ณ  ์ธ์Šคํ„ด์Šค ๊ฐ์ฒด๋ฅผ ์‹๋ณ„ํ•œ๋‹ค.

๐Ÿง JobParameter

Job์„ ์‹คํ–‰ํ•  ๋•Œ ํ•จ๊ป˜ ์‚ฌ์šฉ๋˜๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ๊ฐ€์ง„ ๋„๋ฉ”์ธ ๊ฐ์ฒด๋กœ, ํ•˜๋‚˜์˜ JobInstance๋ฅผ ๊ตฌ๋ถ„ํ•˜๊ธฐ ์œ„ํ•œ ์šฉ๋„๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

  • JobParameters: LinkedHashMap<String, Parameter>๋ฅผ ๋ฉค๋ฒ„๋ณ€์ˆ˜๋กœ ๊ฐ€์ง€๋Š” Wrapper ํด๋ž˜์Šค.

  • JobParameter: Object parameter, ParameterType parameterType, boolean identifying

  • ParameterType: String, Date, Long, Double

  • JobParameter์˜ ์ƒ์„ฑ๊ณผ ๋ฐ”์ธ๋”ฉ

    • ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ์‹คํ–‰์‹œ ์˜ต์…˜์œผ๋กœ ์ฃผ์ž….
      • Java -jar batch.jar name=user1 seq(long)=2L date(date)=2022/03/28 weight(double)=70.5
    • ์ฝ”๋“œ์—์„œ ์ƒ์„ฑ
      • JobParameterBuilder, DefaultJobParametersConverter
        JobParameters jobParameters = new JobParametersBuilder()
        .addString("name", "kim2")
        .addLong("seq", 1L)
        .addDate("data", new Date())
        .addDouble("weight", 70.5)
        .toJobParameters();
    • SpEL ์ด์šฉ
      • @Value("#{jobParameter[requestDate]}")
  • JobParameter ๊บผ๋‚ด๊ธฐ

    // StepContribution์—์„œ ๊บผ๋‚ด๊ธฐ
    JobParameters jobParameters = contribution.getStepExecution().getJobExecution().getJobParameters();
    jobParameters.getParameters() // Map<String, parameter>
    jobParameters.getString("key");
    jobParameters.getDate("key");
    jobParameters.getLong("key");
    jobParameters.getDouble("key");
        
    // ChunkContext ์—์„œ ๊บผ๋‚ด๊ธฐ
    Map<String, Object> chunkJobParameters = chunkContext.getStepContext().getJobParameters();

๐Ÿง JobExecution

JobInstance์— ๋Œ€ํ•œ ํ•œ๋ฒˆ์˜ ์‹œ๋„๋ฅผ ์˜๋ฏธํ•˜๋Š” ๊ฐ์ฒด๋กœ, ์‹คํ–‰ ์ค‘์— ๋ฐœ์ƒํ•œ ์ •๋ณด๋“ค์„ ์ €์žฅํ•˜๊ณ  ์žˆ๋Š” ๊ฐ์ฒด์ด๋‹ค.

  • ์‹œ์ž‘ ์‹œ๊ฐ„, ์ข…๋ฃŒ์‹œ๊ฐ„, ์ƒํƒœ(์‹œ์ž‘?, ์™„๋ฃŒ?, ์‹คํŒจ?), ์ข…๋ฃŒ์ƒํƒœ

JobExecution์˜ ์‹คํ–‰ ๊ฒฐ๊ณผ๊ฐ€ COMPLETED ์ด๋ฉด ์ธ์Šคํ„ด์Šค์˜ ์‹คํ–‰์ด ์™„๋ฃŒ๋œ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผํ•ด์„œ ์žฌ ์‹คํ–‰ํ•  ์ˆ˜ ์—†๋‹ค.
FAILED๋ผ๋ฉด, ์‹คํ–‰์ด ์™„๋ฃŒ๋˜์ง€ ์•Š์€ ๊ฒƒ์ด๋ฏ€๋กœ ์žฌ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.(JobParameter๊ฐ€ ๊ฐ™๋”๋ผ๋„) ์ฆ‰, ์‹คํ–‰ ๊ฒฝ๊ณผ๊ฐ€ COMPLETED๊ฐ€ ๋  ๋•Œ๊นŒ์ง€ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
(ํ•œ Instance ๋‚ด์—์„œ ์—ฌ๋Ÿฌ๋ฒˆ์˜ ์‹œ๋„๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Œ, JobInstance์™€ N:1)

img_2.png

๋™์ผํ•œ Job Instance์— ๋Œ€ํ•ด ์„ฑ๊ณปํ•  ๋•Œ๊นŒ์ง€ Execution์ด ์ƒ์„ฑ๋จ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

๐Ÿง Step

Batch Job์„ ๊ตฌ์„ฑํ•˜๋Š” ๋…๋ฆฝ์ ์ธ ํ•˜๋‚˜์˜ ๋‹จ๊ณ„๋กœ, ์‹ค์ œ ๋ฐฐ์น˜๋Ÿด ์ฒ˜๋ฆฌํ•˜๋Š” ๋ชจ๋“  ์ •๋ณด๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด์ด๋‹ค.
๋ฐฐ์น˜์ž‘์—…์„ ์–ด๋–ป๊ฒŒ ๊ตฌ์„ฑํ•˜๊ณ  ์‹คํ–‰ํ•  ๊ฒƒ์ธ์ง€ ์„ธ๋ถ€์ž‘์—…์„ Task ๊ธฐ๋ฐ˜์œผ๋กœ ์„ค์ •ํ•˜๊ณ  ๋ช…์„ธํ•ด ๋†“์€ ๊ฐ์ฒด.

  • ๐Ÿ‘† ํ•„๋“œ

    • name
    • startLimit: ์‹คํ–‰ ์ œํ•œ ํšŸ์ˆ˜.
    • allowStartIfComplete: ์™„๋ฃŒ ํ›„ ์žฌ์‹คํ–‰ ๊ฐ€๋Šฅ์—ฌ๋ถ€.
    • stepExecutionListener: ์ด๋ฒคํŠธ ๋ฆฌ์Šค๋„ˆ.
    • jobRepository: ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์ €์žฅ.
  • ๐Ÿ‘† ๊ตฌํ˜„์ฒด

    • TaskletStep: ๊ฐ€์žฅ ๊ธฐ๋ณธ์ ์ธ ๊ตฌํ˜„์ฒด, Taklet ํƒ€์ž…์˜ ๊ตฌํ˜„์ฒด๋ฅผ ์ œ์–ดํ•œ๋‹ค.
    • PartitionStep: ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ๋ฐฉ์‹์œผ๋กœ ์Šคํ…์„ ์—ฌ๋Ÿฌ๊ฐœ๋กœ ๋ถ„๋ฆฌ ์‹คํ–‰ํ•œ๋‹ค.
    • JobStep: Step ๋‚ด์—์„œ Job์„ ์‹คํ–‰ํ•œ๋‹ค.( Job -> Step -> Job .. )
    • FlowStep: Step ๋‚ด์—์„œ Flow๋ฅผ ์‹คํ–‰ํ•˜๋„๋ก ํ•œ๋‹ค.

Step์„ ์‹คํ–‰์‹œํ‚ค๋Š” execute(StepExecution)๊ฐ€ ์žˆ๊ณ , StepExecution์—๋Š” ์‹คํ–‰ ๊ฒฐ๊ณผ์˜ ์ƒํƒœ๊ฐ€ ์ €์žฅ๋œ๋‹ค.

  • ๐Ÿ‘† API

    • Tasklet ์ง์ ‘ ์ƒ์„ฑ
      stepBuilderFactory.get("myStep1")
               .tasklet(myTasklet())
               .build();
    • ChunkOrientedTasklet
      stepBuilderFactory.get("myStep3")
              .<String, String>chunk(100) // <input, output>
              .reader(reader())
              .processor(processor())
              .writer(writer())
              .build();
    • JobStep
      stepBuilderFactory.get("jobStep")
              .job(myJob())
              .launcher(jobLauncher)
              .parametersExtractor(jobParametersExtractor())2
              .build();
    • FlowStep
      stepBuilderFactory.get("jobStep")
              .flow(myFlow())
              .build();

๐Ÿง StepExecution

img_1.png

  • Step์— ๋Œ€ํ•œ ํ•œ๋ฒˆ์˜ ์‹œ๋„๋ฅผ ์˜๋ฏธํ•˜๋Š” ๊ฐ์ฒด๋กœ ์‹คํ–‰์ค‘ ๋ฐœ์ƒํ•œ ์ •๋ณด๋“ค์„ ์ €์žฅํ•˜๊ณ  ์žˆ๋Š” ๊ฐ์ฒด. (์‹œ์ž‘,์ข…๋ฃŒ ์‹œ๊ฐ„, ์ƒํƒœ, commit count, rollback count ...)
  • Job์ด ์žฌ์‹œ์ž‘ ๋˜๋”๋ผ๋„ ์ด๋ฏธ ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒ๋œ Step์€ skip ํ•˜๊ณ , ์‹คํŒจํ–ˆ๋˜ Step๋งŒ ์‹คํ–‰๋œ๋‹ค.(allowStartIfComplete ๋กœ ์„ค์ • ๊ฐ€๋Šฅ.)
  • ๋ชจ๋“  StepExecution์ด ์„ฑ๊ณตํ•ด์•ผ JobExecution๋„ ์„ฑ๊ณต์œผ๋กœ ๋๋‚œ๋‹ค.

๐Ÿง StepContribution

  • ์ฒญํฌ ํ”„๋กœ์„ธ์Šค์˜ ๋ณ€๊ฒฝ ์‚ฌํ•ญ์„ ์ €์žฅํ•ด๋’€๋‹ค๊ฐ€ StepExecution์˜ ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธ ํ•˜๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด์ด๋‹ค.

  • ์ฒญํฌ ์ปค๋ฐ‹ ์ง์ „์— StepExecution์˜ apply()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.

  • ์‚ฌ์šฉ์ž ์ •์˜ ExitStatus๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ๐Ÿ‘† ํ•„๋“œ

    • stepExecution
    • read, write, filter(ItemProcessor์— ์˜ํ•ด ํ•„ํ„ฐ๋ง๋œ) count
    • parent(StepExecution), read, write, process SkipCount
    • ExitStatus

    TaskletStep -> StepExecution -> StepContribution ์ˆœ์œผ๋กœ ์ƒ์„ฑ๋˜๊ณ ,
    chunkOrientedTasklet๊ณผ ๊ฐ™์€ ๊ตฌํ˜„์ฒด์—์„œ ์‹คํ–‰๋œ ItemReader, Processor, Writer ์˜ ์ƒํƒœ๋“ค์ด StepContribution์— ์ €์žฅ๋œ๋‹ค.
    ๊ทธ๋ฆฌ๊ณ , ์ตœ์ข…์ ์œผ๋กœ ์ปค๋ฐ‹๋˜๊ธฐ ์ „์— StepExecution์— ์ €์žฅํ•ด๋’€๋˜ ์ƒํƒœ๋ฅผ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.

๐Ÿง ExecutionContext

Step, Job Execution ๊ฐ์ฒด์˜ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๋Š” ๊ณต์œ  ๊ฐ์ฒด๋กœ key:value ์Œ์œผ๋กœ ๋œ ์ปฌ๋ ‰์…˜์ด๋ฉฐ DB์— ์ง๋ ฌํ™” ํ•œ ๊ฐ’์œผ๋กœ ์ €์žฅ๋˜๊ฒŒ ๋œ๋‹ค.

  • StepExecution ์˜ ๊ฐ’์€ Step ๊ฐ„ ๊ณต์œ  ๋ถˆ๊ฐ€๋Šฅ.

  • JobExecution ์˜ ๊ฐ’์€ Job ๊ฐ„ ๊ณต์œ ๋Š” ์•ˆ๋˜์ง€๋งŒ, Job์˜ Step๊ฐ„ ๊ณต์œ ๋Š” ๊ฐ€๋Šฅํ•˜๋‹ค.(ํ•„์š”ํ•œ ์ •๋ณด๋ฅผ ์ €์žฅํ•ด๋’€๋‹ค ๊บผ๋‚ด์“ฐ๊ธฐ์— ์œ ์šฉํ•  ๊ฒƒ ๊ฐ™๋‹ค)

    Job ์žฌ์‹œ์ž‘์‹œ ์ด๋ฏธ ์ฒ˜๋ฆฌํ•œ ๋ฐ์ดํ„ฐ๋ฅผ Skipํ•˜๊ณ  ์ˆ˜ํ–‰ํ•  ๋•Œ ํ•ด๋‹น ์ƒํƒœ ์ •๋ณด๋ฅผ ํ™œ์šฉํ•œ๋‹ค.

  • ExecutionContext ๊ฐ€์ ธ์˜ค๊ธฐ.

    ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
    ExecutionContext stepExecutionContext = chunkContext.getStepContext().getStepExecution().getExecutionContext();

    ChunkContext, Contribution ๊ฐ์ฒด ๋‘˜๋‹ค์—์„œ ๊ฐ€์ ธ์˜ค๋Š” ๊ฒƒ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.
    get, put ๋ฉ”์„œ๋“œ๋Š” ExecutionContext์˜ Map<String,Object> ์—์„œ ๊ฐ’์„ ๋„ฃ๊ณ , ๊ฐ€์ ธ์˜ค๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค. ์ปค๋ฐ‹ ์‹œ์ ์— DB์— ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•œ๋‹ค.
    JobInstance ๊ฐ€ ๋™์ผํ•˜๊ณ , ์ด์ „ ์‹คํ–‰์ด COMPLETED ์ƒํƒœ๊ฐ€ ์•„๋‹ˆ๋ผ๋ฉด ์ด์ „๊นŒ์ง€์˜ ExecutionContext์— ์ €์žฅ๋œ ๊ฐ’์„ ๋ถˆ๋Ÿฌ์˜จ ํ›„, ๋‚˜๋จธ์ง€ Step์„ ๋‹ค์‹œ ์‹คํ–‰ํ•œ๋‹ค.

  • getJob(Step)ExecutionContext?

    Map<String, Object> jobExecutionContext = chunkContext.getStepContext().getJobExecutionContext();
    Map<String, Object> stepExecutionContext = chunkContext.getStepContext().getStepExecutionContext();

์ƒ๊ธฐ์˜ getJobExecutionContext, getStepExecutionContext๋Š” ExecutionContext๋ฅผ ๊ฐ€์ ธ์˜ค๋Š” ๊ฒƒ์ด ์•„๋‹Œ ์ €์žฅ๋˜์–ด ์žˆ๋Š” ๊ฐ’์„ ๋ณต์‚ฌํ•ด ๋Œ๋ ค์ฃผ๋Š” ๋ฉ”์„œ๋“œ์ด๋‹ค.
์‹ค์ œ๋กœ ๋ฉ”์„œ๋“œ๋ฅผ ์‚ดํŽด๋ณด์•˜์„ ๋•Œ Map์„ ๋งŒ๋“ค์–ด ๋‚ด์šฉ์„ ๋ณต์‚ฌํ•˜๊ณ  ์ด๋ฅผ unmodifiableMap ์œผ๋กœ ๋Œ๋ ค์คŒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์—ˆ๋‹ค.

๐Ÿง JobRepository

๋ฐฐ์น˜ ์ž‘์—… ์ค‘์˜ ์ •๋ณด๋ฅผ ์ €์žฅํ•˜๋Š” ์ €์žฅ์†Œ๋กœ, ๋ฐฐ์น˜ ์ž‘์—…์˜ ์ˆ˜ํ–‰๊ณผ ๊ด€๋ จ๋œ ๋ชจ๋“  ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•œ๋‹ค.
JobLauncher, Job, Step ๊ตฌํ˜„์ฒด ๋‚ด๋ถ€์—์„œ CRUD ๊ธฐ๋Šฅ์„ ์ฒ˜๋ฆฌํ•œ๋‹ค.

  • ๐Ÿ‘† ์ฃผ์š” ๋ฉ”์„œ๋“œ

    • isJobInstanceExist(jobName, jobParameters)
    • createJobExecution(jobName, jobParameters)
    • getLastJobExecution(jobName, jobParameters)
    • getLastStepExecution(jobInstance, stepName)
    • update(jobExecution): Job์˜ ์‹คํ–‰ ์ •๋ณด ์—…๋ฐ์ดํŠธ
    • update(stepExecution)
    • add(stepExecution): ์‹คํ–‰ ์ค‘์ธ Step์˜ ์ƒˆ๋กœ์šด stepExecution ์ €์žฅ.
    • updateExecutionContext(jobExecution)
    • updateExecutionContext(stepExecution)

@EnableBatchProcessing ์• ๋…ธํ…Œ์ด์…˜์„ ์„ ์–ธํ•˜๋ฉด JobRepository๊ฐ€ ์ž๋™์œผ๋กœ ๋นˆ์œผ๋กœ ๋“ฑ๋ก๋œ๋‹ค.
BatchConfigurer ์ธํ„ฐํŽ˜์ด์Šค๋‚˜ ๊ตฌํ˜„์ด๋‹ค BasicBatchConfigurer๋ฅผ ์ƒ์†ํ•˜์—ฌ jobRepository๋ฅผ ์ปค์Šคํ…€ ํ•˜๋Š” ๊ฒƒ์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

  • JDBC

    JDBC ๋ฐฉ์‹์œผ๋กœ ์„ค์ •ํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” JobRepositoryFactoryBean์„ ์‚ฌ์šฉํ•˜๋Š”๋ฐ, AOP ๋ฐฉ์‹์œผ๋กœ ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ๊ฐ€ ์ด๋ฃจ์–ด์ง„๋‹ค. ๊ฒฉ๋ฆฌ ๋ ˆ๋ฒจ์€ ๊ธฐ๋ณธ์ ์œผ๋กœSERIALIZEBLE์ด๊ณ , ๋‹ค๋ฅธ ๋ ˆ๋ฒจ๋กœ ๋ณ€๊ฒฝ ๊ฐ€๋Šฅํ•˜๋‹ค.
    ํ…Œ์ด๋ธ”์˜ ๊ธฐ๋ณธ prefix๋Š” "BATCH_"์ด๋ฉฐ ๋ณ€๊ฒฝ ๊ฐ€๋Šฅํ•˜๋‹ค.

    @Configuration
    public class CustomBatchConfigurer extends BasicBatchConfigurer {
    
        private final DataSource dataSource;
    
        protected CustomBatchConfigurer(BatchProperties properties, DataSource dataSource,
            TransactionManagerCustomizers transactionManagerCustomizers) {
            super(properties, dataSource, transactionManagerCustomizers);
            this.dataSource = dataSource;
        }
    
        @Override
        protected JobRepository createJobRepository() throws Exception {
            JobRepositoryFactoryBean factoryBean = new JobRepositoryFactoryBean();
            factoryBean.setDataSource(dataSource); // ์„ค์ •ํ•˜์ง€ ์•Š์•„๋„ ๊ธฐ๋ณธ์ ์œผ๋กœ ์„ค์ • ๋จ.
            factoryBean.setTransactionManager(getTransactionManager()); // BasicBatchConfigurer์— ์žˆ๋Š” ๋ฉ”์„œ๋“œ
            factoryBean.setIsolationLevelForCreate("ISOLATION_READ_COMMITTED");
            factoryBean.setTablePrefix("LOG_BATCH");
    
            return factoryBean.getObject();
        }
    }
  • In Memory

    DB์˜ ์ €์žฅ๊นŒ์ง€๋Š” ํ•„์š”๊ฐ€ ์—†๋‹ค๋ฉด MapJobRepositoryFactoryBean์„ ์‚ฌ์šฉํ•˜์—ฌ ์ธ๋ฉ”๋ชจ๋ฆฌ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜๋„ ์žˆ๋‹ค.

  • JobRepository ์—์„œ ๊ฐ’ ์กฐํšŒ

    JobExecution lastJobExecution = jobRepository.getLastJobExecution(jobName, jobParameters);
    if(lastJobExecution != null) {
        lastJobExecution.getStepExecutions()
            .forEach(s -> System.out.println(s.getExitStatus()));
    }

๐Ÿง JobLauncher

Job๊ณผ ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์ธ์ž๋กœ ๋ฐ›์œผ๋ฉฐ ๋ฐฐ์น˜ ์ž‘์—…์„ ์‹คํ–‰์‹œํ‚จ ํ›„ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ JobExecution์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
์Šคํ”„๋ง ๋ถ€ํŠธ ๋ฐฐ์น˜๊ฐ€ ๊ตฌ๋™๋˜๋ฉด ์ž๋™์œผ๋กœ ๋นˆ์ด ์ƒ์„ฑ๋˜๊ธฐ ๋•Œ๋ฌธ์— ๋”ฐ๋กœ ๋งŒ๋“ค์–ด์ฃผ์ง€ ์•Š์•„๋„ ๋œ๋‹ค.

ApplicationRunner๋ฅผ ๊ตฌํ˜„ํ•œ JobLauncherApplicationRunner๊ฐ€ JobLauncher๋ฅผ ์ž๋™์œผ๋กœ ์‹คํ–‰์‹œํ‚ค๊ฒŒ ๋œ๋‹ค.
๋™๊ธฐ์ (SyncTaskExecutor), ๋น„๋™๊ธฐ์ (SimpleAsyncExecutor) ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•˜๋ฉฐ ๊ธฐ๋ณธ๊ฐ’์€ ๋™๊ธฐ์  ์‹คํ–‰์ด๋‹ค.
๋‘ ๋ฐฉ์‹์˜ ์ฐจ์ด๋Š” ์–ธ์ œ JobExecution์„ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐ˜ํ™˜ํ•˜๋А๋ƒ์ด๋‹ค. ๋™๊ธฐ์  ๋ฐฉ์‹์€ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ๊ฐ€ ์ตœ์ข…์ ์œผ๋กœ ์™„๋ฃŒ๋˜๋ฉด ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐ˜ํ™˜ํ•˜์ง€๋งŒ,
๋น„๋™๊ธฐ์  ์‹คํ–‰์—์„œ๋Š” JobExecution์„ ํš๋“ํ•˜๋ฉด ๋ฐ”๋กœ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐ˜ํ™˜ํ•œ๋‹ค.(ExitStatus.UNKNOWN)
๋™๊ธฐ์  ์‹คํ–‰์€ ์Šค์ผ€์ค„๋Ÿฌ์— ์˜ํ•œ ๋ฐฐ์น˜์ฒ˜๋ฆฌ์™€ ๊ฐ™์ด ๋ฐฐ์น˜์ฒ˜๋ฆฌ์‹œ๊ฐ„์ด ๊ธธ์–ด๋„ ์ƒ๊ด€ ์—†๋Š” ๊ฒฝ์šฐ์— ์ ํ•ฉํ•˜๊ณ , ๋น„ ๋™๊ธฐ์  ์‹คํ–‰์€ HTTP์š”์ฒญ์— ์˜ํ•œ ๋ฐฐ์น˜ ์ฒ˜๋ฆฌ์— ์ ํ•ฉํ•˜๋‹ค.

  • ๋น„ ๋™๊ธฐ์  ์‹คํ–‰
@RequiredArgsConstructor
@RestController
public class JobLauncherController {

private final Job job;
private final BasicBatchConfigurer basicBatchConfigurer;


@PostMapping("/batch")
public String launch(@RequestBody Member member)
    throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {

    SimpleJobLauncher jobLauncher = (SimpleJobLauncher) basicBatchConfigurer.getJobLauncher();
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

    jobLauncher.run(job, new JobParametersBuilder()
        .addString("id", member.getId())
        .addDate("date", new Date())
        .toJobParameters());

    return "batch completed";
}

setTaskExecutor๋Š” JobLauncher์˜ ๋ฉ”์„œ๋“œ๊ฐ€ ์•„๋‹Œ SimpleJobLauncher์˜ ๋ฉ”์„œ๋“œ์ด๊ธฐ ๋•Œ๋ฌธ์— ๋นˆ์œผ๋กœ ์ฃผ์ž…๋ฐ›์•„ ์‚ฌ์šฉํ•  ์ˆ˜ ์—†๋‹ค.
JobLauncher ์ธํ„ฐํŽ˜์ด์Šค๋กœ ์ฃผ์ž… ๋ฐ›๋”๋ผ๋„ ํ”„๋ก์‹œ ๊ฐ์ฒด์ด๊ธฐ ๋–„๋ฌธ์— SimpleJobLauncher ๋กœ์˜ ๊ฐ•์ œ ํ˜•๋ณ€ํ™˜ ๋˜ํ•œ ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค.
๋•Œ๋ฌธ์— BasicBatchConfigurer์—์„œ ํ”„๋ก์‹œ๊ฐ€ ์•„๋‹Œ ์‹ค์ œ ๊ฐ์ฒด๋ฅผ ๊ฐ€์ ธ์™€ ํƒ€์ž… ์บ์ŠคํŒ…์„ ํ•ด์ค€๋‹ค.


๐Ÿ“Œ ๋ฐฐ์น˜ ์„ค์ •

๐Ÿ‘† JobLauncherApplicationRunner

  • ApplicationRunner์˜ ๊ตฌํ˜„์ฒด๋กœ BatchAutoConfifuration์—์„œ ์ƒ์„ฑ๋œ๋‹ค.
  • ๊ธฐ๋ณธ์ ์œผ๋กœ ๋นˆ์œผ๋กœ ๋“ฑ๋ก๋œ ๋ชจ๋“  job์„ ์‹คํ–‰์‹œํ‚จ๋‹ค.(ํŠน์ • job๋งŒ ์‹คํ–‰ํ•˜๋„๋ก ์„ค์ •๋„ ๊ฐ€๋Šฅ.)

๐Ÿ‘† BatchProperties

  • ํ™˜๊ฒฝ ์„ค์ • ํด๋ž˜์Šค๋กœ job ์ด๋ฆ„, ์Šคํ‚ค๋งˆ ์ดˆ๊ธฐํ™” ์„ค์ •, ํ…Œ์ด๋ธ” prefix ๋“ฑ ์„ค์ • ๊ฐ€๋Šฅ.
  • properties | yml ํŒŒ์ผ์— ์„ค์ • ๊ฐ€๋Šฅํ•˜๋‹ค.
    • batch.job.names, batch.initialize-schema: never | always | embedded, batch.tablePrefix: ...

๐Ÿ‘† Job ์‹คํ–‰ ์˜ต์…˜

  • Spring.batch.jhob.names: ${job.name:NONE} ์ง€์ •ํ•œ Job๋งŒ ์‹คํ–‰ํ•˜๋„๋ก ํ•œ๋‹ค.
    • NONE๋Š” ์ž„์˜์˜ ๋ฌธ์ž.
    • --job.name=name1, name2
    • properties๋ฅผ ์„ค์ •ํ•ด๋‘๊ณ  ์˜ต์…˜์„ ์ฃผ์ง€ ์•Š์œผ๋ฉด ์•„๋ฌด Job๋„ ์‹คํ–‰๋˜์ง€ ์•Š๋Š”๋‹ค.


๐Ÿ“Œ Job์˜ ์‹คํ–‰

๐Ÿง JobBuilderFactory

Job์„ ์‰ฝ๊ฒŒ ์ƒ์„ฑํ•˜๊ณ  ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋„๋ก util ์„ฑ๊ฒฉ์˜ ๋นŒ๋” ํด๋ž˜์Šค์ธ JobBuilderFactory๋ฅผ ์ œ๊ณตํ•œ๋‹ค.
JobBuilderFactory ์—์„œ๋Š” JobBuilder(SimpleJobBuilder, FlowBuilder)๋ฅผ ์ƒ์„ฑํ•˜์—ฌ Job์˜ ์ƒ์„ฑ์„ ์œ„์ž„ํ•œ๋‹ค.

img_5.png

  • SimpleJob์˜ ์ƒ์„ฑ

    JobBuilderFactory๋ฅผ ํ†ตํ•ด์„œ JobBuilder๋ฅผ ์ƒ์„ฑํ•˜๊ณ  start(step) ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด SimpleJobBuilder๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์ตœ์ข…์ ์œผ๋กœ SimpleJob์ด ์ƒ์„ฑ๋œ๋‹ค.

  • FlowJob, Flow์˜ ์ƒ์„ฑ

    JobBuilder์—์„œ start(flow) ๋˜๋Š” flow(step)์„ ์‹คํ–‰ํ•˜๋ฉด FlowJobBuilder๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ตœ์ข…์ ์œผ๋กœ FlowJob์ด ์ƒ์„ฑ๋œ๋‹ค.
    FlowJobBuilder ์—์„œ๋Š” ๋˜ ๋‚ด๋ถ€์ ์œผ๋กœ JobFlowBuilder -> FlowBuilder๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์—ฌ๊ธฐ์„œ Flow๋ฅผ ์ƒ์„ฑํ•˜๊ฒŒ ๋œ๋‹ค.

public JobBuilder get(String name) {
    JobBuilder builder = new JobBuilder(name).repository(jobRepository);
    return builder;
}

SimpleJobBuilder์™€ FlowJobBuilder๋Š” JobBuilderHelper ํด๋ž˜์Šค๋ฅผ ์ƒ์†ํ•˜๋ฉฐ, ํ•ด๋‹น ํด๋ž˜์Šค๋“ค์—์„œ ๋งŒ๋“ค์–ด์ง„ Job๋“ค์— SimpleJobRepository๊ฐ€ ์ „๋‹ฌ๋˜์–ด CRUD๋ฅผ ํ†ตํ•ด ๋ฉ”ํƒ€์ •๋ณด๋“ค์„ ๊ธฐ๋กํ•˜๊ฒŒ ๋œ๋‹ค.

๐Ÿง SimpleJob API

  • .start(), next()

    • start() ์—์„œ ์ฒ˜์Œ ์‹คํ–‰ํ•  step์„ ์„ค์ •ํ•˜๊ณ  SImpleJobBuilder๋ฅผ ์ƒ์„ฑ, ๋ฐ˜ํ™˜ํ•œ๋‹ค, ํ›„ next() ์—์„œ๋Š” ์ˆœ์ฐจ์ ์œผ๋กœ ์‹คํ–‰ํ•  step์„ ๋“ฑ๋กํ•œ๋‹ค.
  • .incrementer(JobParametersIncrementer): ํŒŒ๋ผ๋ฏธํ„ฐ์˜ ๊ฐ’์„ ์ž๋™์œผ๋กœ ์ฆ๊ฐ€ํ•ด์ฃผ๋Š” ์„ค์ •.

    • JobParameters์˜ ๊ฐ’์„ ์ฆ๊ฐ€์‹œ์ผœ ๋‹ค์Œ์— ์‚ฌ์šฉ๋  ๊ฐ’ ๊ฐ์ฒด๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
      • Long ๊ฐ’์„ ๋„ฃ์–ด์ฃผ๊ณ , ์‹คํ–‰ํ• ๋•Œ ๋งˆ๋‹ค ํ•ด๋‹น ๊ฐ’์„ ์ฆ๊ฐ€์‹œํ‚จ๋‹ค.
      • ๊ธฐ์กด์˜ ํŒŒ๋ผ๋ฏธํ„ฐ ๊ฐ’์—๋Š” ๋ณ€ํ™”๊ฐ€ ์—†์ง€๋งŒ ๋„ฃ์–ด์ค€ Long ๊ฐ’์ด ๋ณ€ํ•˜๊ธฐ ๋–„๋ฌธ์— ์—ฌ๋Ÿฌ๋ฒˆ ์‹คํ–‰ ๊ฐ€๋Šฅํ•˜๋‹ค.
    • RunIdIncrementer ๊ตฌํ˜„์ฒด๋ฅผ ์ง€์›ํ•˜๋ฉฐ, ํ•„์š”ํ•˜๋‹ค๋ฉด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ง์ ‘ ๊ตฌํ˜„ํ•˜์—ฌ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค.
      @Override
      public JobParameters getNext(JobParameters parameters) {
          String date = new SimpleDateFormat("yyyyMMdd-hhmmss").format(new Date());
          return new JobParametersBuilder(parameters).addString("run.date", date).toJobParameters();
      }
      • ๐Ÿ’ก incrementer.getNext()๊ฐ€ ์ ์šฉ๋œ ํ›„ ApplicationRunner๋“ค์„ ์‹คํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— Runner ํด๋ž˜์Šค์—์„œ jobParameters๋ฅผ ๋„ฃ์–ด์ค€๋‹ค๋ฉด
        incrementer์ด ์ ์šฉ๋˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ๋‹ค.(๋ฎ์–ด์”Œ์›Œ ์ง)
  • .preventRestart(): Job์˜ ์žฌ์‹œ์ž‘ ๊ฐ€๋Šฅ ์—ฌ๋ถ€ ์„ค์ •.

    • restartable ์˜ default ๊ฐ’์€ true, .preventRestart() ํ•˜๋ฉด false๋กœ ๋ณ€๊ฒฝ๋จ.
    • ํ•ด๋‹น ์˜ต์…˜์„ false๋กœ ์ฃผ๊ฒŒ ๋˜๋ฉด job์˜ ์‹คํ–‰์ด ์‹คํŒจํ•ด๋„ ์žฌ์‹œ์ž‘์ด ๋ถˆ๊ฐ€๋Šฅํ•˜๋‹ค. (JobRestartException ๋ฐœ์ƒ)
    • SimpleJobLaunch์—์„œ lastJobExecution์„ ๊ฐ€์ ธ์˜จ ๋’ค ์กฐ๊ฑด์„ ํ™•์ธํ•œ๋‹ค.
  • .validator(jobParameterValidator): ํŒŒ๋ผ๋ฏธํ„ฐ ๊ตฌ์„ฑ ๊ฒ€์ฆ.

    • DefaultJobParametersValidator ๊ตฌํ˜„์ฒด๋ฅผ ์ง€์›ํ•œ๋‹ค.
      • public DefaultJobParametersValidator(String[] requiredKeys, String[] optionalKeys)
      • ํ•„์ˆ˜ํ‚ค๊ฐ€ ์—†๊ฑฐ๋‚˜, ํ•„์ˆ˜ํ‚ค, ์˜ต์…˜ํ‚ค ๋‘˜๋‹ค์— ์—†๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ๊ฐ€ ๋“ค์–ด์˜ค๋ฉด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.
    • ์ปค์Šคํ…€ํ•œ ์ œ์•ฝ ์กฐ๊ฑด์„ ์ƒ์„ฑํ•˜๊ณ  ์‹ถ๋‹ค๋ฉด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ง์ ‘ ๊ตฌํ˜„ํ•  ์ˆ˜๋„ ์žˆ๋‹ค.
      @Override
      public void validate(JobParameters parameters) throws JobParametersInvalidException {
        if(parameters.getString("name") == null) {
            throw  new JobParametersInvalidException("name parameters is null");
        }
      }
  • .listener(JobExecutionListener): Job์˜ ์‹คํ–‰ ์ „, ํ›„์— ์ฝœ๋ฐฑ์„ ์„ค์ •.

    @Component
    public class JobListener implements JobExecutionListener {
    
        @Override
        public void beforeJob(JobExecution jobExecution) {
        }
    
        @Override
        public void afterJob(JobExecution jobExecution) {
        }
    }

๐Ÿง SimpleJob ์•„ํ‚คํ…์ฒ˜

img.png

  1. JobLauncher ์—์„œ Job, JobParameter๋ฅผ ๊ฐ€์ง€๊ณ  JobInstance๋ฅผ ์ƒ์„ฑ
  2. JobExecution์„ ์ƒ์„ฑํ•˜๊ณ , ExecutionContext ํ• ๋‹น.
  3. JobExecutionListener.beforeJob()
  4. ๊ฐ Step์ด ์‹คํ–‰๋˜๋ฉฐ StepExecution,ExecutionContext ์ƒ์„ฑ
  5. StepExecution์— ์ตœ์ข… ์ƒํƒœ ์—…๋ฐ์ดํŠธ.
  6. JobListener.afterJob() ํ˜ธ์ถœ
  7. JobExecution์— ์ตœ์ข… ์ƒํƒœ ์—…๋ฐ์ดํŠธ.(Status, ExitStatus)
  8. JobLauncher์— ๋ฐ˜ํ™˜.

๐Ÿ“Œ Step์˜ ์‹คํ–‰

๐Ÿง StepBuilderFactory

StepBuilder๋ฅผ ์ƒ์„ฑํ•˜๋Š” ํŒฉํ† ๋ฆฌ ํด๋ž˜์Šค. ๊ตฌ์กฐ๋Š” JobBuilderFactory์™€ ์œ ์‚ฌํ•˜๋‹ค.

  • ๐Ÿ‘† StepBuilder ๊ตฌํ˜„์ฒด

    • TaskletStepBuilder

      • API: tasklet(tasklet())
    • SimpleStepBuilder

      • TaskletStepBuilder์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ TaskletStep์„ ์ƒ์„ฑํ•˜์ง€๋งŒ, ๋‚ด๋ถ€์ ์œผ๋กœ ์ฒญํฌ๊ธฐ๋ฐ˜์˜ ์ž‘์—…์„ ์ฒ˜๋ฆฌํ•˜๋Š” ChunkOrientedTasklet์„ ์ƒ์„ฑํ•œ๋‹ค.
      • API: chunk(chunkSize) | chunk(completionPolicy)
    • PartitionStepBuilder

      • PartitionStep์„ ์ƒ์„ฑํ•˜๋ฉฐ ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ๋ฐฉ์‹์œผ๋กœ Job์„ ์‹คํ–‰ํ•œ๋‹ค.
      • API: partitioner(stepName, partitioner) | partitioner(step)
    • JobStepBuilder

      • JobStep์„ ์ƒ์„ฑํ•˜๊ณ , Step์•ˆ์—์„œ Job์„ ์‹คํ–‰ํ•œ๋‹ค.
      • API: job(job)
    • FlowStepBuilder

      • FlowStep์„ ์ƒ์„ฑํ•˜๊ณ , Step์•ˆ์—์„œ Flow๋ฅผ ์‹คํ–‰ํ•œ๋‹ค.
      • API: flow(flow)

    TaskletStepBuilder์™€ SimpleStepBuilder๋Š” StepBuilderHelper๋ฅผ ์ƒ์†๋ฐ›์€ AbstractTaskletStepBuilder๋ฅผ ์ƒ์†๋ฐ›๊ณ ,
    ๋‚˜๋จธ์ง€ ๋นŒ๋”๋“ค์€ StepBuilderHelper๋ฅผ ์ง์ ‘ ์ƒ์† ๋ฐ›๋Š”๋‹ค.

๐Ÿง TaskletStep

Tasklet์€ ์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ ์ œ๊ณตํ•˜๋Š” Step์˜ ๊ตฌํ˜„์ฒด๋กœ Tasklet์„ ์‹คํ–‰์‹œํ‚จ๋‹ค.
Task ๊ธฐ๋ฐ˜๊ณผ Chunk ๊ธฐ๋ฐ˜์ด ์žˆ์œผ๋ฉฐ, RepeatTEmplate๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ Tasklet ๊ตฌ๋ฌธ์„ ํŠธ๋žœ์žญ์…˜ ๋‚ด์—์„œ ๋ฐ˜๋ณต ์‹คํ–‰ํ•œ๋‹ค.

  • Task ๊ธฐ๋ฐ˜

    • ๋‹จ์ผ ์ž‘์—…์œผ๋กœ ์ฒ˜๋ฆฌ๋˜๋Š” ๊ฒƒ์ด ๋” ๋‚˜์€ ๊ฒฝ์šฐ ์‚ฌ์šฉํ•œ๋‹ค.
    • Tasklet ๊ตฌํ˜„์ฒด๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์‚ฌ์šฉํ•œ๋‹ค.
      @Bean
      public Step myStep() {
          return stepBuilderFactory.get("myStep")
          .tasklet(new Tasklet() {
              @Override
              public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
                  return RepeatStatus.FINISHED;
              }
          })
          .build();
          }
  • chunk ๊ธฐ๋ฐ˜

    • n๊ฐœ์˜ ์กฐ๊ฐ์œผ๋กœ ๋‚˜๋ˆ„์–ด ์‹คํ–‰ํ•œ๋‹ค, ๋Œ€๋Ÿ‰ ์ฒ˜๋ฆฌ์— ํšจ๊ณผ์ ์œผ๋กœ ๋Œ€์ฒ˜ํ•  ์ˆ˜ ์žˆ๋„๋ก ์„ค๊ณ„ ๋˜์—ˆ๋‹ค.
    • ChunkOrientedTasklet ๊ตฌํ˜„์ฒด๊ฐ€ ์ œ๊ณต๋˜๋ฉฐ, ItemReader, ItemProcessor, ItemWriter์„ ์‚ฌ์šฉํ•œ๋‹ค.
      @Bean
      public Step chunkStep() {
          return stepBuilderFactory.get("chunkStep")
              .<String, String>chunk(3)
              .reader(new ItemReader<String>() {
                  @Override
                  public String read()
                      throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                      if(!strings.isEmpty()) {
                          return strings.remove(0);
                      }
                      return null;
                  }
              })
              .processor(new ItemProcessor<String, String>() {
                  @Override
                  public String process(String item) throws Exception {
                      return item.toUpperCase(Locale.ROOT);
                  }
              })
              .writer(new ItemWriter<String>() {
                  @Override
                  public void write(List<? extends String> items) throws Exception {
                      items.forEach(System.out::println);
                  }
              })
              .build();
      }
  • ๐Ÿ‘† API

    • .tasklet(Tasklet), chunk(int size)

      • ๋ฐ˜๋ณต์ ์œผ๋กœ ์ˆ˜ํ–‰๋˜๋Š” Tasklet ํƒ€์ž…์˜ ํด๋ž˜์Šค๋ฅผ ์„ค์ •ํ•œ๋‹ค.
      • ๋ฐ˜ํ™˜ ๊ฐ’์— ๋”ฐ๋ผ ๋ฐ˜๋ณต ์—ฌ๋ถ€๋Š” ๋ณ€๊ฒฝ ๊ฐ€๋Šฅ, RepeatStatus.FINISHED, RepeatStatus.CONTINUABLE
      • ํ•œ๊ฐœ๋งŒ ์„ค์ •์ด ๊ฐ€๋Šฅํ•˜๊ณ , ์—ฌ๋Ÿฌ๊ฐœ ์„ค์ •์‹œ ๋งˆ์ง€๋ง‰ ์„ค์ •๋งŒ ์‹คํ–‰๋œ๋‹ค.
      • ์ต๋ช… ๋˜๋Š” Tasklet์„ ๊ตฌํ˜„ํ•œ๋‹ค.
      • execute()๋Š” StepContribution, ChunkContext๋ฅผ ์ธ์ž๋กœ ๋ฐ›๋Š”๋‹ค.
    • .startLimit(int)

      • Step์˜ ์ตœ๋Œ€ ์‹คํ–‰ ํšŸ์ˆ˜ ์„ค์ •, ๊ธฐ๋ณธ๊ฐ’์€ INTEGER.MAX_VALUE, ์ดˆ๊ณผํ•˜๋ฉด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.
      • Step ๋งˆ๋‹ค ๊ฐœ๋ณ„๋กœ ์„ค์ •ํ•œ๋‹ค.
      • ๋™์ผํ•œ JobInstance์—์„œ ์‹คํ–‰๋˜๋Š” ๋™์ผํ•œ Step์— ๋Œ€ํ•œ ์‹คํ–‰ ํšŸ์ˆ˜ ์ œํ•œ์ด๋‹ค.
    • .allowStartIfComplete(true)

      • Job์„ ์žฌ์‹œ์ž‘ํ•  ๋•Œ Step์˜ ์ด์ „ ์‹คํ–‰์˜ ์„ฑ๊ณต ์—ฌ๋ถ€์™€ ์ƒ๊ด€ ์—†์ด ํ•ญ์ƒ Step์„ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•œ ์„ค์ •์ด๋‹ค.
      • Step 1~4์ค‘์— 1,2๊นŒ์ง€ ์„ฑ๊ณตํ•˜๊ณ  ์‹คํŒจํ•˜์—ฌ Job์„ ์žฌ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์„ ๋•Œ ๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” 3๋ถ€ํ„ฐ ๋‹ค์‹œ ์‹œ์ž‘ํ•œ๋‹ค.
        ํ•˜์ง€๋งŒ 1,2์˜ ์ž‘์—…์ด ๋ฌด์กฐ๊ฑด ์„ ํ–‰๋˜์–ด์•ผ ํ•˜๋Š” Flow๋ผ๋ฉด, ํ•ด๋‹น ์˜ต์…˜์„ ํ™œ์„ฑํ™”ํ•˜์—ฌ 1๋ถ€ํ„ฐ ์‹œ์ž‘ํ•˜๋„๋ก ๋ณ€๊ฒฝํ•  ์ˆ˜ ์žˆ๋‹ค.
    • listener(StepExecutionListener)

      • Step์˜ ์‹คํ–‰ ์ „ ํ›„์˜ ์ฝœ๋ฐฑ.

๐Ÿง TaskletStep ์•„ํ‚คํ…์ณ

img_8.png

  1. ExecutionContext๋ฅผ ๊ฐ€์ง€๋Š” StepExecution์ด ์ƒ์„ฑ๋œ๋‹ค.
  2. TaskletStep์—์„œ StepExecution์„ ๋ฐ›์•„ Step์„ ์‹คํ–‰์‹œํ‚จ๋‹ค.
  3. StepExecutionListener.beforeStep()์„ ํ˜ธ์ถœํ•œ๋‹ค.
  4. RepeatTemplate ์—์„œ Tasklet์„ ๋ฐ˜๋ณต ์‹คํ–‰ํ•œ๋‹ค.
    loop ์—์„œ๋Š” RepeatStatus๋ฅผ ํ™•์ธํ•˜์—ฌ FINISHED ๋ผ๋ฉด ๋ฃจํ”„๋ฅผ ๋น ์ ธ๋‚˜์˜ค๊ณ  CONTINUABLE ์ด๋ผ๋ฉด ๋‹ค์‹œ RepeatTemplate์—์„œ Tasklet์„ ๋ฐ˜๋ณต์‹œํ‚จ๋‹ค.
  5. StepExecution์˜ Status๋ฅผ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.
  6. StepExecutionListener.afterStep()์„ ํ˜ธ์ถœํ•œ๋‹ค.
  7. StepExecution์˜ ExitStatus๋ฅผ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค.

๐Ÿง JobStep

๋˜ ๋‹ค๋ฅธ Job์„ ์‹คํ–‰์‹œํ‚ค๋Š” Step์œผ๋กœ, ์‹œ์Šคํ…œ์„ ์ž‘์€ ๋ชจ๋“ˆ๋กœ ์ชผ๊ฐœ Job์˜ ํ๋ฆ„์„ ๋‚˜๋ˆ„๊ณ ์ž ํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

  • ๐Ÿ‘† API

    • .job(Job)

      • ์‹คํ–‰ํ•  Job์„ ์„ค์ •ํ•œ๋‹ค.
      • ์ถ”๊ฐ€ํ•œ Job๋„ Bean ์œผ๋กœ ๋“ฑ๋กํ•˜๋ฉด ์ž๋™์œผ๋กœ ์‹คํ–‰๋˜์–ด 2๋ฒˆ ์‹คํ–‰๋  ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ์„ค์ •์ด ํ•„์š”ํ•˜๋‹ค.
    • .launcher(JobLauncher)

      • Job์„ ์‹คํ–‰ํ•  JobLauncher๋ฅผ ์„ค์ •ํ•œ๋‹ค.
      • null์„ ๋„˜๊ฒจ ์ฃผ๋ฉด SimpleJobLauncher๋กœ ์‹คํ–‰ํ•œ๋‹ค.
        // JobBuilder
        if (jobLauncher == null){
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            ...
        }
    • .parametersExtractor(JobParametersExtractor)

      • Step์˜ ExecutionContext์—์„œ ๊ฐ’์„ ์ถ”์ถœํ•ด JobParameters๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.

      • ์ œ๊ณต๋˜๋Š” DefaultJobParametersExtractor๋‚˜ JobParametersExtractor๋ฅผ ๊ตฌํ˜„ํ•œ ๊ตฌํ˜„์ฒด๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

      • ๋ถ€๋ชจ์˜ JobParameter๋“ค์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์ถ”๊ฐ€๋˜๊ณ , setKeys() ๋ฅผ ์ด์šฉํ•ด StepExecution์˜ ExecutionContext์—์„œ ๊ฐ’์„ ์ฐพ์•„ ํŒŒ๋ผ๋ฏธํ„ฐ์— ์ถ”๊ฐ€ํ•  ์ˆ˜ ์žˆ๋‹ค.

        // ํ•ด๋‹น Step์˜ ExecutionContext์— ๊ฐ’ ์ถ”๊ฐ€.
        .listener(new StepExecutionListener() {
            @Override
            public void beforeStep(StepExecution stepExecution) {
                stepExecution.getExecutionContext().put("date", new Date());
            }
          
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                return null;
            }
        })
        private JobParametersExtractor jobParametersExtractor() {
            DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
            extractor.setKeys(new String[] {"date"});
        
            return extractor;
        }

        img_9.png

        ๋ถ€๋ชจ Job(7), jobStep์˜ Job(8)


๐Ÿ“Œ Flow

๐Ÿง FlowJob

Step์˜ ์ˆœ์ฐจ์  ์‹คํ–‰์ด ์•„๋‹ˆ๋ผ ์ƒํƒœ์— ๋”ฐ๋ผ ํ๋ฆ„์„ ์ „ํ™˜ํ•˜๋„๋ก ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

  • Step์ด ์‹คํŒจํ•˜๋”๋ผ๋„ Job์€ ์‹คํŒจํ•˜์ง€ ์•Š๊ฒŒ ํ•  ์ˆ˜ ์žˆ๋‹ค.
  • ๋‹ค์Œ์— ์‹คํ–‰ํ•  step์„ ๊ตฌ๋ถ„ํ•˜์—ฌ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค.
    • ex) ์„ฑ๊ณต์‹œ Step2, ์‹คํŒจ์‹œ Step3 ...
  • ํŠน์ • Step์„ ์‹คํ–‰๋˜์ง€ ์•Š๊ฒŒ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

JobBuilderFactory โ–ถ JobBuilder โ–ถ JobFlowBuilder โ–ถ FlowBuilder โ–ถ FlowJob

  • ๐Ÿ‘† API

    • .start(Step), .next(Step)

    • .from(Step)

      • ์ด์ „์— ์ •์˜ํ•œ Step์˜ flow๋ฅผ ์ถ”๊ฐ€์ ์œผ๋กœ ์ •์˜ํ•œ๋‹ค.(Transition์„ ์ƒˆ๋กญ๊ฒŒ ์ •์˜.)
    • .on(String pattern)

      • Step์˜ ExitStatus๋ฅผ ์บ์น˜ํ•˜๊ณ  ํŒจํ„ด๊ณผ ๋งค์นญ๋˜๋ฉด, TransitionBuilder ๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
      • Step์˜ ExitStatus๊ฐ€ on()์˜ ์–ด๋–ค Pattern ๊ณผ๋„ ๋งค์นญ์ด ๋˜์ง€ ์•Š๋Š”๋‹ค๋ฉด ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๊ณ  Job์€ ์‹คํŒจํ•˜๊ฒŒ ๋œ๋‹ค.
        • *: ์™€์ผ๋“œ ์นด๋“œ, ๋ชจ๋“  ExitStatus์™€ ๋งค์นญ(C*, F*, *), ?: ์ •ํ™•ํžˆ 1๊ฐœ์˜ ๋ฌธ์ž์™€ ๋งค์นญ (C?T)
        • ์™€์ผ๋“œ ์นด๋“œ๋Š” else ์ฒ˜๋Ÿผ ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ๋‹ค. step1์— on("COMPLETED") to(step2())๋ฅผ ์‹คํ–‰ํ•œ๋‹ค๊ณ  ์ง€์ •ํ•ด๋‘๊ณ ,
          from(step1()).on("*") ์‚ฌ์šฉํ•˜๋ฉด COMPLETED๋ฅผ ์ œ์™ธํ•œ ๋ชจ๋“  ํŒจํ„ด์„ ๋œปํ•˜๊ฒŒ ๋œ๋‹ค.
      • TransitionBuilder๋Š” ์•„๋ž˜์˜ API๋ฅผ ๊ฐ€์ง„๋‹ค.
      • .to(Step | Flow | JobExecutionDecider)

        • ๋‹ค์Œ์œผ๋กœ ์‹คํ–‰ํ•  ๊ฒƒ์„ ์ง€์ •ํ•œ๋‹ค.
      • .stop(), .fail(), .end(), .stopAndRestart()

        • flow๋ฅผ ์ค‘์ง€, ์‹คํŒจ, ์ข…๋ฃŒํ•˜๋„๋ก ํ•œ๋‹ค.
        • FlowExecutionStatus๊ฐ€ ๊ฐ๊ฐ STOPPED, FAILED, COMPLETED ๋กœ ์ข…๋ฃŒ๋œ๋‹ค.
        • stopAndRestart() ๋Š” ํ˜„์žฌ๊นŒ์ง€์˜ Step์€ COMPLETED๋กœ, ์ดํ›„๋Š” ์‹คํ–‰ํ•˜์ง€ ์•Š๊ณ  STOPPED ์ƒํƒœ๋กœ Job์„ ์ข…๋ฃŒํ•œ๋‹ค.(์ดํ›„ ์žฌ์‹œ์ž‘์‹œ COMPLETED๋Š” Skip)
        • ์‹ค์ œ Step์ด FAILED๋กœ ์ข…๋ฃŒ๋˜์—ˆ๋”๋ผ๋„ Job์˜ BatchStatus๋ฅผ COMLETED๋กœ ์ข…๋ฃŒํ•˜๋„๋ก ํ•  ์ˆ˜ ์žˆ๋‹ค.(์žฌ์‹œ์ž‘ ๋ถˆ๊ฐ€๋Šฅ ํ•ด์ง)
    • end()

      • FlowBuilder ๋ฅผ ์ข…๋ฃŒํ•˜๊ณ  SimpleFlow ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.
      • FlowJobBuilder์—์„œ๋Š” flowJob์„ ์ƒ์„ฑํ•˜๊ณ  Simpleflow๋ฅผ ์‹คํ–‰์‹œํ‚จ๋‹ค.

start, next, from ์€ flow๋ฅผ ์ •์˜ํ•˜๊ณ , on, to, stop, fail, end, stopAndRestart๋Š” ์กฐ๊ฑด์— ๋”ฐ๋ผ ํ๋ฆ„์„ ์ „ํ™˜์‹œํ‚จ๋‹ค.
on()์„ ํ˜ธ์ถœํ•˜๋ฉด TransitionBuilder๊ฐ€ ์ƒ์„ฑ๋˜๊ณ , to, stop, fail, end, stopAndRestart๋ฅผ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
        .start(myStep1())
        .on("COMPLETED").to(myStep3())
        .from(myStep1())
        .on("FAILED").to(myStep2())
        .end()
        .build();
}

FlowJob์„ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ๊ตฌ์„ฑํ•˜๋ฉด myStep1์ด ์„ฑ๊ณตํ•˜๋ฉด myStep3๋กœ, ์‹คํŒจํ•˜๋ฉด myStep2๋ฅผ ์‹คํ–‰ํ•œ๋‹ค๋Š” ํ๋ฆ„์ด ๋งŒ๋“ค์–ด์ง„๋‹ค.

img_10.png
DB์— ์ €์žฅ๋œ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋ฅผ ํ™•์ธํ•˜๋ฉด myStep 1๊ณผ 3์ด ์‹คํ–‰๋œ ๊ฒƒ์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ๋‹ค.

์ด๋ฒˆ์—๋Š” myStep1 ์—์„œ ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œ์ผœ ์ผ๋ถ€๋Ÿฌ ์‹คํŒจํ•œ ํ›„ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ๊ฐ’์„ ์‚ดํŽด๋ณด๊ฒ ๋‹ค.
img_11.png
on์˜ FAILED ํŒจํ„ด๊ณผ ๋งค์นญ๋˜์–ด myStep2๊ฐ€ ์‹คํ–‰๋œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.
ํ•œ ๊ฐ€์ง€ ๋” ํŠน์ด ์‚ฌํ•ญ์ด ์žˆ๋‹ค๋ฉด, FlowJob์—์„œ๋Š” Step์˜ ์‹คํŒจ๊ฐ€ Job์˜ ์‹คํŒจ๋กœ ์—ฐ๊ฒฐ๋˜์ง€ ์•Š๋Š”๋‹ค๋Š” ๊ฒƒ์ด๋‹ค. ์œ„์˜ ์ƒํ™ฉ์—์„œ JobExecution์„ ํ™•์ธํ•ด ๋ณด์•˜๋‹ค.
img_12.png
๋ถ„๋ช… myStep1์„ ์‹คํŒจ์‹œ์ผฐ์ง€๋งŒ Job์€ ์„ฑ๊ณต์ ์œผ๋กœ ๋๋‚œ ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ชจ๋“  ์ƒํ™ฉ์—์„œ ์ด๋Ÿฐ ๊ฒƒ์€ ์•„๋‹ˆ๊ณ , ์‹คํŒจํ–ˆ์„ ๊ฒฝ์šฐ ์–ด๋–ค ๊ฒƒ์„ ํ•˜๋Š”์ง€ ์ •์˜๊ฐ€ ๋˜์–ด์žˆ์„ ๋•Œ๋งŒ ํ•ด๋‹นํ•œ๋‹ค.
์‹ค์ œ๋กœ COMPLETED์˜ ์กฐ๊ฑด๋งŒ์„ ์ฃผ๊ณ  Step์„ ์‹คํŒจ์‹œ์ผฐ์„ ๋•Œ์—๋Š” Job ๋˜ํ•œ ์‹คํŒจํ–ˆ๋‹ค.

๐Ÿง Transition

Flow ๋‚ด Step์˜ ์กฐ๊ฑด๋ถ€ ์ „ํ™”์„ ์ •์˜ํ•œ๋‹ค. on()์„ ํ˜ธ์ถœํ•˜๋ฉด TransitionBuilde ๊ฐ€ ๋ฐ˜ํ™˜๋˜๊ณ , ํ•ด๋‹น ๊ฐ์ฒด์˜ API๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ Transition Flow๋ฅผ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ๐Ÿ‘† ๋ฐฐ์น˜ ์ƒํƒœ

    • BatchStatus

      Job, Step์˜ ์ข…๋ฃŒ ํ›„ ์ตœ์ข… ๊ฒฐ๊ณผ ์ƒํƒœ๋กœ, SimpleJob ์—์„œ๋Š” ๊ฐ€์žฅ ๋งˆ์ง€๋ง‰์— ์‹คํ–‰ ๋œ Step์˜ ์ƒํƒœ๊ฐ€๋˜๊ณ ,
      FlowJob ์—์„œ๋Š” ๋งˆ์ง€๋ง‰ Flow์˜ FlowExecutionStatus ๊ฐ’์ด ๋œ๋‹ค.

      COMPLETE, STARTING, STARTED, STOPPED, FAILED, ABANDONED(์‹คํŒจ, ๊ทธ๋Ÿฌ๋‚˜ ์žฌ์‹œ์ž‘์‹œ ๊ฑด๋„ˆ ๋›ฐ์–ด์•ผํ•˜๋Š” ๋‹จ๊ณ„), UNKOWN

    • ExitStatus

      ์–ด๋–ค ์ƒํƒœ๋กœ ์ข…๋ฃŒ๋˜์—ˆ๋Š”์ง€๋ฅผ ์˜๋ฏธํ•œ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” BatchStatus์™€ ๋™์ผํ•œ ๊ฐ’์œผ๋กœ ์„ค์ •๋˜์ง€๋งŒ, ์ž„์˜๋กœ ๋ณ€๊ฒฝํ•  ์ˆ˜ ์žˆ๋‹ค.(contribution.setExitStatus()
      SimpleJob, FlowJob์—์„œ์˜ ๊ฐ’์˜ ์„ค์ •์€ BatchStatus์™€ ๊ฐ™๋‹ค.

      COMPLETED, FAILED, STOPPED, EXECUTING, UNKNOWN

    • FlowExecutionStatus

      FlowExecution์˜ ์†์„ฑ์œผ๋กœ FLow ์‹คํ–‰ ํ›„ ๊ฒฐ๊ณผ ์ƒํƒœ๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค.
      Flow ๋‚ด์˜ Step์˜ ExitStatus ๊ฐ’์„ FlowExecutionStatus ๊ฐ’์œผ๋กœ ์ €์žฅํ•˜๋ฉฐ FlowJob์˜ ๋ฐฐ์น˜ ๊ฒฐ๊ณผ ์ƒํƒœ์— ๊ด€์—ฌํ•œ๋‹ค.(Step์—๋Š” ์˜ํ–ฅ์„ ์ฃผ์ง€ ์•Š๋Š”๋‹ค)

      COMPLETED, STOPPED, FAILED, UNKNOWN

๐Ÿง ์‚ฌ์šฉ์ž ์ •์˜ ExitStatus

๊ธฐ๋ณธ์ ์œผ๋กœ ์ •์˜๋˜์–ด ์žˆ๋Š” ExitStatus ์ด์™ธ์˜ exitCode๋ฅผ ์ƒˆ๋กญ๊ฒŒ ์ •์˜ ํ•  ์ˆ˜ ์žˆ๋‹ค.
StepExecutionListener์˜ afterStep() ์—์„œ ์ƒ์„ฑํ•œ ํ›„์— ๋งŒ๋“ค์–ด์ง„ ExitStatus๋ฅผ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

new ExitStatus("CUSTOM_STATUS")

afterStep()) ์—์„œ ์ƒˆ๋กœ์šด ExitStatus๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด TaskletStep์˜ exitStatus๋ฅผ ์„ธํŒ…ํ•˜๋Š” ๋ถ€๋ถ„์—์„œ ์ด๋ฅผ ๋ฐ˜์˜ํ•œ๋‹ค.
์›๋ž˜์˜ ExitStatus๋ฅผ ์„ค์ •ํ•œ ํ›„ afterStep()์„ ํ˜ธ์ถœํ•˜์—ฌ ๋‹ค์‹œ ExitStatus๋ฅผ ๊ฐ€์ ธ์˜ค๊ธฐ ๋•Œ๋ฌธ์— ๋ฎ์–ด ์”Œ์›Œ์ง„๋‹ค.

@Bean
public Flow flowA() {
    FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flowA");
    
    return flowBuilder
        .start(myStep1())
        .on("COMPLETED")
        .to(myStep2())
        .on("PASS")
        .stop()
        .next(myStep3())
        .end();
}
@Bean
public Step myStep2() {
    return stepBuilderFactory.get("myStep2")
        .tasklet(new MyTasklet("myStep2"))
        .listener(new PasscheckingListener())
        .build();
}
public class PasscheckingListener implements StepExecutionListener {

    ...
    
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        ExitStatus exitCode = stepExecution.getExitStatus();

        if(!exitCode.getExitCode().equals(ExitStatus.FAILED.getExitCode())) {
            return new ExitStatus("PASS");
        }
        return exitCode;
    }
}

img_14.png

myStep2์˜ EXIT_CODE๊ฐ€ PASS๋กœ ๋ณ€๊ฒฝ๋˜์—ˆ๋‹ค.

img_13.png

myStep1์ด COMPLETED๋กœ ๋๋‚˜ myStep2๊ฐ€ ์‹คํ–‰๋˜๊ณ  ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ COMPLETED๋กœ ๋๋‚˜๊ธฐ ๋•Œ๋ฌธ์— afterStep() ์—์„œ ExitStatus๊ฐ€ PASS ๋กœ ๋ณ€๊ฒฝ๋œ๋‹ค.
on("PASS") ํŒจํ„ด์— ๋งค์นญ๋˜์–ด .stop()์ด ํ˜ธ์ถœ๋˜๊ณ , Job์€ STOPPED ์ƒํƒœ๋กœ ๋งˆ์น˜๊ฒŒ ๋œ๋‹ค.

๐Ÿง JobExecutionDecider

ExitStatus์˜ ์กฐ์ž‘์ด๋‚˜ StepEcecutionListener์˜ ๋“ฑ๋ก์—†์ด Transition ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ํด๋ž˜์Šค๋กœStep๊ณผ Transition์˜ ์—ญํ• ์„ ๋ช…ํ™•ํ•˜๊ฒŒ ๋ถ„๋ฆฌํ•  ์ˆ˜ ์žˆ๊ฒŒ ํ•ด์ค€๋‹ค.

๊ธฐ์กด์—๋Š” Step์˜ ExitStatus๊ฐ€ JobExecutionStatus์˜ ์ƒํƒœ ๊ฐ’์— ๋ฐ˜์˜๋˜๊ณ , ์ด ๊ฐ’์ด JobFlow์— ๋ฐ˜์˜ํ•˜๋Š” ๊ฒƒ๊ณผ ๋‹ฌ JobExecutionDecider์—์„œ FlowExecutionStatus ์ƒํƒœ๊ฐ’์„ ์ƒˆ๋กญ๊ฒŒ ์ƒ์„ฑํ•ด์„œ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .incrementer(new RunIdIncrementer())
        .start(firstStep())
        .next((decider()))
        .on("ODD").to(oddStep())
        .on("EVEN").to(evenStep())
        .end()
        .build();
}

@Bean
public JobExecutionDecider decider() {
    return new CustomDecider();
}
public class CustomDecider implements JobExecutionDecider {

    private int count = 0;

    @Override
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        if (++count % 2 == 0) {
            return new FlowExecutionStatus("EVEN");
        }
        return new FlowExecutionStatus("ODD");
    }
}

์ด์ „์— API๋ฅผ ์ด์šฉํ•˜์—ฌ ExitStatus ์ฝ”๋“œ์— ๋”ฐ๋ผ flow๋ฅผ ์ง„ํ–‰ํ•˜๋Š” ๋ฐฉ์‹๊ณผ ๋™์ผํ•˜๊ฒŒ ๋™์ž‘ํ•œ๋‹ค. Job์„ ๊ตฌ์„ฑํ•˜๋Š” ์ƒํ™ฉ์— ๋”ฐ๋ผ ๋” ์•Œ๋งž๋‹ค๊ณ  ์ƒ๊ฐ๋˜๋Š” ๋ฐฉ๋ฒ•์„ ์„ ํƒํ•˜๋ฉด ๋˜๊ฒ ๋‹ค.

๐Ÿง FlowJob ์•„ํ‚คํ…์ฒ˜

img_15.png

๋Œ€๋ถ€๋ถ„์€ SimpleJob๊ณผ ๋™์ผํ•˜๋‹ค.
๋‹ค๋ฅธ ์ ์€ SimpleFlow ์—์„œ State๋ผ๋Š” ์†์„ฑ์„ ๊ฐ€์ง„๋‹ค๋Š” ๊ฒƒ๊ณผ, ์ž‘์—…์ด ์ข…๋ฃŒ๋˜์—ˆ์„ ๋•Œ StepExecution์˜ ์ƒํƒœ๋ฅผ ๋ฐ˜์˜ํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ FlowExecutionStatus
์˜ ์ƒํƒœ๋กœ ์—…๋ฐ์ดํŠธ ํ•œ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค.

๐Ÿง SimpleFlow

Flow์˜ ๊ตฌํ˜„์ฒด๋กœ Step, Flow, JobExecutionDecider์„ ๋‹ด๊ณ  ์žˆ๋Š” State๋ฅผ ์‹คํ–‰์‹œํ‚ค๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด๋กœ, FlowBuilder๋ฅผ ํ†ตํ•ด ์ƒ์„ฑ๋œ๋‹ค.
Flow๋Š” ์ค‘์ฒฉ๋  ์ˆ˜ ์žˆ๋‹ค

@Bean
public Job flowJob() {
    return jobBuilderFactory.get("flowJob")
        .start(flowA()) // SimpleFlowA
        .end() // SimpleFlow ์ƒ์„ฑ
        .build();
}

๊ฒฐ๊ณผ์ ์œผ๋กœ FlowJob ( SimpleFlow( SimpleFlowA ) )์™€ ๊ฐ™์€ ํ˜•ํƒœ๊ฐ€ ๋œ๋‹ค.

  • Flow

    • getName()
    • getStatus(stateName)
    • FlowExecution start(flowExcecutor) : Flow๋ฅผ ์‹คํ–‰.
    • resume(stateName, flowExecutor) : ๋‹ค์Œ์— ์‹คํ–‰ํ•  State๋ฅผ ๊ตฌํ•ด FlowExecutor ์—๊ฒŒ ์‹คํ–‰์„ ์œ„์ž„ํ•œ๋‹ค.
    • getStates() : Flow๊ฐ€ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ๋ชจ๋“  State๋ฅผ Collection ์œผ๋กœ ๋ฐ˜ํ™˜.
  • SimpleFlow implements Flow

    • String name
    • State startState: ๊ฐ€์žฅ ์ฒ˜์Œ์œผ๋กœ ์‹œ์ž‘ํ•  State(StepState, FlowState, DecisionState, SplitState)
    • Map<String, Set<StateTransition>> transitionMap : State ์ด๋ฆ„์œผ๋กœ ๋งคํ•‘ State ๋ณ„ Transition Set
    • Map<String, State> stateMap: ์ด๋ฆ„์œผ๋กœ ๋งคํ•‘๋˜์–ด ์žˆ๋Š” State Map
    • List<StateTransition> stateTransitions : State + Transition ์ •๋ณด๋ฅผ ๊ฐ€์ง„ ๊ฐ์ฒด์˜ ๋ฆฌ์ŠคํŠธ.
      StateTransition ์€ ํ˜„์žฌ State ์™€(state) on()์— ๋งค์นญ๋˜๋Š” ํŒจํ„ด(pattern), ๋‹ค์Œ State(next) ์˜ ์†์„ฑ์œผ๋กœ ์ด๋ฃจ์–ด์ ธ ์žˆ๋‹ค.
  • SimpleFlow ์ƒ์„ฑ

    @Bean
    public Flow flow() {
        FlowBuilder<Flow> flowBuilder = new FlowBuilder<>("flow");
        return flowBuilder
            .start(myStep1())
            .next(myStep())
            .end();
    }

    ๋˜๋Š” flowBuilder.build()๋ฅผ return ํ•ด๋„ ๋œ๋‹ค. (end() ๊ฐ€ ๋‚ด๋ถ€์ ์œผ๋กœ๋Š” build()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ SimpleFlow ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.)

๐Ÿง SimpleFlow ์•„ํ‚คํ…์ฒ˜

img_16.png

start(), next(), from() ์ „๋‹ฌ๋˜๋Š” ๊ฐ์ฒด์— ๋”ฐ๋ผ State ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ์ „๋‹ฌ๋œ ๊ฐ์ฒด๋ฅผ ์ €์žฅํ•œ๋‹ค.
์ด๋ ‡๊ฒŒ ์ƒ์„ฑ๋œ State๋Š” SimpleFlow ์—์„œ StateTransition ๊ฐ์ฒด๋กœ ๊ด€๋ฆฌ๋˜๋ฉฐ, ํ•ด๋‹น ๊ฐ์ฒด๋ฅผ ํ† ๋Œ€๋กœ SimpleFlow์˜ ๋‹ค๋ฅธ ์†์„ฑ๋“ค์˜ ๊ฐ’์„ ์„ค์ •ํ•˜๊ฒŒ ๋œ๋‹ค.

img_17.png

SimpleFlow๊ฐ€ State ๋ฅผ ์‹คํ–‰์‹œ์นธ๋‹ค.(StateTransition ์„ ์ฐธ๊ณ ํ•˜์—ฌ currentState๋ฅผ ์‹คํ–‰ํ•œ๋‹ค. Map์— ์ €์žฅ๋œ ๋ชจ๋“  State๋ฅผ ์ˆœํšŒํ•˜๋ฉฐ ์‹คํ–‰.)
State ์—์„œ๋Š” Step, Flow, JobExecutionDecider ์š”์†Œ๋“ค์„ ์ €์žฅํ•˜๋ฉฐ,Flow๋ฅผ ๊ตฌ์„ฑํ•˜๋ฉด ์ž๋™์œผ๋กœ State๊ฐ€ ์ƒ์„ฑ๋˜๋ฉฐ Transition๊ณผ ์—ฐ๋™๋œ๋‹ค.
handle() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ์‹คํ–‰ ํ›„ FlowExecutionStatus๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ๋งˆ์ง€๋ง‰ ์‹คํ–‰ ์ƒํƒœ๊ฐ€ FlowJob์˜ ์ตœ์ข… ์ƒํƒœ๊ฐ€ ๋œ๋‹ค.

  • SimpleFlow๋Š” ๋˜ SimpleFlow๋ฅผ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์— ์ค‘์ฒฉ๋˜์–ด ๊ฐ์ฒด๊ฐ€ ์ƒ์„ฑ๋˜๋ฉฐ ์‹คํ–‰๋œ๋‹ค.
  • SplitState ๋Š” ์—ฌ๋Ÿฌ๊ฐœ์˜ SimpleFlow๋ฅผ ๊ฐ€์ง€๊ณ  ๋ณ‘๋ ฌ์ ์œผ๋กœ ์‹คํ–‰์‹œํ‚ฌ ์ˆ˜ ์žˆ๋‹ค.

SimpleFlow์˜ ์‹คํ–‰

1. SimpleFlow์˜ start() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ฒซ State๋ฅผ ์‹คํ–‰์‹œํ‚จ๋‹ค.   
2. ๊ทธ ์ดํ›„ resume() ๋ฉ”์„œ๋“œ ์—์„œ๋Š” loop๋ฅผ ๋Œ๋ฉฐ ๋‹ค์Œ์— ์‹คํ–‰ํ•  State๊ฐ€ ์žˆ๋‹ค๋ฉด ์‹คํ–‰์‹œํ‚ค๊ณ , null์ด๊ฑฐ๋‚˜ ์‹คํ–‰ ๋ถˆ๊ฐ€๋Šฅํ•œ ์ƒํƒœ๋ผ๋ฉด ์ข…๋ฃŒํ•œ๋‹ค.   
3. nextState๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ StateMap์—์„œ ๋‹ค์Œ State๋ฅผ ์‹คํ–‰ํ•œ๋‹ค.

๐Ÿง FlowStep

Step ๋‚ด์— Flow๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด. FlowStep์˜ Status ๋“ค์€ Flow์˜ ์ตœ์ข… ์ƒํƒœ๊ฐ’์— ๋”ฐ๋ผ ๊ฒฐ์ •๋œ๋‹ค.

StepBuilderFactory โ–ถ StepBuilder โ–ถ FlowStepBuilder โ–ถ FlowStep

@Bean
public Step flowStep() {
    return stepBuilderFactory.get("flowStep")
        .flow(flowA()) // FlowStepBuilder ๋ฐ˜ํ™˜.
        .build(); // FlowStep ๋ฐ˜ํ™˜.
}

๐Ÿ“Œ @JobScope, @StepScope

@JobScope ์™€ @StepScope๋Š” ๋นˆ์˜ ์ƒ์„ฑ๊ณผ ์‹คํ–‰์— ๊ด€์—ฌํ•˜๋ฉฐ, ๋นˆ์˜ ์ƒ์„ฑ ์‹œ์ ์„ ์กฐ์ž‘ํ•œ๋‹ค.(๊ตฌ๋™์‹œ์  -> ๋นˆ์˜ ์‹คํ–‰ ์‹œ์ )
๋‘ Scope ์• ๋…ธํ…Œ์ด์…˜์€ ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ •์˜๋˜์–ด ์žˆ๋‹ค. @Scope(value="job | step", proxyMode = ScopedProxyMode.TARGET_CLASS
์ •์˜์—์„œ ๋ณผ ์ˆ˜ ์žˆ๋“ฏ ํ•ด๋‹น ์• ๋…ธํ…Œ์ด์…˜์„ ์‚ฌ์šฉํ•˜๋ฉด ๊ตฌ๋™์‹œ์ ์—๋Š” ํ”„๋ก์‹œ ๊ฐ์ฒด๋กœ ์ƒ์„ฑ๋˜๊ณ , ์‹คํ–‰ ์‹œ์ ์— ์‹ค์ œ ๋นˆ์„ ํ˜ธ์ถœํ•˜์—ฌ ๋ฉ”์„œ๋“œ๋ฅผ ์‹คํ–‰ํ•œ๋‹ค.

  • @Values ๋ฅผ ์ฃผ์ž…ํ•ด์„œ ๋นˆ์˜ ์‹คํ–‰ ์‹œ์ ์— ํŠน์ • ๊ฐ’์„ ์ฐธ์กฐํ•˜๋Š”๊ฒŒ ๊ฐ€๋Šฅํ•ด์ง„๋‹ค.(Lazy Binding, ํ•„๋“œ ๋˜๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ฃผ์ž…๋ฐ›๋Š”๋‹ค)

    • @Values("#{jobParameters[paramName]}"), @Values("#{jobExecutionContext[paramName]}"), @Values("#{stepExecutionContext[paramName]}")
  • ์Šคํ”„๋ง์˜ Bean ์€ ๊ธฐ๋ณธ์ ์œผ๋กœ Singleton ์ด๊ธฐ ๋•Œ๋ฌธ์— ์Šค๋ ˆ๋“œ ์„ธ์ดํ”„ ํ•˜์ง€ ์•Š์€๋ฐ, ํ•ด๋‹น ์• ๋…ธํ…Œ์ด์…˜๋“ค์„ ์‚ฌ์šฉํ•˜๋ฉด ๊ฐ ์Šค๋ ˆ๋“œ๋งˆ๋‹ค ์Šค์ฝ”ํ”„ ๋นˆ์ด ํ• ๋‹น๋˜๊ธฐ ๋•Œ๋ฌธ์— ์Šค๋ ˆ๋“œ ์„ธ์ดํ”„ํ•˜๊ฒŒ ์‹คํ–‰์ด ๊ฐ€๋Šฅํ•ด์ง„๋‹ค.

  • @JobScope

    • Step์˜ ์„ ์–ธ๋ฌธ์— ์ •์˜
    • jobParameter, jobExecutionContext ๊ฐ’์„ ๋ฐ”์ธ๋”ฉ ํ•  ์ˆ˜ ์žˆ๋‹ค.
    @JobScope
    @Bean
    public Step myStep1(@Value("#{jobParameters['message']}") String message) {
        System.out.println("Parameter[message]: " + message);
        return stepBuilderFactory.get("myStep1")
            .tasklet(tasklet(null, null))
            .build();
    }

    ๋Ÿฐํƒ€์ž„ ์‹œ์ ์— ๊ฐ’์ด ๋ฐ”์ธ๋”ฉ ๋˜๊ธฐ ๋•Œ๋ฌธ์— null์„ ๋„˜๊ฒจ์ฃผ์–ด ์ปดํŒŒ์ผ ์—๋Ÿฌ๋ฅผ ๋ฐฉ์ง€ํ•ด์ค€๋‹ค.

  • @StepScope

    • Tasklet, Item Reader, Writer, Processor ์„ ์–ธ๋ฌธ์— ์ •์˜ํ•œ๋‹ค.
    • jobParameter, jobExecutionContext, stepExecutionContext ๊ฐ’์„ ๋ฐ”์ธ๋”ฉ ํ•  ์ˆ˜ ์žˆ๋‹ค.
    @Bean
    @StepScope
    public Tasklet tasklet(@Value("#{jobExecutionContext['name']}") String jobName,
        @Value("#{stepExecutionContext['name']}") String stepName) {
        return ((contribution, chunkContext) -> {
            System.out.println("tasklet has execute");
            System.out.println("jobName: " + jobName + ", " + "stepName: " + stepName);
            return RepeatStatus.FINISHED;
        });
    }

    job, stepExecutionContext์˜ ๊ฐ’์€ ๊ฐ ExecutionListener์—์„œ ๋„ฃ์–ด์ค„ ์ˆ˜ ์žˆ๋‹ค.

๐Ÿง Scope ์•„ํ‚คํ…์ฒ˜

Proxy ๊ฐ์ฒด์˜ ์‹ค์ œ ๋Œ€์ƒ์ด ๋˜๋Š” Bean์„ ๋“ฑ๋กํ•˜๊ณ , ํ•ด์ œํ•˜๋Š” ์—ญํ• ์„ ํ•˜๋Š” JobScope, StepScope ํด๋ž˜์Šค๊ฐ€ ์กด์žฌํ•œ๋‹ค.
ํ•ด๋‹น ํด๋ž˜์Šค๋“ค์€ ์‹ค์ œ ๋นˆ์„ ์ €์žฅํ•˜๊ณ  ์žˆ๋Š” JobContext์™€ StepContext๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค. (๋งˆ์น˜ Spring์˜ ApplicationContext์™€ ๊ฐ™์ด)

img_18.png

์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜ ๊ตฌ๋™ โ–ถ ApplicationContext์—์„œ ๋นˆ์„ ์ƒ์„ฑ โ–ถ @JobScope, StepScope๊ฐ€ ์žˆ๋Š”๊ฐ€? โ–ถ ์žˆ์œผ๋ฉด proxy, ์—†์œผ๋ฉด Singleton Bean ์ƒ์„ฑ
์Šคํ”„๋ง ์ดˆ๊ธฐ์™€ ์™„๋ฃŒ, Job์‹คํ–‰ โ–ถ Job ์—์„œ Proxy ํ˜ธ์ถœ โ–ถ proxy์—์„œ ์‹ค์ œ Step Bean ์ฐธ์กฐ โ–ถ Step Bean ์ด ์žˆ๋‹ค๋ฉด ๊บผ๋‚ด์ฃผ๊ณ  ์—†๋‹ค๋ฉด beanFactory ์—์„œ ์ƒ์„ฑ(@Value ๋ฐ”์ธ๋”ฉ๋„ ์ด๋•Œ)
โ–ถJobScope ํด๋ž˜์Šค์—์„œ ์‹ค์ œ Bean์„ JobContext์— ๋“ฑ๋ก, ๊ด€๋ฆฌ


๐Ÿ“Œ Chunk Process

๐Ÿง Chunk?

img_19.png

Chunk ๋ž€ ์—ฌ๋Ÿฌ๊ฐœ์˜ ์•„์ดํ…œ์„ ๋ฌถ์€ ๋ฉ์–ด๋ฆฌ ๋ธ”๋ก์œผ๋กœ, ์•„์ดํ…œ์„ ์ž…๋ ฅ๋ฐ›์•„ ๋ฉ์–ด๋ฆฌ๋กœ ๋งŒ๋“  ํ›„ Chunk ๋‹จ์œ„๋กœ ํŠธ๋žœ์žญ์…˜์„ ์ฒ˜๋ฆฌํ•œ๋‹ค.
์ผ๋ฐ˜์ ์œผ๋กœ ๋Œ€์šฉํ–ฅ ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ๋ฒˆ์— ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฒƒ์ด ์•„๋‹Œ chunk ๋‹จ์œ„๋กœ ์ชผ๊ฐœ์–ด ๋ฐ˜๋ณต ์ž…์ถœ๋ ฅ ํ•  ๋•Œ ์‚ฌ์šฉ๋œ๋‹ค.

img_20.png

  • Chunk<I> ๋Š” ItemReader ๋กœ๋ถ€ํ„ฐ ์ฝ์€ ์•„์ดํ…œ์„ Chunk Size ๋งŒํผ ๋ฐ˜๋ณตํ•ด์„œ ์ €์žฅํ•œ๋‹ค.
  • Chunk<O> ๋Š” ItemReader๋กœ ๋ถ€ํ„ฐ ์ „๋‹ฌ๋ฐ›์€ Chunk<I>๋ฅผ ์ฐธ์กฐํ•˜์—ฌ ItemProcessor์—์„œ ๊ฐ€๊ณต๋œ ์•„์ดํ…œ๋“ค์„ ItemWriter ์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค.
  • ItemReader ์™€ Processor ๋Š” ์•„์ดํ…œ์„ ๊ฐœ๋ณ„์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•˜์ง€๋งŒ ItemWriter ๋Š” ์ผ๊ด„์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•œ๋‹ค.(List ๋ฅผ ๋ฐ›์•„)
@Bean
public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
        .<String, String>chunk(5)
        .reader(new ListItemReader<>(Arrays.asList("item1", "item2", "item3", "item4", "item5")))
        .processor(new ItemProcessor<String, String>() {
            @Override
            public String process(String item) throws Exception {
                // Do Something
                return ... 
            }
        })
        .writer(new ItemWriter<String>() {
            @Override
            public void write(List<? extends String> items) throws Exception {
                // Do Something (์ถœ๋ ฅ, DB ์ €์žฅ, ํŒŒ์ผ ์“ฐ๊ธฐ ๋“ฑ..)
            }
        })
        .build();
}
  • ์†์„ฑ

    • List Items
    • List skips: ์˜ค๋ฅ˜ ๋ฐœ์ƒ์œผ๋กœ ์Šคํ‚ต๋œ ์•„์ดํ…œ
    • List errors
    • iterator()

      Inner Class์ธ ChunkIterator๊ฐ€ ๋ฐ˜ํ™˜๋œ๋‹ค.

๐Ÿง ChunkOrientedTasklet

์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ ์ œ๊ณตํ•˜๋Š” Tasklet์˜ ๊ตฌํ˜„์ฒด๋กœ, Chunk ๊ธฐ๋ฐ˜ ํ”„๋กœ์„ธ์‹ฑ์„ ๋‹ด๋‹นํ•˜๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด์ด๋‹ค.
Tasklet์— ์˜ํ•ด ๋ฐ˜๋ณต์ ์œผ๋กœ ์‹คํ–‰๋˜๋ฉฐ ์‹คํ–‰๋  ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด ํŠธ๋žœ์žญ์…˜์ด ์ƒ์„ฑ๋˜์–ด ์ฒ˜๋ฆฌ๊ฐ€ ์ด๋ฃจ์–ด์ง„๋‹ค. ๋•Œ๋ฌธ์— ์˜ˆ์™ธ ๋ฐœ์ƒ์œผ๋กœ ๋กค๋ฐฑ์ด ์ด๋ฃจ์–ด์ ธ๋„ ์ด์ „์— ์ปค๋ฐ‹ํ•œ Chunk๋Š” ๋กค๋ฐฑ๋˜์ง€ ์•Š๋Š”๋‹ค.

๋‚ด๋ถ€์—๋Š” ItemReader ๋ฅผ ํ•ธ๋“ค๋งํ•˜๋Š” ChunkProvider ์™€ ItemProcessor, ItemWriter ๋ฅผ ํ•ธ๋“ค๋งํ•˜๋Š” ChunkProcessor ํƒ€์ž…์˜ ๊ตฌํ˜„์ฒด๊ฐ€ ์กด์žฌํ•œ๋‹ค.

TaskletStep โ–ถ ChunkOrientedTasklet.execute() โ–ถ ChunkProvider โ–ถ ItemReader๋ฅผ ํ†ตํ•ด read(chunkSize ๋งŒํผ ๋ฐ˜๋ณต) โ–ถ ChunkProcessor๋ฅผ ํ†ตํ•ด process(inputs)
โ–ถ ItemProcessor์—๊ฒŒ ์ฒ˜๋ฆฌ ์œ„์ž„ Iterator๋กœ ์ˆœํšŒํ•˜๋ฉฐ ์ฒ˜๋ฆฌ โ–ถ ItemWriter โ–ถ ChunkOrientedTasklet ์œผ๋กœ ๋Œ์•„๊ฐ€ ์•„์ดํ…œ์ด ์—†์„ ๋•Œ ๊นŒ์ง€ ๋ฐ˜๋ณต

Chunk๋ฅผ ์ง„ํ–‰ํ•˜๋ฉฐ ChunkContext ์— item ๋“ค์„ ์บ์‹ฑํ•œ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์—ฌ ์žฌ ์‹œ๋„ํ•  ๊ฒฝ์šฐ ์•„์ดํ…œ์„ ๋‹ค์‹œ ์ฝ์–ด์˜ค๋Š” ๊ฒƒ์ด ์•„๋‹ˆ๋ผ ์บ์‹ฑ๋œ ์•„์ดํ…œ์„ ๊บผ๋‚ด ๋‹ค์‹œ ์ฒ˜๋ฆฌํ•œ๋‹ค.
์บ์‹ฑ๋œ ๋ฐ์ดํ„ฐ๋Š” ํ•ด๋‹น Chunk๊ฐ€ ๋ชจ๋‘ ์ˆ˜ํ–‰๋˜์—ˆ์„ ๊ฒฝ์šฐ ์ œ๊ฑฐํ•˜๊ฒŒ ๋œ๋‹ค.

๐Ÿ‘† API

  • .<I, O>chunk(size)

    • input, output ์ œ๋„ค๋ฆญ ํƒ€์ž…์˜ ์„ค์ •, commit interval ์ง€์ •.
    • SimpleStepBuilder ๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
  • .<I, O>chunk(CompletionPolicy)

    • Chunk ํ”„๋กœ์„ธ์Šค๋ฅผ ์™„๋ฃŒํ—ˆ๊ธฐ ์œ„ํ•œ ์ •์ฑ…์„ ์„ค์ •ํ•˜๋Š” ํด๋ž˜์Šค.
  • .reader(ItemReader), .processor(ItemProcessor), .writer(ItemWriter))

    • Processor๋Š” ํ•„์ˆ˜์ ์œผ๋กœ ์‚ฌ์šฉํ•˜์ง€ ์•Š์•„๋„ ๋œ๋‹ค.
  • .stream(ItemStream)

    • ์žฌ์‹œ์ž‘ ๋ฐ์ดํ„ฐ๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ์ฝœ๋ฐฑ์— ๋Œ€ํ•œ ์ŠคํŠธ๋ฆผ.
  • .readerIsTransactionalQueue()

    • MQS, JMS ๊ฐ™์ด ํŠธ๋žœ์žญ์…˜ ์™ธ๋ถ€์—์„œ ์ฝ๊ณ , ์บ์‹œํ•  ๊ฒƒ์ธ์ง€์˜ ์—ฌ๋ถ€, ๊ธฐ๋ณธ์€ false ์ด๋‹ค.
  • .listener(CHunkListener)

๐Ÿง ChunkProvider / ChunkProcessor

  • ChunkProvider

    ItemReader๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์†Œ์Šค๋กœ๋ถ€ํ„ฐ ์•„์ดํ…œ์„ chunk size ๋งŒํด ์ฝ์–ด ์ œ๊ณตํ•˜๋Š” ๋„๋ฉ”์ธ ๊ฐ์ฒด์ด๋‹ค.
    Chunk<I>๋ฅผ ๋งŒ๋“ค๊ณ  ๋ฐ˜๋ณต๋ฌธ์„ ์‚ฌ์šฉํ•ด ItemReader.read()๋ฅผ ํ˜ธ์ถœํ•˜๋ฉฐ ์•„์ดํ…œ์„ chunk์— ์Œ“๊ณ , ์‚ฌ์ด์ฆˆ๋งŒํผ ์•„์ดํ…œ ์ฝ๊ธฐ๋ฅผ ๋งˆ์น˜๋ฉด ChunkProcessor๋กœ ๋„˜์–ด๊ฐ„๋‹ค.
    ๋งŒ์•ฝ ๋”์ด์ƒ ์ฝ์„ ์•„์ดํ…œ์ด ์—†๋Š”๊ฒฝ์šฐ(null) chunk ํ”„๋กœ์„ธ์Šค๋ฅผ ์ข…๋ฃŒํ•œ๋‹ค.

    ๊ธฐ๋ณธ ๊ตฌํ˜„์ฒด๋กœ SimpleChunkProvider, FaultTolerantChunkProvider(์˜ˆ์™ธ ๋ฐœ์ƒ์‹œ skip, retry) ์ด ์žˆ๋‹ค.

  • ChunkProcessor

    ItemProcessor๋ฅผ ์‚ฌ์šฉํ•ด์„œ Item์„ ๊ฐ€๊ณตํ•˜๊ณ , ItemWriter๋ฅผ ์‚ฌ์šฉํ•ด์„œ Chunk ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅ, ์ถœ๋ ฅํ•œ๋‹ค.
    Chunk<O>๋ฅผ ์ƒ์„ฑํ•˜๊ณ  ๋„˜์–ด์˜จ Chunk<I> ์—์„œ ์•„์ดํ…œ์„ ํ•œ ๊ฑด์”ฉ ๊บผ๋‚ด ์ฒ˜๋ฆฌํ•œ ํ›„ Chunk<O> ์— ๊ฒฐ๊ณผ๋ฅผ ์ €์žฅํ•œ๋‹ค.
    ItemProcessor๋Š” ํ•„์ˆ˜ ์‚ฌํ•ญ์ด ์•„๋‹ˆ๊ธฐ ๋•Œ๋ฌธ์— ์—†๋‹ค๋ฉด ์•„๋ฌด์ฒ˜๋ฆฌ ์—†์ด ๊ทธ๋Œ€๋กœ Chunk<O>์— ์ €์žฅ๋˜๊ฒŒ ๋œ๋‹ค.
    ItemWriter ๊นŒ์ง€์˜ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ํ•ด๋‹น Chunk ํŠธ๋žœ์žญ์…˜์ด ์ข…๋ฃŒ๋˜๊ณ , ๋‹ค์Œ ChunkOrientedTasklet์ด ์‹คํ–‰๋œ๋‹ค.

    ๊ธฐ๋ณธ ๊ตฌํ˜„์ฒด๋กœ๋Š” SimpleChunkProcessor ์™€ FaultTolerantChunkProcessor๊ฐ€ ์žˆ๋‹ค.

๐Ÿง ItemReader, ItemWriter, ItemProcessor

  • ItemReader

    • csvm txt ๋“ฑ์˜ ํ”Œ๋žซ ํŒŒ์ผ
    • XML, JSON
    • DB
    • JMS์™€ ๊ฐ™์€ Message Queuing ์„œ๋น„์Šค
    • Custom Reader

    ๋“ฑ์˜ ๋‹ค์–‘ํ•œ ์†Œ์Šค์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด ์ œ๊ณตํ•˜๋Š” ์ธํ„ฐํŽ˜์ด์Šค๋กœ ์•„์ดํ…œ์„ ํ•˜๋‹ˆ์”ฉ์ฝ์–ด ๋ฐ˜ํ™˜ํ•˜๊ณ , ๋” ์ด์ƒ ์—†๋‹ค๋ฉด null์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค.
    ExecutionContext์— read์™€ ๊ด€๋ จ๋œ ์—ฌ๋Ÿฌ ์ƒํƒœ ์ •๋ณด๋ฅผ ์ €์žฅํ•ด ์žฌ์‹œ์ž‘์‹œ ๋‹ค์‹œ ์ฐธ์กฐํ•˜๋„๋ก ์ง€์›ํ•œ๋‹ค.

  • ItemWriter

    ItemReader ์—์„œ ์ฝ์€ ์•„์ดํ…œ๋“ค์„ ๋ฆฌ์ŠคํŠธ๋กœ ์ „๋‹ฌ๋ฐ›์•„ ์ถœ๋ ฅํ•œ๋‹ค. ์ถœ๋ ฅ์ด ์™„๋ฃŒ๋˜๊ณ  ํŠธ๋žœ์žญ์…˜์ด ์ข…๋ฃŒ๋˜๋ฉด ์ƒˆ๋กœ์šด Chunk ๋‹จ์œ„ ํ”„๋กœ์„ธ์Šค๋ฅผ ์ง„ํ–‰ํ•œ๋‹ค.

  • ItemProcessor

    ๋ฐ์ดํ„ฐ๋ฅผ ์ถœ๋ ฅํ•˜๊ธฐ์ „ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€๊ณต, ๋ณ€ํ˜•, ํ•„ํ„ฐ๋ง(null์„ ๋ฐ˜ํ™˜ํ•˜๋ฉด ํ•„ํ„ฐ๋ง ๋œ๋‹ค) ํ•œ๋‹ค. ItemReader ์™€ ItemWriter ์™€ ๋…๋ฆฝ๋˜์–ด ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง์„ ๊ตฌํ˜„ํ•œ๋‹ค. Reader ์—์„œ ๋ฐ›์€ ์•„์ดํ…œ์„ ํŠน์ • ํƒ€์ž…์œผ๋กœ ๋ณ€ํ™˜ํ•˜์—ฌ Wirter์— ๋„˜๊ฒจ์ค€๋‹ค.
    ์ค‘๊ฐ„ ์ฒ˜๋ฆฌ์˜ ์—ญํ• ์ด๊ธฐ ๋•Œ๋ฌธ์— ํ•„์ˆ˜์š”์†Œ๊ฐ€ ์•„๋‹ˆ๊ณ , Processor ๊ฐ€ ์—†์œผ๋ฉด ์•„์ดํ…œ์€ ๊ทธ๋Œ€๋กœ Writer์— ์ „๋‹ฌ๋œ๋‹ค.

๋Œ€๋ถ€๋ถ„ ItemReader์™€ ItemWriter๋Š” ์Šคํ”„๋ง์—์„œ ์ œ๊ณตํ•˜๋Š” ๊ตฌํ˜„์ฒด๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ๋งŽ๊ณ , ItemProcessor๋Š” ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง์„ ๋‹ด๊ธฐ ๋•Œ๋ฌธ์— ์ง์ ‘ ๊ตฌํ˜„ํ•œ๋‹ค.

๐Ÿง ItemStream

ExecutionContext ๋ฅผ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋ฐ›์•„ ItemReader, ItemWriter ์ฒ˜๋ฆฌ์‹œ ์ƒํƒœ๋ฅผ ์ €์žฅํ•˜๊ณ , ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ํ•ด๋‹น ์ƒํƒœ๋ฅผ ์ฐธ์กฐํ•˜์—ฌ ์žฌ์‹œ์ž‘ ํ•˜๋„๋ก ์ง€์›ํ•œ๋‹ค.
ItemReader, ItemWriter ์˜ ๊ตฌํ˜„์ฒด๋Š” ItemSteam ์„ ๊ตฌํ˜„ํ•ด์•ผ ํ•œ๋‹ค.(ItemStreamReader, ItemStreamWriter ์„ ๊ตฌํ˜„ํ•˜๋ฉด ๋œ๋‹ค.)

  • open(ExecutionContext)

    • read(), write() ์ „์— ํŒŒ์ผ์ด๋‚˜ ์ปค๋„ฅ์…˜์ดํ•„์š”ํ•œ ๋ฆฌ์†Œ์Šค์ œ ์ ‘๊ทผํ•˜๋„๋ก ์ดˆ๊ธฐํ™”ํ•˜๋Š” ์ž‘์—….
  • update(ExecutionContext)

    • ํ˜„์žฌ๊นŒ์ง€์˜ ์ƒํƒœ๋ฅผ ์ €์žฅ
  • close(ExecutionContext)

    • ์—ด๋ ค์žˆ๋Š” ๋ฆฌ์†Œ์Šค ํ•ด์ œ. (์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ๋„ ํ˜ธ์ถœ๋˜์–ด ๋ฆฌ์†Œ์Šค๋ฅผ ํ•ด์ œํ•œ๋‹ค)

ItemReader, ItemWriter ๊ฐ€ ๋™์ž‘ํ•˜๊ธฐ ์ „์— ItemStream์—์„œ open() ์„ ํ†ตํ•ด ๋ฆฌ์†Œ์Šค๋ฅผ ์—ด๊ณ  ์ดˆ๊ธฐํ™” ํ•œ๋‹ค.
๊ทธ ํ›„ ์•„์ดํ…œ์„ ์ฝ์–ด์˜ฌ ๋•Œ, ์“ธ ๋•Œ chunk ๋งˆ๋‹ค update()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ DB์— ์ €์žฅํ•œ๋‹ค.

public class CustomItemReader implements ItemStreamReader<Member> {

    private final List<Member> items;
    private int index;
    private boolean restartable;

    public CustomItemReader(List<Member> items) {
        this.items = new ArrayList<>(items);
        this.index = 0;
        this. restartable = false;
    }

    @Override
    public Member read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        Member item = null;

        if(this.index < this.items.size()) {
            item = this.items.get(index++);
        }

        if(index == 8 && !restartable) {
            throw  new RuntimeException("Restart is required");
        }

        return item;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey("index")) {
            index = executionContext.getInt("index");
            this.restartable = true;
        }
        else {
            executionContext.put("index", index);
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.put("index", index);
    }

    @Override
    public void close() throws ItemStreamException {
        System.out.printf("๋ฆฌ์†Œ์Šค ํ•ด์ œ.");
    }
}
  • open() ์—์„œ ์ด์ „์— ์‹คํ–‰ํ•ด index๊ฐ€ ์กด์žฌํ•œ๋‹ค๋ฉด ํ•ด๋‹น index ๊ฐ’์„ ๋ถˆ๋Ÿฌ์™€ ๊ทธ ์œ„์น˜๋ถ€ํ„ฐ ์‹คํ–‰ํ•œ๋‹ค.
  • close(): ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์„ ๋•Œ ํ˜ธ์ถœ๋˜์–ด, ๋ฆฌ์†Œ์Šค๋ฅผ ํ•ด์ œ์‹œํ‚จ๋‹ค.

Chunk Size๋ฅผ 2๋กœ ์ฃผ๊ณ , ๋ฆฌ์†Œ์Šค๋กœ 10๊ฐœ์˜ ์•„์ดํ…œ์„ ์ฃผ์—ˆ๋‹ค.
8๋ฒˆ์งธ ์•„์ดํ…œ์„ ์ฝ์€ ํ›„์— ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋„๋ก ์„ค์ •ํ•ด ๋ณด์•˜๋‹ค. ์•„๋ž˜ ์ด๋ฏธ์ง€๋Š” ํ…Œ์ŠคํŠธ์˜ ๊ฒฐ๊ณผ์ด๋‹ค.

img_21.png

์ตœ์ดˆ์— Reader์™€ Writer์˜ Stream์ด Open ๋˜๊ณ , Update ๊ฐ€ ํ•œ๋ฒˆ ํ˜ธ์ถœ ๋œ๋‹ค.
๊ทธ ๋’ค์— ์•„์ดํ…œ์„ ์ฒญํฌ ์‚ฌ์ด์ฆˆ๋งŒํผ ์ฝ๊ณ , Processor๊ฐ€ ๋™์ž‘ํ•œ ํ›„ Write๊ฐ€ ์ด๋ฃจ์–ด ์ง„๋‹ค.(USER1, USER2 ์™€ ๊ฐ™์ด ์ถœ๋ ฅํ•˜๋„๋ก ํ•จ)
read(), process(), write() ๊ฐ€ ํ•œ chunk์— ๋Œ€ํ•ด ๋ชจ๋‘ ์‹คํ–‰๋˜๋ฉด Reaader, Writer์˜ Stream์—์„œ Update()๊ฐ€ ํ˜ธ์ถœ๋˜์–ด ์ƒํƒœ๋ฅผ ์ €์žฅํ•œ๋‹ค.

img_22.png

8๋ฒˆ์งธ ์•„์ดํ…œ์—์„œ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด Close()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋ฆฌ์†Œ์Šค๋ฅผ ํ•ด์ œํ•˜๊ณ  ์ข…๋ฃŒํ•œ๋‹ค.
๋ฌผ๋ก  ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š์•„๋„ Close()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋ฆฌ์†Œ์Šค๋ฅผ ํ•ด์ œํ•œ๋‹ค.

๋‹ค์Œ์— ํ•ด๋‹น ์ธ์Šคํ„ด์Šค๋ฅผ ๋‹ค์‹œ ์‹คํ–‰ํ•˜๊ฒŒ ๋˜๋ฉด ExecutionContext ์—์„œ ์ธ๋ฑ์Šค๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ (8์„ ์‹คํ–‰ํ•˜๋‹ค ๋กค๋ฐฑ ๋˜์—ˆ์œผ๋ฏ€๋กœ ์ด์ „ Chunk์ธ 6๊นŒ์ง€ ์ €์žฅ๋˜์—ˆ๋‹ค.)
restartable์ด true๋กœ ๋ฐ”๋€Œ๊ธฐ ๋–„๋ฌธ์— item 10 ๊นŒ์ง€ ์ •์ƒ์ ์œผ๋กœ ์‹คํ–‰๋œ๋‹ค.

๐Ÿง Chunk Process ์•„ํ‚คํ…์ฒ˜

img_23.png

์„ค๋ช…์€ ์œ„์—์„œ ๊ณ„์† ํ–ˆ์œผ๋‹ˆ ์ƒ๋žตํ•œ๋‹ค.


๐Ÿ“Œ ItemReader ๊ตฌํ˜„์ฒด

๐Ÿง FlatFileItemReader

ํ‘œ์™€ ๊ฐ™์€ 2์ฐจ์› ๋ฐ์ดํ„ฐ๋กœ ํ‘œํ˜„๋œ ์œ ํ˜•์˜ ํŒŒ์ผ์„ ์ฒ˜๋ฆฌํ•œ๋‹ค. ์ผ๋ฐ˜์ ์œผ๋กœ ๊ณ ์ •์œ„์น˜๋กœ ์ •์˜๋œ ๋ฐ์ดํ„ฐ๋‚˜, ํŠน์ˆ˜ ๋ฌธ์ž์— ์˜ํ—ค ๊ตฌ๋ณ„๋œ ๋ฐ์ดํ„ฐ์˜ ํ–‰์„ ์ฝ๋Š”๋‹ค.
Resource(์ฝ์–ด์•ผํ•  ๋ฐ์ดํ„ฐ)์™€ LineMapper(Line String to Object) ๊ฐ€ ํ•„์š”ํ•˜๋‹ค.

  • ๐Ÿ‘† ์†์„ฑ

    • String encoding

    • int linesToSkip

      • ํŒŒ์ผ ์ƒ๋‹จ๋ถ€ํ„ฐ ๋ฌด์‹œํ•  ๋ผ์ธ ์ˆ˜ (ํ—ค๋” ๋“ฑ์„ ์Šคํ‚ตํ• ๋•Œ ์‚ฌ์šฉ)
      • LineCallbackHandler ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๊ฑด๋„ˆ๋›ด๋‹ค.
    • String[] comments

      • ํ•ด๋‹น ๋ฌธ์ž๊ฐ€ ์žˆ๋Š” ๋ผ์ธ์€ ๋ฌด์‹œํ•œ๋‹ค.
    • Resource resource

      • FileSystemResource, ClassPathResource ...
    • LineMapper<T> lineMapper

      • Line์„ ์ฝ์–ด ๊ฐ์ฒด๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.
      • LineTokenizer
        • ๋ผ์ธ์„ FieldSet ์œผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค. ํŒŒ์ผ ํ˜•์‹์— ๋งž์ถฐ FieldSet ์œผ๋กœ ๋ณ€ํ™˜ํ•˜๋Š” ์ž‘์—…์„ ์ถ”์ƒํ™”ํ•ด์•ผํ•œ๋‹ค.
        • ๊ตฌ๋ถ„์ž๋ฅผ ์ด์šฉํ•˜๋Š” DelimitedLineTokenizer, ๊ณ ์ •๊ธธ์ด ๋ฐฉ์‹์˜ FixedLengthTokenizer ๊ฐ€ ์žˆ๋‹ค.
      • FieldSet
        • ๋ผ์ธ์„ ๊ตฌ๋ถ„์ž๋กœ ๊ตฌ๋ถ„ํ•ด์„œ ํ† ํฐ ๋ฐฐ์—ด์„ ์ƒ์„ฑํ•œ๋‹ค.
      • FieldSetMapper
        • FieldSet์„ ๊ฐ์ฒด์— ๋งคํ•‘ํ•˜์—ฌ ๋ฐ˜ํ™˜ํ•œ๋‹ค.(๊ฐ์ฒด์˜ ํ•„๋“œ๋ช…๊ณผ ๋งคํ•‘, BeanWrapperFieldSetMapper๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.)
  • ๐Ÿ‘† API

    • .name(String name)
      • ExecutionContext ๋‚ด์—์„œ ๊ตฌ๋ถ„ํ•˜๊ธฐ ์œ„ํ•œ key๋กœ ์ €์žฅ๋œ๋‹ค.
    • .resource(Resource)
    • .delimited().delimiter()
    • .fixedLength()
      • ๊ธธ์ด๋ฅผ ๊ธฐ์ค€์œผ๋กœ ํŒŒ์ผ์„ ์ฝ์Œ
    • .addColumns(Range)
      • ๊ณ ์ • ๊ธธ์ด์˜ ๋ฒ”์œ„
    • .names(String[] fieldNames)
      • ๋งคํ•‘๋  ๊ฐ์ฒด์˜ ํ•„๋“œ๋ช…
    • .targetType(Class)
    • .addComment(String comment)
      • ๋ฌด์‹œํ•  ๋ผ์ธ์˜ ๊ธฐํ˜ธ ์„ค์ •.
    • .stric(false)
      • ๋ผ์ธ์„ ์ฝ์„ ๋•Œ ํŒŒ์‹ฑ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š๋„๋ก ๊ฒ€์ฆ ์ƒ๋žต ์„ค์ •. ๊ธฐ๋ณธ์€ true
    • .encoding(String encoding)
    • .lineToSkip(num)
    • .saveState(false)
      • ์ƒํƒœ ์ •๋ณด๋ฅผ ์ €์žฅํ•  ๊ฒƒ์ธ์ง€, ๊ธฐ๋ณธ์€ true
    • .setLineMapper(LineMapper)
    • .setFieldSetMapper(FieldSetMapper)
    • .setLineTokenizer(LineTokenizer)
@Bean
public ItemReader<? extends Member> itemReader() {
    FlatFileItemReader<Member> itemReader = new FlatFileItemReader<>();

    DefaultLineMapper<Member> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(new DelimitedLineTokenizer()); // ๊ธฐ๋ณธ ๊ตฌ๋ถ„์ž ','
    lineMapper.setFieldSetMapper(new MemberFieldSetMapper());

    itemReader.setLineMapper(lineMapper);
    itemReader.setResource(new ClassPathResource("/member.csv"));
    itemReader.setLinesToSkip(1);

    return itemReader;
}
  • LineMapper
@Setter
public class DefaultLineMapper<T> implements LineMapper<T> {

    private LineTokenizer lineTokenizer;
    private FieldSetMapper<T> fieldSetMapper;

    @Override
    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(lineTokenizer.tokenize(line));
    }
}
  • FieldSetMapper
public class MemberFieldSetMapper implements FieldSetMapper<Member> {

    @Override
    public Member mapFieldSet(FieldSet fieldSet) throws BindException {
        if(fieldSet == null){
            return null;
        }

        Member member = new Member();
        member.setName(fieldSet.readString(0));
        member.setId(fieldSet.readString(1));

        return member;
    }
}

ํŒŒ์ผ์—์„œ ํ•œ ์ค„์„ ์ฝ์–ด์˜ด โ–ถ LineTokenizer ์—์„œ ํŒŒ์‹ฑํ•ด ํ† ํฐ ๋ฐฐ์—ด ์ƒ์„ฑ(DefaultFieldSet) โ–ถ FieldSetMapper ์—์„œ fieldSet์„ ํ† ๋Œ€๋กœ ๊ฐ์ฒด ์ƒ์„ฑ, ๋ฐ˜ํ™˜
โ–ถ ํŒŒ์ผ์˜ ๋๊นŒ์ง€ ๋ฐ˜๋ณต

names๋ฅผ ๋„ฃ์–ด์ฃผ์ง€ ์•Š์•˜๊ธฐ ๋–„๋ฌธ์— ์ธ๋ฑ์Šค๋กœ ๊ฐ’์„ ๊ฐ€์ ธ์™”๋‹ค. LineTokenizer์˜ setNames()๋ฅผ ์„ค์ •ํ•ด์ฃผ๋ฉด ํ•„๋“œ๋ช…์œผ๋กœ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

ํ•˜๊ธฐ์™€ ๊ฐ™์ด Builder๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋” ๊น”๋”ํ•˜๊ฒŒ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค.

@Bean
public ItemReader itemReader() {
    return new FlatFileItemReaderBuilder<Member>()
    .name("flatFile")
    .resource(new ClassPathResource("/member.csv"))
    .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
    .targetType(Member.class)
    .delimited().delimiter(",")
    .names("name", "id")
    .linesToSkip(1)
    .build();
    }

์ฝ”๋“œ๋ฅผ ๋ณด๋ฉด LineMapper๋ฅผ ์„ค์ •ํ•˜๋Š” ๋ถ€๋ถ„์ด ๋น ์กŒ๋‹ค๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ๋Š”๋ฐ, ์‚ฌ์‹ค ๋”ฐ๋กœ ์ƒ์„ฑํ•ด์„œ ๋„ฃ์–ด์ฃผ์ง€ ์•Š์•„๋„, ์Šคํ”„๋ง์—์„œ ์ œ๊ณตํ•˜๋Š” DefaultLineMapper๊ฐ€ ์กด์žฌํ•˜๊ณ ,์ด๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.
๋งˆ์ฐฌ๊ฐ€์ง€๋กœ FieldSetMapper ๋˜ํ•œ ์Šคํ”„๋ง์—์„œ ์ œ๊ณตํ•˜๋Š” BeanWrapperFieldSetMapper๋ฅผ ์‚ฌ์šฉํ•˜๊ณ , ํƒ€๊ฒŸ ํด๋ž˜์Šค๋ฅผ ์ง€์ •ํ•ด ์ฃผ๋ฉด ํ•„๋“œ๋ช…์— ๋งž๊ฒŒ ๋งคํ•‘ํ•ด์ค€๋‹ค.

  • ๊ณ ์ • ๊ธธ์ด๋กœ ๊ตฌ๋ถ„

    .fixedLength() // fixedLengthBuilder ๋ฐ˜ํ™˜
    .names("name", "id")
    .addColumns(new Range(1-5))
    .addCloumns(new Range(6-10))
    ๋ฌธ์ž์—ด์ด ์•„๋‹˜์— ์ฃผ์˜ํ•˜์ž.

๐Ÿ‘† Exception Handling

  • IncorrectTokenCountException
    • ๋„ฃ์–ด์ค€ ํ† ํฐ ํ•„๋“œ์˜ ์ด๋ฆ„(names)์˜ ์ˆ˜๋ณด๋‹ค ์ฝ์–ด๋“ค์ธ ํ† ํฐ์˜ ์ˆ˜๊ฐ€ ๋‹ค๋ฅผ ๋•Œ ๋ฐœ์ƒํ•œ๋‹ค.
  • IncorrectLineLengthException
    • ์ง€์ •ํ•ด์ค€ ์ปฌ๋Ÿผ๋“ค์˜ ๊ธธ์ด๋ณด๋‹ค ๋ผ์ธ ์ „์ฒด ๊ธธ์ด๊ฐ€ ์ผ์น˜ํ•˜์ง€ ์•Š์„ ๋•Œ ๋ฐœ์ƒํ•œ๋‹ค.

๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” stric ์˜ต์…˜์ด true ์ด๊ธฐ ๋•Œ๋ฌธ์— ํ† ํฐํ™”๋ฅผ ์ˆ˜ํ–‰ํ•  ๋•Œ ์ด๋ฅผ ๊ฒ€์ฆํ•˜๊ฒŒ ๋˜๊ณ , ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚จ๋‹ค. ํ•˜์ง€๋งŒ ํ•ด๋‹น ์˜ต์…˜์„ false๋กœ ์ฃผ๊ฒŒ ๋œ๋‹ค๋ฉด
๋ผ์ธ ๊ธธ์ด๋‚˜ ์ปฌ๋Ÿผ๋ช…์„ ๊ฒ€์ฆํ•˜์ง€ ์•Š๊ฒŒ๋˜๊ธฐ ๋•Œ๋ฌธ์— ์˜ˆ์™ธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚ค์ง€ ์•Š๊ณ , ๋ฒ”์œ„๋‚˜ ์ด๋ฆ„์— ๋งž์ง€ ์•Š๋Š” ์ปฌ๋Ÿผ์€ ๋นˆ ํ† ํฐ์„ ๊ฐ€์ง€๊ฒŒ ๋œ๋‹ค.

๐Ÿง XML-StaxEventItemReader

StAX ?

Streaming API for XML, DOM ๊ณผ SAX ์˜ ์žฅ, ๋‹จ์ ์„ ๋ณด์™„ํ•œ API ๋ชจ๋ธ๋กœ PUSH, PULL ๋ฐฉ์‹์„ ๋ชจ๋‘ ์ œ๊ณตํ•œ๋‹ค.
XNL ํŒŒ์ผ์˜ ํ•ญ๋ชฉ์„ ์ง์ ‘ ์ด๋™ํ•˜๋ฉด์„œ Stax ํŒŒ์„œ๊ธฐ๋ฅผ ํ†ตํ•ด ๊ตฌ๋ฌธ์„ ๋ถ„์„ํ•œ๋‹ค.

  • Iterator API ๋ฐฉ์‹
    • XMLEventReader์˜ nextEvent()๋ฅผ ํ˜ธ์ถœํ•ด ์ด๋ฒคํŠธ ๊ฐ์ฒด๋ฅผ ๊ฐ€์ ธ์˜จ๋‹ค.
  • Cursor API ๋ฐฉ์‹
    • JDBC Resultset ์ฒ˜๋Ÿผ ๋™์ž‘, XMLStreamReader๋Š” XML ๋ฌธ์„œ์˜ ๋‹ค์Œ ์š”์†Œ๋กœ ์ปค์„œ๋ฅผ ์ด๋™ํ•œ๋‹ค.
    • ์ปค์„œ์—์„œ ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ํ˜„์žฌ ์ด๋ฒคํŠธ์˜ ์ •๋ณด๋ฅผ ์–ป๋Š”๋‹ค.

์Šคํ”„๋ง ๋ฐฐ์น˜์—์„œ๋Š” XML ๋ฐ”์ธ๋”ฉ์„ Spring OXM์—๊ฒŒ ์œ„์ž„ํ•˜๊ณ , ๋ฐ”์ธ๋”ฉ ๊ธฐ์ˆ ์„ ์ œ๊ณตํ•˜๋Š” ๊ตฌํ˜„์ฒด๋ฅผ ์„ ํƒํ•ด์„œ ์ฒ˜๋ฆฌํ•˜๋„๋ก ํ•œ๋‹ค.
Marshaller(๊ฐ์ฒด -> XML), UnMarchaller(XML -> ๊ฐ์ฒด)๋ฅผ ์ง€์›ํ•˜๋Š” ์˜คํ”ˆ์†Œ์Šค๋กœ๋Š” JaxB2, Castor, XmlBeans, Xstream ... ์ด ์žˆ๋‹ค.

์Šคํ”„๋ง ๋ฐฐ์น˜๋Š” StAX ๋ฐฉ์‹์œผ๋กœ ๋ฌธ์„œ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” StaxEventItemReader๋ฅผ ์ œ๊ณตํ•œ๋‹ค.

img_24.png XML ๋ฌธ์„œ๋ฅผ ์กฐ๊ฐ(fragment) ๋‹จ์œ„๋กœ ๋ถ„์„ํ•˜์—ฌ ์ฒ˜๋ฆฌํ•œ๋‹ค.(root element ๋ฅผ ํ•˜๋‚˜์˜ ์กฐ๊ฐ์œผ๋กœ)
์กฐ๊ฐ์„ ์ฝ์„ ๋•Œ๋Š” DOM์˜ Pull ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•˜๊ณ , ์ด๋ฅผ ๊ฐ์ฒด๋กœ ๋ฐ”์ธ๋”ฉ ํ• ๋•Œ๋Š” SAX์˜ Push ๋ฐฉ์‹์„ ์‚ฌ์šฉํ•œ๋‹ค.
fragment ๋‹จ์œ„๋กœ ์ฝ์–ด๋“ค์ธ ํ›„ SpringOXM ์—๊ฒŒ ๊ฐ์ฒด ๋งคํ•‘์„ ์œ„์ž„ํ•œ๋‹ค.

๋ฃจํŠธ ์—˜๋ฆฌ๋จผํŠธ๋ฅผ ๊ฐ์ฒด๋กœ, ๋‚ด๋ถ€์˜ ์ž์‹ ์—˜๋ฆฌ๋จผํŠธ๋“ค์„ ๋งคํ•‘๋  ๊ฐ์ฒด์˜ ํ•„๋“œ๋กœ ๋งคํ•‘ํ•œ๋‹ค.

  • ๐Ÿ‘† ์†์„ฑ

    • FragmentEventReader
      • XML ์กฐ๊ฐ์„ ๋…๋ฆฝํ˜• XML ๋ฌธ์„œ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ์ด๋ฒคํŠธ ํŒ๋…๊ธฐ
    • XMLEventReader
      • XML ์ด๋ฒคํŠธ ๊ตฌ๋ฌธ ๋ถ„์„์„ ์œ„ํ•œ ์ตœ์ƒ์œ„ ์ธํ„ฐํŽ˜์ด์Šค
    • Unmarshaller
      • XML to Object
    • Resource
    • List fragmentRootElementNames
      • ์กฐ๊ฐ ๋‹จ์œ„์˜ ๋ฃจํŠธ ์—˜๋ฆฌ๋จผํŠธ๋ช…์„ ๋‹ด์€ ๋ฆฌ์ŠคํŠธ.
  • ๐Ÿ‘† API

    StaxEventItemRedaderBuilder<T> ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

    • .name(String)
    • .resource(Resource)
    • .addFragmentRootElements(String ...)
      • root Elemnet๋ฅผ ์ง€์ •ํ•œ๋‹ค.
    • .unmarshaller(Unmarshaller)
      • ํƒ€๊ฒŸ ๊ฐ์ฒด ์„ค์ •.
    • .saveState(false)
      • ์ƒํƒœ ์ •๋ณด ์ €์žฅ์˜ ์—ฌ๋ถ€, ๊ธฐ๋ณธ๊ฐ’์€ true ์ด๋‹ค.

์˜์กด์„ฑ ์ถ”๊ฐ€

implementation 'com.thoughtworks.xstream:xstream:1.4.19'
implementation 'org.springframework:spring-oxm:5.3.16'
<?xml version="1.0" encoding="UTF-8" ?>
<members>
  <member id="1">
    <id>1</id>
    <name>user1</name>
  </member>
  <member>
    <id>2</id>
    <name>user2</name>
  </member>
  
   ...
  
</members>
@Bean
public ItemReader<? extends Member> itemReader() {
    return new StaxEventItemReaderBuilder<Member>()
        .name("staXml")
        .resource(new ClassPathResource("/member.xml"))
        .addFragmentRootElements("member")
        .unmarshaller(itemUnmarshaller())
        .build();
}

@Bean
public Unmarshaller itemUnmarshaller() {
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("member", Member.class);
    aliases.put("id", String.class);
    aliases.put("name", String.class);

    XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
    xStreamMarshaller.setAliases(aliases);

    return xStreamMarshaller;
}

Map ์— ์ฒ˜์Œ์œผ๋กœ ๋“ค์–ด๊ฐ€๋Š” ์š”์†Œ๋Š” RootElement์— ํ•ด๋‹นํ•˜๋Š” ๊ฒƒ์œผ๋กœ ๊ฐ์ฒด์™€ ๋งคํ•‘๋˜๊ณ  ๊ทธ ๋‹ค์Œ์˜ ์š”์†Œ๋“ค์€ ๊ฐ๊ฐ ๊ฐ์ฒด์˜ ํ•„๋“œ์™€ ๋งคํ•‘๋œ๋‹ค.

๐Ÿง JsonItemReader

img_25.png

Json ๋ฐ์ดํ„ฐ์˜ ํŒŒ์‹ฑ, ๋ฐ”์ธ๋”ฉ์„ JsonObjectReader ๊ตฌํ˜„์ฒด์—๊ฒŒ ์œ„์ž„ํ•˜์—ฌ ์ฒ˜๋ฆฌํ•œ๋‹ค.

JsonObjectReader ๊ตฌํ˜„์ฒด

  • GsonJsonObjectReader
  • JacksonJsonObjectReader
    • itemType: Json ๋ฐ์ดํ„ฐ๋ฅผ ๋งคํ•‘ํ•  ๊ฐ์ฒด์˜ ํƒ€์ž…
    • JsonParser
    • ObjectMapper
    • InputStream : Json ํŒŒ์ผ๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์˜ค๋Š” ์ž…๋ ฅ ์ŠคํŠธ๋ฆผ.
@Bean
public ItemReader<? extends Member> itemReader() {
    return new JsonItemReaderBuilder<Member>()
        .name("JsonReader")
        .resource(new ClassPathResource("/member.json"))
        .jsonObjectReader(new JacksonJsonObjectReader<>(Member.class))
        .build();
}
[
  {
    "name": "user1",
    "id": "1"
  },
  {
    "name": "user2",
    "id": "2"
  },
  ...
]

๐Ÿง DB-ItemReader

Cursor Based ์ฒ˜๋ฆฌ

JDBC ResultSet์˜ ๋ฉ”์ปค๋‹ˆ์ฆ˜์„ ์‚ฌ์šฉํ•œ๋‹ค. ํ˜„์žฌ ํ–‰์— ์ปค์„œ๋ฅผ ์œ ์ง€ํ•˜๋ฉฐ ๋ฐ์ดํ„ฐ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ๋‹ค์Œ ํ–‰์œผ๋กœ ์ปค์„œ๋ฅผ ์ด๋™ํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š” Streaming ๋ฐฉ์‹์˜ I/O ์ด๋‹ค.
DB Connection์ด ์—ฐ๊ฒฐ๋˜๋ฉด ๋ฐฐ์น˜๊ฐ€ ์™„๋ฃŒ๋  ๋•Œ ๊นŒ์ง€ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์–ด์˜ค๊ธฐ ๋•Œ๋ฌธ์— ์†Œ์ผ“ ํƒ€์ž„์•„์›ƒ์„ ์ด์— ๋งž๊ฒŒ ์„ค์ •ํ•˜์•ผ ํ•œ๋‹ค.

  • ๋ชจ๋“  ๊ฒฐ๊ณผ๋ฅผ ๋ฉ”๋ชจ๋ฆฌ์— ํ• ๋‹นํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ๋งŽ๋‹ค.
  • Connection ์—ฐ๊ฒฐ ์œ ์ง€ ์‹œ๊ฐ„๊ณผ ๋ฉ”๋ชจ๋ฆฌ ๊ณต๊ฐ„์ด ์ถฉ๋ถ„ํ•˜๋‹ค๋ฉด ๋Œ€์šฉ๋Ÿ‰ ๋ฐ์ดํ„ฐ์˜ ์ฒ˜๋ฆฌ์— ์ ํ•ฉํ•˜๋‹ค.(fatchSize๋กœ ํ•œ๋ฒˆ์— ๊ฐ€์ ธ์˜ค๋Š” ์–‘ ์„ค์ • ๊ฐ€๋Šฅ)

Paging Based ์ฒ˜๋ฆฌ

ํŽ˜์ด์ง• ๋‹จ์œ„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒ, PageSize ๋งŒํผ ํ•œ๋ฒˆ์— ๋ฉ”๋ชจ๋ฆฌ์— ์˜ฌ๋ ค๋‘๊ณ , ํ•œ ๊ฐœ์”ฉ ์ฝ๋Š”๋‹ค.
Cursor์™€ ๋‹ฌ๋ฆฌ ํ•œ ํŽ˜์ด์ง€๋ฅผ ์ฝ์„ ๋•Œ ๋งˆ๋‹ค Connection์„ ์žฌ์—ฐ๊ฒฐ ํ•œ๋‹ค.

  • ํŽ˜์ด์ง• ๋‹จ์œ„์˜ ๊ฒฐ๊ณผ๋งŒ ๋ฉ”๋ชจ๋ฆฌ์— ํ• ๋‹นํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ”๋ชจ๋ฆฌ ์‚ฌ์šฉ๋Ÿ‰์ด ๋” ์ ์„ ์ˆ˜ ์žˆ๋‹ค.

  • ์ปค๋„ค๊ฒฌ ์—ฐ๊ฒฐ ์œ ์ง€์‹œ๊ฐ„์ด ์ ๊ณ , ๋ฉ”๋ชจ๋ฆฌ ๊ณต๊ฐ„์„ ํšจ์œจ์ ์œผ๋กœ ์‚ฌ์šฉํ•ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ์— ์ ํ•ฉํ•˜๋‹ค.

  • ๐Ÿ‘† JdbcCursorItemReader

    img_26.png

    ์ปค์„œ ๊ธฐ๋ฐ˜์˜ JDBC ๊ตฌํ˜„์ฒด๋กœ ResultSet๊ณผ ํ•จ๊ป˜ ์‚ฌ์šฉ๋˜๋ฉฐ, Datasource์—์„œ Connection์„ ์–ป์–ด์™€ SQL์„ ์‹คํ–‰ํ•œ๋‹ค.
    Thread-safe ํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์— ๋ฉ€ํ‹ฐ ์Šค๋ ˆ๋“œ ํ™˜๊ฒฝ์—์„œ ๋™๊ธฐํ™” ์ฒ˜๋ฆฌ๊ฐ€ ํ•„์š”ํ•˜๋‹ค.

    Step์—์„œ read() ๊ฐ€ ํ˜ธ์ถœ ๋˜๋ฉด, JdbcCursorItemReader ์—์„œ fetchSize(chunkSize) ๋งŒํผ ์ฝ์–ด์˜จ ํ›„ ๋Œ๋ ค์ค€๋‹ค.

    • API

      JdbcCursorItemReaderBuilde() ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

      • .name(name)
      • .fetchSize(size)
      • .dataSource(DataSource)
      • .rowMapper(RowMapper)
        • ๋ฐ˜ํ™˜๋˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ์ฒด์— ๋งคํ•‘ํ•˜๊ธฐ ์œ„ํ•œ ์„ค์ •.
      • .beanRowMapper()
        • RowMapper ๋Œ€์‹  ํด๋ž˜์Šค ํƒ€์ž…์œผ๋กœ ์„ค์ •ํ•œ๋‹ค.
      • .sql(sql)
      • .queryArguments(Object ...)
        • ์ฟผ๋ฆฌ ํŒŒ๋ผ๋ฏธํ„ฐ ์„ค์ •
      • .maxItemCount(int)
      • .currentItemCount(int)
        • ์กฐํšŒ item์˜ ์‹œ์ž‘ ์ง€์ .
      • maxRows(int)
        • ResultSet ์˜ค๋ธŒ์ ํŠธ๊ฐ€ ํฌํ•จ ํ•  ์ˆ˜ ์žˆ๋Š” ์ตœ๋Œ€ ํ–‰์˜ ์ˆ˜.
      @Bean
      public ItemReader<Member> itemReader() {
          return new JdbcCursorItemReaderBuilder<Member>()
              .name("jdbcCursorItemReader")
              .fetchSize(chunkSize)
              .sql("select id, name from member where name like ? order by id")
              .queryArguments("user%")
              .beanRowMapper(Member.class)
              .dataSource(dataSource)
              .build();
      }
  • ๐Ÿ‘† JpaCursorItemReader

    img_27.png

    SpringBatch 4.3 ๋ถ€ํ„ฐ ์ง€์›ํ•œ๋‹ค. EntityManagerFactory ๊ฐ์ฒด๋ฅผ ํ•„์š”๋กœํ•˜๋ฉฐ ์ฟผ๋ฆฌ๋Š” JPQL๋กœ ์ž‘์„ฑํ•œ๋‹ค.
    ItemStream์—์„œ Query๋ฅผ ํ†ตํ•ด ์ƒ์„ฑ๋œ ๊ฒฐ๊ณผ๋ฅผ ResultStream ์œผ๋กœ ๊ฐ€์ ธ์˜จ๋‹ค. ๊ทธ ํ›„ JpaCursorItemReader ์—์„œ Iterator๋กœ ResultStream์—์„œ ๊ฒฐ๊ณผ๋ฅผ ๋ฝ‘์•„๋‚ธ๋‹ค.

    • API

      JpaCursorItemReaderBuilder() ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉฐ ๊ธฐ๋ณธ์ ์ธ API๋Š” JDBC ๋ฐฉ์‹๊ณผ ๋น„์Šทํ•˜๋‹ค.

      • .queryString(String JPQL)
      • .EntityManagerFactory(EMF)
      • .parameterValue(Map<String, Object>)
        ...
      @Bean
      public ItemReader<Member> itemReader() {
          Map<String, Object> params = new HashMap<>();
          params.put("name", "user%");
      
          return new JpaCursorItemReaderBuilder<Member>()
              .name("jpaCursorItemReader")
              .entityManagerFactory(entityManagerFactory)
              .queryString("select m from Member m where name like :name")
              .parameterValues(params)
              .build();
      }
  • ๐Ÿ‘† JdbcPagingItemReader

    ํŽ˜์ด์ง• ๊ธฐ๋ฐ˜์˜ ๊ตฌํ˜„์ฒด๋กœ ์‹œ์ž‘ row์˜ ๋ฒˆํ˜ธ(offset)์™€ ๋ฐ˜ํ™˜ํ•  row์˜ ์ˆ˜(limit)๋ฅผ ์ง€์ •ํ•˜์—ฌ ์‹คํ–‰ํ•œ๋‹ค.
    ํŽ˜์ด์ง• ๋‹จ์œ„๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•  ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด ์ฟผ๋ฆฌ๊ฐ€ ์‹คํ–‰๋˜๊ณ , ์ปค๋„ฅ์…˜์„ ๋งบ๋Š”๋‹ค. Thread-safe ํ•˜๋‹ค.

    ์ž‘๋™ ํ”„๋กœ์„ธ์Šค๋Š” Cursor ์™€ ์œ ์‚ฌํ•˜์ง€๋งŒ ๋‹ค๋ฅธ ์ ์ด๋ผ๋ฉด ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ฌ ๋•Œ ํŽ˜์ด์ง€ ๋‹จ์œ„๋กœ ๊ฐ€์ ธ์˜ค๊ธฐ ๋•Œ๋ฌธ์— Mapper์— ์˜ํ•ด ๊ฐ์ฒด์˜ List๊ฐ€ ์ƒ์„ฑ๋œ๋‹ค.

    PagingQueryProvider

    • ์ฟผ๋ฆฌ๋ฌธ์„ ItemReader์—๊ฒŒ ์ œ๊ณตํ•˜๋Š” ํด๋ž˜์Šค, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋งˆ๋‹ค ๋‹ค๋ฅธ ์ข…๋ฅ˜๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

    • API

      JdbcPagingItemReaderBuilder() ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

      • name(String)
      • pageSize(int)
      • dataSource(DataSource)
      • queryProvider(PagingQueryProvider)
      • rowMapper(Class)
      • parameterValues(Map<String, Object>)
    • PagingQueryProvider์˜ ๊ฐ’ ์„ค์ •

      • selectClause(String), fromClause(String), whereClause(String), groupClause(String)
        • select, from, where, group ์ ˆ.
      • sortKeys(Map<String, Order>)
        • ์ •๋ ฌ์„ ์œ„ํ•œ ํ‚ค ์„ค์ •. ํ•„์ˆ˜๋กœ ์ง€์ •ํ•ด ์ฃผ์–ด์•ผ ํ•œ๋‹ค. ...
      @Bean
      public ItemReader<Member> itemReader() throws Exception {
          Map<String, Object> params = new HashMap<>();
          params.put("name", "user%");
      
          return new JdbcPagingItemReaderBuilder<Member>()
              .name("jdbcPagingItemReader")
              .pageSize(5)
              .dataSource(dataSource)
              .rowMapper(new BeanPropertyRowMapper<>(Member.class))
              .queryProvider(createQueryProvider())
              .parameterValues(params)
              .build();
      }
      
      @Bean
      public PagingQueryProvider createQueryProvider() throws Exception {
      
          Map<String, Order> sortKeys = new HashMap<>();
          sortKeys.put("id", Order.ASCENDING);
      
          SqlPagingQueryProviderFactoryBean providerFactoryBean = new SqlPagingQueryProviderFactoryBean();
          providerFactoryBean.setDataSource(dataSource);
          providerFactoryBean.setSelectClause("id, name");
          providerFactoryBean.setFromClause("member");
          providerFactoryBean.setWhereClause("name like :name");
          providerFactoryBean.setSortKeys(sortKeys);
      
          return providerFactoryBean.getObject();
      }
  • ๐Ÿ‘† JpaPagingItemReader

    ํŽ˜์ด์ง• ๊ธฐ๋ฐ˜์˜ JPA ๊ตฌํ˜„์ฒด๋กœ JpaCursor ๋ฐฉ์‹๊ณผ API๋Š” pageSize ์„ค์ •์„ ์ œ์™ธํ•˜๊ณ ๋Š” ๋™์ผํ•˜๋‹ค.

    @Bean
    public ItemReader<Member> itemReader() {
    Map<String, Object> params = new HashMap<>();
    params.put("name", "user%");
    
    return new JpaPagingItemReaderBuilder<Member>()
        .name("jpaCursorItemReader")
        .pageSize(5)
        .entityManagerFactory(entityManagerFactory)
        .queryString("select m from Member m where name like :name")
        .parameterValues(params)
        .build();
    }

๐Ÿง ItemReaderAdapter

๋ฐฐ์น˜ Job ์•ˆ์—์„œ ์ด๋ฏธ ์žˆ๋Š” ๊ธฐ์กด์˜ DAO๋‚˜ ๋‹ค๋ฅธ ์„œ๋น„์Šค๋ฅผ ItemReader ์•ˆ์—์„œ ์‚ฌ์šฉํ•˜๊ณ ์ž ํ• ๋•Œ ์‹คํ–‰์„ ์œ„์ž„ํ•˜๋Š” ์—ญํ• ์„ ํ•œ๋‹ค.
(MemberService ์˜ joinMember() ๋ฉ”์„œ๋“œ์˜ ํ˜ธ์ถœ๊ณผ ๊ฐ™์ด), Java์˜ ๋ฆฌํ”Œ๋ ‰์…˜ ๊ธฐ์ˆ ์„ ์‚ฌ์šฉํ•œ๋‹ค.

@Bean
public ItemReader<Member> itemReader() {
    ItemReaderAdapter<Member> reader = new ItemReaderAdapter<>();
    reader.setTargetObject(MemberService());
    reader.setTargetMethod("readMember");

    return reader;
}
public class MemberService {

    private long id= 0;

    public Member readMember() {
        if(id < 10) {
            return new Member(String.valueOf(++id), "user");
        }
        return null;
    }
}

๊ธฐ์กด์˜ MemberService์˜ readMember() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ๊ฑด์”ฉ ์ฝ์–ด์˜ฌ ์ˆ˜ ์žˆ๋‹ค.
10 ๋ฒˆ๋งŒ ์ฝ๊ธฐ ์œ„ํ•ด id < 10 ์กฐ๊ฑด์„ ์ถ”๊ฐ€ํ•˜์˜€๋‹ค. ์ฝ์„ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜จ ํ›„ ํ•ด๋‹น ๋ฆฌ์ŠคํŠธ๋ฅผ remove() ํ•˜๋ฉฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์„ ์ˆ˜ ์žˆ๋‹ค.


๐Ÿ“Œ ItemWriter

๐Ÿง FlatFileItemWriter

๊ณ ์ • ์œ„์น˜ ๋˜๋Š” ํŠน์ˆ˜ ๋ฌธ์ž์— ์˜ํ•ด ๊ตฌ๋ณ„๋œ ๋ฐ์ดํ„ฐ์˜ ํ–‰์„ ๊ธฐ๋กํ•œ๋‹ค.
์ž‘์„ฑํ•ด์•ผํ•  Resource์™€ Object๋ฅผ String์œผ๋กœ ๋ณ€ํ™˜ํ•ด์ฃผ๋Š” LineAggregator๊ฐ€ ํ•„์š”ํ•˜๋‹ค.

์†์„ฑ

  • encoding

  • append

    • ํŒŒ์ผ์ด ์ด๋ฏธ ์กด์žฌํ•˜๋Š” ๊ฒฝ์šฐ ๋ฐ์ดํ„ฐ๋ฅผ ์ถ”๊ฐ€ํ•  ๊ฒƒ์ธ์ง€ ์—ฌ๋ถ€ (๊ธฐ๋ณธ๊ฐ’ false, ๋ฎ์–ด์”Œ์›€)
  • Resource

  • LineAggregator

    • Item ์„ ๋ฐ›์•„ String ์œผ๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.
      • ๊ตฌํ˜„์ฒด
      • PassThroughLineAggregator: ๋‹จ์ˆœ ๋ฌธ์ž์—ด๋กœ ๋ฐ˜ํ™˜
      • DelimitedLineAggregator: ๊ตฌ๋ถ„์ž๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ๋ฌธ์ž์—ด์„ ์ƒ์„ฑ.
        • ๊ธฐ๋ณธ๊ฐ’์€ , ์ด๋ฉฐ, delimited().delimiter(String) ์œผ๋กœ ๊ตฌ๋ถ„์ž๋ฅผ ์ง€์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.
      • FormatterLineAggregator: ๊ณ ์ •๊ธธ์ด๋กœ ๊ตฌ๋ถ„ํ•˜์—ฌ ๋ฌธ์ž์—ด ์ƒ์„ฑ
        • formatted().format("%-3s|%-2s") ์™€ ๊ฐ™์ด ํ˜•์‹์„ ์ง€์ •ํ•œ๋‹ค.
    • ๋‚ด๋ถ€์ ์œผ๋กœ FieldExtractor ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.
      • ๊ฐ์ฒด์˜ ํ•„๋“œ๋ฅผ ์ถ”์ถœํ•ด์„œ ๋ฐฐ์—ด๋กœ ๋งŒ๋“ค์–ด ๋ฐ˜ํ™˜ํ•œ๋‹ค.
      • BeanWrapperFieldExtractor: ๊ฐ์ฒด์˜ ํ•„๋“œ๋ฅผ ๋ฐฐ์—ด๋กœ ๋ฐ˜ํ™˜.
      • PassThroughFieldExtractor: ์ „๋‹ฌ๋ฐ›์€ Collection์„ ๋ฐฐ์—ด๋กœ ๋ฐ˜ํ™˜.
  • FlatFileHeaderCallback, FlatFileFooterCallback

    • ํ—ค๋”, ํ‘ธํ„ฐ๋ฅผ ํŒŒ์ผ์— ์“ฐ๊ธฐ์œ„ํ•œ ์ธํ„ฐํŽ˜์ด์Šค.

img_28.png

FieldExtractor ์—์„œ ํ•„๋“œ๋ฅผ ์ถ”์ถœํ•ด ๋ฐฐ์—ด์„ ์ƒ์„ฑํ•ด ๋„˜๊ฒจ์ฃผ๋ฉด LineAggregator ์—์„œ ๊ตฌ๋ถ„์ž๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ ๋ฌธ์ž์—ด์„ ์ƒ์„ฑํ•œ๋‹ค.

DelimitedLineAggregator

@Bean
public ItemWriter<? super Member> itemWriter() {
    return new FlatFileItemWriterBuilder<>()
        .name("flatFileWriter")
        .resource(new FileSystemResource("/Users/a1101720/IdeaProjects/TIL/Spring/SpringBatch/src/main/resources/memberOut.csv"))
        .append(true)
        .delimited()
        .delimiter("|")
        .names(new String[] {"name", "id"})
        .build();
}

FormatterLineAggregator

@Bean
public ItemWriter<? super Member> itemWriter() {
    return new FlatFileItemWriterBuilder<>()
        .name("flatFileWriter")
        .resource(new FileSystemResource("/Users/a1101720/IdeaProjects/TIL/Spring/SpringBatch/src/main/resources/memberOut.csv"))
        .append(true)
        .formatted()
        .format("%-5s|%-2s")
        .names(new String[] {"name", "id"})
        .build();
}

๐Ÿง XML-StaxEventItemWriter

์ฝ์„ ๋•Œ์™€ ๋™์ผํ•˜๊ฒŒ Resource, marshaller, rootTagName(์กฐ๊ฐ๋‹จ์œ„์˜ ๋ฃจํŠธ๊ฐ€ ๋  ์ด๋ฆ„) ์ด ํ•„์š”ํ•˜๋‹ค.

Marshaller์— ์˜ํ•ด ๊ฐ์ฒด๊ฐ€ XML ์›์†Œ๋กœ ๋ณ€ํ™˜๋œ๋‹ค.

@Bean
public ItemWriter<? super Member> itemWriter() {
    return new StaxEventItemWriterBuilder<>()
        .name("staxItemWriter")
        .resource(new FileSystemResource("/Users/a1101720/IdeaProjects/TIL/Spring/SpringBatch/src/main/resources/memberout.xml"))
        .rootTagName("member")
        .marshaller(itemMarshaller())
        .overwriteOutput(true) // ๋ฎ์–ด ์”Œ์šฐ๊ธฐ.
        .build();
}

@Bean
public Marshaller itemMarshaller() {
    Map<String, Class<?>> aliases = new HashMap<>();
    aliases.put("member", Member.class);
    aliases.put("name", String.class);
    aliases.put("id", String.class);
    
    XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
    xStreamMarshaller.setAliases(aliases);
    
    return xStreamMarshaller;
}

Marshaller ์˜ ์„ค์ •์€ ์ด์ „์— Reader์—์„œ์™€ ๋™์ผํ•˜๋‹ค.

๐Ÿง JsonFileItemWriter

jsonObjectMarshaller ์— ์˜ํ•ด ๊ฐ์ฒด๊ฐ€ Json ํ˜•์‹์œผ๋กœ ๋ณ€ํ™˜๋œ๋‹ค. jsonObjectMarshaller ๋Š” ๋‚ด๋ถ€์ ์œผ๋กœ ObjectMapper๋ฅผ ๊ฐ€์ง€๊ณ  ๋ฐ์ดํ„ฐ๋ฅผ ๋งคํ•‘ํ•œ๋‹ค.

@Bean
public ItemWriter<? super Member> itemWriter() {
    return new JsonFileItemWriterBuilder<>()
        .name("jsonFileItemWriter")
        .resource(new FileSystemResource("/Users/a1101720/IdeaProjects/TIL/Spring/SpringBatch/src/main/resources/memberout.json"))
        .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
        .append(true)
        .build();
}

๐Ÿง JdbcBatchItemWriter

JDBC์˜ Batch ๊ธฐ๋Šฅ์„ ์‚ฌ์šฉํ•˜์—ฌ bulk Insert, update, delete ๋ฐฉ์‹์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์„ฑ๋Šฅ์  ์ด์ ์„ ๊ฐ€์ง„๋‹ค.

API

  • .dataSource(dataSource)
  • .sql("insert into ")
  • .assertUpdates(true)
    • ํŠธ๋žœ์žญ์…˜ ์ดํ›„ ์ ์–ด๋„ ํ•˜๋‚˜์˜ ํ–‰์ด ๋ณ€๊ฒฝ๋˜์ง€ ์•Š์„ ๊ฒฝ์šฐ ์˜ˆ์™ธ ๋ฐœ์ƒ.
  • .beanMapped()
    • Pojo ๊ธฐ๋ฐ˜์œผ๋กœ Insert SQL์˜ Values ๋ฅผ ๋งคํ•‘
    • BeanPropertyItemSqlParameterSourceProvider ๊ฐ€ ์‚ฌ์šฉ๋œ๋‹ค.
  • .columnMapped()
    • Key, Value ๊ธฐ๋ฐ˜์œผ๋กœ Insert SQL์˜ values๋ฅผ ๋งคํ•‘
    • ColumnMapItemPreparedStatementSetter ๊ฐ€ ์‚ฌ์šฉ๋œ๋‹ค.

img_29.png

์“ธ ๋Œ€์ƒ์˜ ํƒ€์ž…์ด Pojo ํƒ€์ž…์˜ ๊ฐ์ฒด๋ผ๋ฉด beanMapped()๋ฅผ, Map ๊ณผ ๊ฐ™์ด key, value์˜ ์Œ์ด๋ผ๋ฉด columnMapped()๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

@Bean
public ItemWriter<? super Member> itemWriter() {
    return new JdbcBatchItemWriterBuilder<>()
        .dataSource(dataSource)
        .sql("insert into member2(name, id) values(:name, :id)")
        .assertUpdates(true)
        .beanMapped()
        .build();
}

๐Ÿง JpaItemWriter

JPA ์—”ํ‹ฐํ‹ฐ ๊ธฐ๋ฐ˜์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ฒ˜๋ฆฌํ•œ๋‹ค. ์—”ํ‹ฐํ‹ฐ๋ฅผ chunk ํฌ๊ธฐ๋งŒํผ insert, merge ํ•œ ๋‹ค์Œ flush ํ•œ๋‹ค.

API

  • .userPersist(false)
    • ์—”ํ‹ฐํ‹ฐ๋ฅผ persiste ํ•  ๊ฒƒ์ธ์ง€ ์—ฌ๋ถ€. false ์ด๋ฉด merge ์ฒ˜๋ฆฌ ํ•œ๋‹ค.
  • EntityManageFactory()
@Bean
public ItemWriter<? super Member2> itemWriter() {
    return new JpaItemWriterBuilder<>()
        .usePersist(true) // default
        .entityManagerFactory(entityManagerFactory)
        .build();
}

๐Ÿง ItemWriterAdapter

๋ฐฐ์น˜ Job ์•ˆ์—์„œ ์ด๋ฏธ ์žˆ๋Š” ์„œ๋น„์Šค๋ฅผ ItemWriter ์•ˆ์„ธ์„œ ์‚ฌ์šฉํ•  ๋•Œ ์ด๋ฅผ ์œ„์ž„ํ•œ๋‹ค.

@Bean
public ItemWriter<? super Member> itemWriter() {
    ItemWriterAdapter<Member> writerAdapter = new ItemWriterAdapter<>();
    writerAdapter.setTargetObject(MemberService());
    writerAdapter.setTargetMethod("writeMember");

    return new CustomItemWriter();
}

ํƒ€๊ฒŸ ํด๋ž˜์Šค์™€ ๋ฉ”์„œ๋“œ๋ฅผ ์ง€์ •ํ•ด์ค€๋‹ค. ๋ฆฌํ”Œ๋ ‰์…˜์„ ์ด์šฉํ•ด ์‹คํ•ธ๋œ๋‹ค.

public class MemberService {

    public void writeMember(Member member) {
        System.out.println(member);
    }
}

๊ธฐ์กด์˜ ItemWriter ์˜ ๊ตฌํ˜„์ฒด์™€ ๋‹ฌ๋ผ ์•„์ดํ…œ์„ ํ•˜๋‚˜์”ฉ ๋„˜๊ฒจ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•œ๋‹ค.

๐Ÿ“Œ ItemProcessor ๊ตฌํ˜„์ฒด

๐Ÿง CompositeItemProcessor

img_30.png

ItemProcessor ๋“ค์„ ์—ฐ๊ฒฐํ•ด์„œ ์œ„์ž„ํ•˜๋ฉด ๊ฐ ItemProcessor๋ฅผ ์‹คํ–‰์‹œํ‚จ๋‹ค.
์ด์ „ ItemProcessor ๋ฐ˜ํ™˜ ๊ฐ’์€ ๋‹ค์Œ ItemProcessor ๊ฐ’์œผ๋กœ ์—ฐ๊ฒฐ๋œ๋‹ค.

@Bean
public ItemProcessor<? super Member, Member> itemProcessor() {
    List itemProcessors = new ArrayList<>();
    itemProcessors.add(new CustomItemProcessor());
    itemProcessors.add(new CustomItemProcessor2());

    return new CompositeItemProcessorBuilder<>()
        .delegates(itemProcessors)
        .build();
}

List ๋กœ ์ „๋‹ฌํ•  ์ˆ˜๋„ ์žˆ๊ณ , ํ•˜๋‚˜์”ฉ ์ฒด์ด๋‹์œผ๋กœ ์ „๋‹ฌํ•  ์ˆ˜ ๋„ ์žˆ๋‹ค.

๐Ÿง ClassifierCompositeItemProcessor

Classifier๋กœ ๋ผ์šฐํŒ… ํŒจํ„ด์„ ๊ตฌํ˜„ํ•ด์„œ ๋ถ„๋ฅ˜ ์กฐ๊ฑด์— ๋”ฐ๋ผ ์—ฌ๋Ÿฌ๊ฐœ์˜ ItemProcessor ์ค‘ ํ•˜๋‚˜๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.

@Bean
public ItemProcessor<? super ProcessorInfo, ProcessorInfo> itemProcessor() {
    ClassifierCompositeItemProcessor processor = new ClassifierCompositeItemProcessor();
    ProcessorClassifier<ItemProcessor, ItemProcessor<?, ? extends ProcessorInfo>> processorClassifier = new ProcessorClassifier();

    Map<Integer, ItemProcessor<ProcessorInfo, ProcessorInfo>> processorMap = new HashMap<>();
    processorMap.put(1, new ClassifiableItemProcessor());
    processorMap.put(2, new ClassifiableItemProcessor2());
    processorMap.put(3, new ClassifiableItemProcessor3());

    processorClassifier.setProcessorMap(processorMap);
    processor.setClassifier(processorClassifier);

    return processor;
}

์˜ˆ์ œ์ด๊ธฐ ๋•Œ๋ฌธ์— ๊ตฌํ˜„๋œ ์ฝ”๋“œ์— ์ง‘์ค‘ํ•˜์ง€ ์•Š์•„๋„ ๋œ๋‹ค.
ClassifierCompositeItemProcessor ๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ปค์Šคํ…€ํ•˜๊ฒŒ ์ •์˜ ํ•œ Classifier๋ฅผ ์„ค์ •ํ•ด ์ค€ ๋’ค ๋ฐ˜ํ™˜ํ•œ๋‹ค ์ •๋„๋กœ ๋ณด๋ฉด ๋˜๊ฒ ๋‹ค.
Classifier ์—์„œ๋Š” Classify() ๋ฉ”์„œ๋“œ์—์„œ ์‹คํ–‰ํ•  ํ”„๋กœ์„ธ์Šค๋ฅผ ๊ฒฐ์ •ํ•œ๋‹ค.

๐Ÿ”‘ ์ฐธ์กฐ