토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 배치 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.

목차


모든 배치 처리는 제일 간단하게 설명하면 다량의 데이터를 읽어서 어떤 계산이나 변환을 수행하고 그 결과를 쓰는 작업이다. 스프링 배치는 벌크 read와 write을 위한 세 가지 핵심 인터페이스를 제공한다: ItemReader, ItemProcessor, ItemWriter.


6.1. ItemReader

ItemReader는 간단한 개념이긴 하지만, 매우 다양한 입력으로부터 데이터를 읽는 수단이다. 대부분의 예제는 아래 예시를 포함한다:

다른 예시도 많은데, 이번 챕터에서는 가장 기본적인 것들에 집중하겠다. 사용 가능한 모든 ItemReader 구현체는 Appendix A에 있다.

ItemReader는 일반적인 입력 작업을 위한 포괄적 인터페이스이다. 인터페이스 정의는 다음과 같다:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read 메소드는 ItemReader의 가장 본질적인 역할을 정의한다. 이 메소드는 아이템 하나를 리턴하거나 더 이상 아이템이 없는 경우 null을 리턴한다. 아이템 하나는 파일의 한 줄을 의미하거나, 데이터베이스의 로(row) 하나가 될 수도 있고, XML 파일에선 하나의 엘리먼트일 수도 있다. 보통 아이템은 도메인 오브젝트로 매핑되는데 (Trade, Foo 등), 꼭 그래야 한다는 법은 없다.

ItemReader의 구현체는 앞에서 뒤로만 읽고 역행하지 말아야 한다 (forward only). 그러나 별도의 트랜잭션 처리가 있는 리소스에서 데이터를 읽는다면 (JMS 큐같이) read 메소드는 롤백 후 다시 호출해도 같은 아이템을 리턴해야 한다. ItemReader가 더 이상 처리할 아이템이 없어도 예외를 발생시키지 않는다는 점을 알아둘 필요가 있다. 예를 들어 결과가 0개인 쿼리로 설정된 데이터베이스 ItemReader는 read를 처음 호출할 때부터 null을 반환한다.


6.2. ItemWriter

ItemWriterItemReader와 비슷하지만 하는 일은 정 반대다. 리소스는 여전히 필요하고, 또 열리고 닫혀야 하지만, ItemWriter는 읽는 게 아니라 쓴다는 점이 다르다. 데이터베이스나 큐를 사용한다면 이 동작은 insert, update 또는 send일 것이다. 결과물의 직렬화 형식은 각 job마다 다르다.

ItemReader처럼 ItemWriter도 꽤 포괄적인 인터페이스다. 인터페이스 정의는 다음과 같다:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

ItemReaderread 메소드처럼 write 메소드가 ItemReader의 가장 본질적인 역할을 정의한다. 리소스가 열려있다면 전달받은 아이템 리스트를 write한다. 일반적으로 아이템은 청크로 묶여서 결과물을 만들기 때문에, 이 인터페이스는 아이템 하나가 아니라 아이템 리스트를 받는다. 리스트를 전부 쓰고 난 다음에 필요한 flush 처리는 write 메소드가 결과를 반환하기 전 수행한다. 예를 들어 하이버네이트 DAO로 쓴다면 각 아이템마다 각각, 여러 번 write 메소드를 호출한다. 그러면 writer는 결과를 리턴하기 전 하이버네이트 세션에서 flush를 호출한다.


6.3. ItemProcessor

ItemReaderItemWriter는 각자 맡은 작업을 잘 수행하지만, write 전에 비지니스 로직을 추가하고 싶다면 어떻게 해야 하는가? 한 가지 방법은 composite 패턴을 사용하는 것이다: 다른 ItemWriter를 포함하고 있는 ItemWriter를 만들거나, 반대로 ItemReader가 다른 ItemReader를 포함하게 만들거나. 아래 코드는 이 패턴을 사용한 예제이다:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

앞의 클래스는 다른 ItemWriter 하나를 포함하고 있는데, 비지니스 로직을 수행하고 나서 write 처리를 위임한다. 이 패턴을 ItemReader에도 사용할 수 있는데, 이 경우는 메인 ItemReader에서 읽은 데이터에 추가로 다른 참조 데이터를 읽어들일 수 있다. write 메소드 호출을 직접 제어하고 싶을 때도 유용할 것이다. 그렇지만 write 시에 넘겨받은 데이터를 실제로 쓰기 전에 ‘변환’만 하면 된다면, 굳이 write를 직접 제어할 필요 없다. item을 수정하기만 하면 된다. 이런 경우를 위해 스프링 배치는 ItemProcessor 인터페이스를 제공한다. 인터페이스 정의는 다음과 같다:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

ItemProcessor는 간단하다. 객체 하나를 받아 다른 객체로 변환해서 반환한다. 새로 반환하는 객체는 같은 타입일 수도 있고 아닐 수도 있다. 핵심은 process 메소드 안에서 비지니스 로직을 처리할 수 있으며, 그 로직을 만드는 일은 전적으로 개발자에 달려있다는 것이다. ItemProcessor는 step에 직접 연결할 수 있다. 예를 들어 ItemReaderFoo 클래스를 반환하는데 최종 write 전에 Bar 타입으로 변경해야 한다고 가정해보자. 아래 예제는 Foo를 Bar로 바꾸는 ItemProcessor다:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

위 예제에는 Foo 클래스와 Bar 클래스, ItemProcessor 인터페이스를 구현한 FooProcessor 클래스가 있다. 여기선 변환 작업 자체가 간단하지만 다른 복잡한 변환도 가능하다. BarWriterBar 객체 리스트를 쓰며 다른 타입이 전달되면 예외를 발생시킨다. 유사하게 FooProcessor도 전달받은 객체가 Foo가 아니면 예외를 던진다. 아래 예제처럼 FooProcessorStep에 주입할 수 있다:

@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJOb")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}

6.3.1. Chaining ItemProcessors

변환 하나로도 충분한 경우도 많지만 여러 ItemProcessor 구현체를 ‘연결(chian’)하고 싶으면 어떻게 해야 할까? 이전에 언급한 composite 패턴을 사용하면 된다. 아래 예제는 앞서 나온 예제를 수정해 FooBar로 변환하고, 다시 Foobar 변환해 write한다:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

아래 예제에서는 함께 ‘연결(chained)된’ FooProcessorBarProcessor가 최종 결과물로 Foobar를 만든다:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

이전 예제와 같은 방법으로 Step에 composite processor를 설정한다:

@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}

6.3.2. Filtering Records

item processor는 ItemWriter로 데이터를 넘기기 전 필터링하는 데에도 많이 사용된다. 필터링은 스킵과는 다른 액션이다. 스킵은 데이터가 유효하지 않다는 거고, 필터링은 단순히 데이터를 write하지 않겠다는 뜻이다.

예를 들어 세 가지 유형의 파일을 읽어야 하는 배치 job을 떠올려 봐라: insert할 데이터, update할 데이터, delete할 데이터. 레코드 삭제를 지원하지 않는 시스템이라면, ItemWriter에 삭제 대상 데이터를 넘기지 않으면 된다. 그렇지만 이 데이터가 잘못된 데이터는 아니므로 스킵보단 필터링이 더 적절하다. 결과적으로 ItemWriter에 insert 용과 update 용 데이터만 넘긴다.

아이템을 필터링하고 싶으면 ItemProcessor에서 null을 리턴하면 된다. 결과가 null이라면 프레임워크가 ItemWriter에 전달되는 아이템 리스트에서 제외시킨다. 늘 그렇듯 ItemProcessor에서 예외가 발생하면 스킵된다.

6.3.3. Fault Tolerance

청크가 롤백되면 데이터를 읽을 때 이미 캐시해둔 아이템이 다시 처리될 수도 있다. 내결함성(fault tolerance)이 있는 step이라면 (보통 skip이나 retry가 설정된) 모든 ItemProcessor는 멱등성(idempotence)을 보장해야 한다. 보통은 ItemProcessor의 입력 데이터는 바꾸지 않고 결과로 사용할 인스턴스만 바꾸는 식으로 구현한다.


6.4. ItemStream

ItemReaders, ItemWriters 모두 맡은 역할은 잘 처리하지만, 양쪽 다 다른 인터페이스가 필요한 경우도 있다. 일반적으로 배치 job의 일환으로 reader와 writer는 리소스를 열고(open) 닫아야(close)하며 상태를 저장하기 위한 메커니즘이 필요하다. 아래 예제에 보이는 ItemStream 인터페이스는 그런 역할을 담당한다:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

각 메소드를 설명하기 전 ExecutionContext를 짚고 넘어가자. ItemReaderItemStream도 구현한다면 read 메소드 호출 전에 open 메소드를 호출해야 파일이나 커넥션이 필요한 리소스에 접근할 수 있다. ItemStream을 구현한 ItemWriter도 마찬가지로 같은 규칙이 적용된다. 2장에서 설명했듯이, ExecutionContext에 데이터가 있다면 초기 상태가 아닌(처음 실행하는 게 아닌) ItemReaderItemWriter를 실행할 때 함께 사용된다. 반대로 열려 있는 모든 리소스를 안전하게 닫으려면 close 메소드를 호출해야 한다. update 메소드는 주로 현재까지 진행된 모든 상태를 ExecutionContext에 저장할 때 사용한다. 커밋 전 데이터베이스에 현재 상태를 저장하려면 커밋 전에 호출해야 한다.

ItemStreamStep인 특이한 케이스에선(스프링 배치 코어에서) 매 StepExecution 마다 ExecutionContext을 생성해 각 실행 상태를 저장하고, 같은 JobInstance가 실행되면 이 값을 넘겨준다. Quartz에 비유하자면 JobDataMap과 유사하다.


6.5. The Delegate Pattern and Registering with the Step

CompositeItemWriter는 스프링 배치에서 흔히 쓰는 위임(delegation) 패턴 중 하나다. 위임받는 객체(delegate) 자체가 StepListener 같은 콜백 인터페이스를 구현하는 경우도 있다. 스프링 배치 코어의 Step에서 위임 패턴을 사용한다면 거의 모든 경우 수동으로 Step에 등록해야 한다. ItemStream이나 StepListener 인터페이스를 Step과 직접 연결하는 reader, writer, processor로 구현하면 자동으로 등록된다. 그러나 Step은 위임 객체(delegate)는 알 수 없으므로 아래 보이는 예제처럼 listener 또는 stream으로 (필요하다면 둘 다) 직접 연결해야 한다:

@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(compositeItemWriter())
				.stream(barWriter())
				.build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

	CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

	writer.setDelegate(barWriter());

	return writer;
}

@Bean
public BarWriter barWriter() {
	return new BarWriter();
}

6.6. Flat Files

플랫(flat) 파일은 벌크 데이터를 교환할 때 가장 흔히 사용하는 방법 중 하나다. 파일 구성법을 표준(XSD)으로 정한 XML과는 달리 플랫(flat) 파일을 읽으려면 파일의 구조를 알고 있어야만 한다. 일반적으로 플랫(flat) 파일은 두 유형 중 하나에 속한다: 구분자를 사용하거나 고정 길이를 사용하거나. 구분자를 사용하는(delimited) 파일은 쉼표같은 구분자로 필드를 구분한다. 고정된 길이를 갖는 파일은 필드 길이를 미리 설정해서 필드를 구분한다.

6.6.1. The FieldSet

스프링 배치에서 플랫(flat) 파일을 다룬다면 입력 데이터든 출력 데이터든 상관없이 FieldSet이 제일 중요한 클래스 중 하나다. 파일을 읽기 위한 추상 클래스를 지원하는 아키텍처나 라이브러리는 많지만 보통 String이나 String 객체의 배열을 리턴한다. 이건 반만 처리한 거나 마찬가지다. FieldSet은 파일 리소스로부터 필드를 바인딩하기 위해 스프링 배치가 제공하는 인터페이스다. 덕분에 데이터베이스 입력과 매우 유사하게 파일을 처리할 수 있다. FieldSet은 개념적으로 JDBC ResultSet과 유사하다. FieldSetString 배열로 토큰을 넘겨주기만 하면 된다. 아래 예제에서 보이는 것처럼, 원한다면 ResultSet처럼 필드에 이름을 설정해서 인덱스나 이름으로 필드에 접근할 수 있다.

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSetDate, long, BigDecimal 등 다른 값도 지원한다. FieldSet의 가장 큰 장점은 일관성이다. 여러 배치 job이 각자마다의 방법으로 파싱하는 게 아니라, 포맷 예외로 인한 에러를 처리할 때든, 간단한 데이터 변환을 할 때든 모두 같은 방법으로 파싱한다.

6.6.2. FlatFileItemReader

플랫(flat) 파일은 최대 2차원(표)으로 표현된 데이터라면 어떤 것이든 담을 수 있다. 스프링 배치 프레임워크에서는 기본적인 플랫(flat) 파일 읽기와 파싱을 지원하는 FlatFileItemReader로 플랫(flat) 파일을 읽는다. FlatFileItemReader를 사용하려면 가장 중요한 ResourceLineMapper 두 가지가 필요하다. LineMapper 인터페이스는 다음 섹션에서 더 다룰 것이다. resource 프로퍼티는 스프링 코어의 Resource를 나타낸다. 이 유형의 빈을 만드는 법이 궁금하다면 Spring Framework, Chapter 5. Resources를 확인해봐라. 따라서 이번 가이드에서는 Resource 객체를 만드는 방법은 아래 간단한 예제를 끝으로 더 자세히 다루지 않는다.

Resource resource = new FileSystemResource("resources/trades.csv");

복잡한 배치 환경에서 디렉토리 구조는 종종 EAI 인프라가 관리하며, FTP에서 배치 처리로 또는 그 반대로 파일을 이동하기 위해 외부 인터페이스 전용 드롭존(drop zone)을 설정한다. 파일 이동 유틸리티는 스프링 배치 아키텍처를 벗어나는 주제긴 하지만, step으로 사용하는 경우도 드물지 않다. 배치 아키텍처는 처리할 파일을 어떻게 이동시킬지만 알면 된다. 스프링 배치는 시작점에서 파이프에 데이터 공급(feeding)을 시작한다. 물론 Spring Integration은 더 다양한 서비스를 제공한다.

아래 테이블에 있는 FlatFileItemReader의 다른 프로퍼티로 데이터를 어떻게 해석할지를 더 상세하게 지정할 수 있다:

Table 15. FlatFileItemReader Properties

Property Type Description
comments String[] 행 전체를 주석 처리하는 라인 프리픽스.
encoding String 사용할 텍스트 인코딩. 디폴트는 Charset.defaultCharset().
lineMapper LineMapper String을 item Object로 변환한다.
linesToSkip int 파일 상단에 있는 무시할 라인 수.
recordSeparatorPolicy RecordSeparatorPolicy 라인이 끝나는 지점과, 따옴표로 묶인 문자열 안에서 라인이 끝나면 같은 라인으로 처리할지 등을 결정할 때 사용.
resource Resource 읽어야 할 리소스.
skippedLinesCallback LineCallbackHandler 건너뛸 라인의 원래 내용을 전달하는 인터페이스. linesToSkip이 2면 이 인터페이스를 두 번 호출한다.
strict boolean strict 모드에선 입력 리소스가 없으면 ExecutionContext에서 예외를 발생시킨다. 반대 경우는 로그를 남기고 넘어간다.

LineMapper

ResultSet 같은 저수준의 구조를 처리해 Object를 반환하는 RowMapper처럼 플랫(flat) 파일도 String 한 줄을 Object로 변환한다. 인터페이스 정의는 다음과 같다:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

기본 역할은 현재 라인과 라인 넘버를 받아 도메인 객체를 반환하는 것이다. ResultSet 안에 있는 각 로(row)가 로(row) 넘버와 함께 처리되는 것처럼 각 라인을 라인 넘버와 처리한다는 점에서 RowMapper와 비슷하다. 따라서 동일성(identity)을 비교하거나 더 많은 정보를 로깅할 수 있다. 하지만 RowMapper와는 달리 LineMapper는 전에 말한 바와 같이 반만 처리한 것과 마찬가지인 단순 문자열을 받는다. 뒷부분에서 다룰 내용이지만, 문자열은 객체로 매핑할 수 있는 FieldSet으로 토큰화해야 한다.

LineTokenizer

플랫(flat) 파일은 파일마다 형식이 다르기 때문에 문자열을 FieldSet으로 변환하는 작업을 추상화시켜야 한다. 스프링 배치가 제공하는 인터페이스는 LineTokenizer다:

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

LineTokenizer는 입력받은 라인을 (이론상 문자열은 두 줄 이상을 포함할 수도 있다), FieldSet으로 변환해서 리턴한다. 이 FieldSetFieldSetMapper로 넘겨 처리할 수 있다. 스프링 배치는 아래 구현체를 포함한 LineTokenizer 구현체를 제공한다:

FieldSetMapper

FieldSetMapperFieldSet 객체를 받아 다른 객체로 매핑시키는 메소드 하나를 정의하고 있는 인터페이스다. 이 객체는 job 성격에 따라 커스텀 DTO일 수도 있고, 도메인 객체나 배열일 수도 있다. FieldSetMapperLineTokenizer와 함께 사용하면 읽어온 라인을 원하는 유형의 객체로 변환할 수 있다. 인터페이스 정의는 다음과 같다:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

JdbcTemplate에서 RowMapper를 사용하는 것과 동일한 패턴을 사용한다.

DefaultLineMapper

플랫(flat) 파일을 읽기 위한 기본적인 인터페이스를 정의했으니, 아래 세 가지 기본적인 절차가 필요하다는 게 분명해졌다:

위에서 다룬 두 인터페이스는 두 가지 독립적인 처리를 한다: 문자열을 FieldSet으로 변환하고 FieldSet을 도메인 객체에 매핑한다. LineTokenizer의 입력이 LineMapper 입력(문자열)과 일치하고 FieldSetMapper의 결과도 LineMapper 결과와 일치하므로, LineTokenizerFieldSetMapper 둘 다 사용하는 디폴트 구현체를 제공한다. 아래 클래스 정의에 나오는 DefaultLineMapper는 사용자들이 대부분 필요로 하는 작업을 처리한다:

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

위 기능은 reader 자체에 포함하지 않고 (이전 버전의 프레임워크에서 그래 왔었다) 디폴트 구현체로 제공했는데, 이를 통해 더 유연하게 파싱을 제어할 수 있다. 특히 파일의 원본 라인에 접근해야 하는 경우 더 그렇다.

Simple Delimited File Reading Example

다음은 실제 환경에서 어떻게 플랫(flat) 파일로 도메인을 처리하는지 보여주는 예시이다. 이번 배치 job은 아래 파일에서 축구 선수 정보를 읽는다.

ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"

파일 내용은 아래 Player 도메인 객체로 매핑한다:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

FieldSetPlayer 객체에 매핑하려면 아래 보이는 것처럼 player를 리턴하는 FieldSetMapper를 정의해야 한다:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

아래처럼 FlatFileItemReader를 설정하고 read 메소드를 호출하면 파일 내용을 읽을 수 있다:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

read를 호출할 때마다 파일 각 라인을 읽어 Player 객체를 반환한다. 파일을 다 읽으면 null을 리턴한다.

Mapping Fields by Name

DelimitedLineTokenizer, FixedLengthTokenizer는 다른 기능이 하나 더 있는데, ResultSet과 유사한 기능이다. 필드명을 이 두 LineTokenizer 구현체 중 하나에 주입해주면 좀 더 가독성 있게 매핑할 수 있다. 가장 먼저, 아래 예제처럼 토크나이저에 파일 내 모든 필드의 컬럼명을 알려준다.

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

그러면 아래처럼 FieldSetMapper에서 이 정보를 사용할 수 있다:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}

Automapping FieldSets to Domain Objects

매번 FieldSetMapper에 매핑 규칙을 나열하는 건 JdbcTemplateRowMapper를 만드는 것만큼이나 번거로운 작업이다. 스프링 배치에선 그럴 필요가 없는데, FieldSetMapper가 자바빈 명세(JavaBean specification)를 사용해 객체의 setter와 일치하는 필드명을 자동으로 매핑해주기 때문이다. 다시 축구 예제를 가지고 BeanWrapperFieldSetMapper를 설정해보자:

@Bean
public FieldSetMapper fieldSetMapper() {
	BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

	fieldSetMapper.setPrototypeBeanName("player");

	return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
	return new Player();
}

스프링 컨테이너가 프로퍼티명과 일치하는 setter를 찾는 방식과 동일하게, 매퍼도 FieldSet의 각 엔트리마다 Player 오브젝트의 새 인스턴스에서 (이 때문에 프로토타입 스코프가 필요하다) 일치하는 setter를 찾는다. FieldSet의 모든 필드는 자동으로 매핑되고, 그 결과로 Player 오브젝트가 리턴되며, 매핑 규칙을 나열하던 코드는 더 이상 필요 없다.

Fixed Length File Formats

지금까지는 구분자를 사용하는(delimited) 파일만 자세히 다뤘다. 하지만 구분자 파일은 파일의 절반만 대표한다. 플랫(flat) 파일을 다루는 회사라면 고정 길이 포맷도 많이 사용한다. 다음은 고정 길이 파일 예시다:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

긴 필드 하나처럼 보이지만, 사실은 4가지 필드가 있다:

  1. ISIN: 주문할 item의 유니크 식별자 - 12자.
  2. Quantity: 주문 수량 - 3자.
  3. Price: 아이템 가격 - 5자.
  4. Customer: 주문 고객 ID - 9자.

FixedLengthLineTokenizer를 설정하려면 아래 예제처럼 각 길이를 범위 형식으로 알려줘야 한다:

범위를 위에서 사용한 문법으로 사용하려면 ApplicationContext 내에 RangeArrayPropertyEditor를 설정해야 한다. 단, 배치 네임스페이스를 사용했다면 이 빈은 자동으로 ApplicationContext 안에 선언된다.

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
	FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

	tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
	tokenizer.setColumns(new Range(1-12),
						new Range(13-15),
						new Range(16-20),
						new Range(21-29));

	return tokenizer;
}

FixedLengthLineTokenizer는 위에서 다룬 LineTokenizer 인터페이스를 그대로 사용하기 때문에 구분자(delimiter)를 사용했을 때와 같은 FieldSet을 반환한다. 따라서 결과값에 BeanWrapperFieldSetMapper를 사용하는 등의 처리를 동일하게 적용할 수 있다.

Multiple Record Types within a Single File

지금까지 다룬 파일 읽기 예제는 모두 단순화를 위해 파일의 모든 레코드 형식이 동일하다고 가정했었다. 하지만 현실은 항상 그렇지 않다. 파일 한 개 안에서 다르게 토큰화하고 다른 객체로 매핑해야 하는 경우도 흔하다. 다음은 파일 일부 예시이다:

USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI

이 파일에는 “USER”, “LINEA”, “LINEB”, 세 종류의 레코드가 있다. “USER” 라인은 User 객체로 매핑한다. “LINEA”가 “LINEB”보다 정보가 많긴 하지만, “LINEA”와 “LINEB”는 모두 Line 객체에 매핑한다.

ItemReader는 각 라인을 따로 처리하지만, ItemWriter가 알맞은 item을 받으려면 LineTokenizerFieldSetMapper를 다르게 지정해야 한다. 아래 보이는 것처럼 PatternMatchingCompositeLineMapper을 사용하면 LineTokenizerFieldSetMapper 인스턴스에 패턴을 각각 따로 지정할 수 있다:

@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
	PatternMatchingCompositeLineMapper lineMapper =
		new PatternMatchingCompositeLineMapper();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
	tokenizers.put("USER*", userTokenizer());
	tokenizers.put("LINEA*", lineATokenizer());
	tokenizers.put("LINEB*", lineBTokenizer());

	lineMapper.setTokenizers(tokenizers);

	Map<String, FieldSetMapper> mappers = new HashMap<>(2);
	mappers.put("USER*", userFieldSetMapper());
	mappers.put("LINE*", lineFieldSetMapper());

	lineMapper.setFieldSetMappers(mappers);

	return lineMapper;
}

이 예제에선 “LINEA”와 “LINEB”가 각자 다른 LineTokenizer 인스턴스에 매핑되지만, 같은 FieldSetMapper를 사용한다.

PatternMatchingCompositeLineMapper는 각 라인별로 그에 맞는 객체에 위임(delegate)하기 위해 PatternMatcher#match 메소드를 사용한다. PatternMatcher는 특별한 의미를 가진 와일드 카드 문자 두 개를 허용한다: 물음표(“?”)는 문자 한 개를, 별(“*“)은 0개 이상의 문자를 의미한다. 앞에선 라인 프리픽스를 지정하기 위해 모든 패턴을 별로 끝냈다는 것에 주목하라. PatternMatcher는 설정된 순서에 상관없이 항상 가장 구체적인 패턴부터 처리한다. 즉 패턴에 “LINE*“과 “LINEA*“가 둘 다 있다면, “LINEA”는 “LINEA*“에, “LINEB”는 “LINE*“에 매칭된다. 덧붙이자면, 아래 예제처럼 별 한 개(“*“)만 사용하면 다른 패턴과 매칭되지 않은 모든 라인과 매칭된다.

...
tokenizers.put("*", defaultLineTokenizer());
...

토크나이저만 설정해 쓸 수 있는 PatternMatchingCompositeLineTokenizer도 있다.

레코드 하나가 여러 줄에 걸쳐있는 플랫(flat) 파일도 흔하다. 이땐 좀 더 복잡한 전략이 필요하다. 이러한 주요 패턴의 데모는 multiLineRecords 샘플에서 확인할 수 있다.

Exception Handling in Flat Files

라인을 토큰화할 때 예외가 발생하는 일도 다반사다. 형식이 잘못된 레코드가 있는, 불완전한 플랫(flat) 파일도 많다. 대부분 잘못된 라인은 라인 원본과 라인 번호를 로깅하고 그냥 넘어가길 선택한다. 나중에 로그를 수동으로 확인하거나 다른 배치 job으로 점검하는 식이다. 이런 경우 파싱 예외를 처리할 수 있도록 스프링 배치는 exception 계층을 제공한다: FlatFileParseExceptionFlatFileFormatException. FlatFileParseExceptionFlatFileItemReader가 파일을 읽어들이는 동안 에러가 발생했을 때 던져진다. FlatFileFormatExceptionLineTokenizer 인터페이스 구현부에서 던져지는데, 토큰화 중 좀 더 구체적인 에러가 발생한 케이스다.

IncorrectTokenCountException

DelimitedLineTokenizer, FixedLengthLineTokenizer 모두 FieldSet을 만들 때 사용할 컬럼명을 지정할 수 있다. 그러나 컬럼명 갯수가 실제 토큰화한 컬럼 수와 다르다면 FieldSet을 만들 수 없으므로 아래 예제처럼 실제 토큰 수와 원래 기대한 토큰 수를 포함하고 있는 IncorrectTokenCountException이 발생한다:

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

토크나이저에 컬럼명을 4개로 설정했는데 파일에서 발견된 토큰이 3개뿐이므로 IncorrectTokenCountException을 던진다.

IncorrectLineLengthException

고정 길이를 사용하는 파일은 구분자를 사용하는 파일과는 달리, 각 컬럼의 길이가 미리 정의되어있기 때문에 요구사항이 하나 더 있다. 아래처럼 라인 전체 길이가 컬럼의 가장 큰 수(컬럼 길이의 총합)와 일치하지 않으면 예외를 던진다:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

토크나이저에 설정된 범위는 1-5, 6-10, 11-15이다. 따라서 라인의 총 길이는 15다. 하지만 앞의 예제에선 5글자짜리 라인을 넘겨받았으므로 IncorrectLineLengthException이 발생한다. 첫 번째 컬럼은 매핑할 수 있지만 그러지 않고 바로 예외를 던졌는데, 이렇게 하면 FieldSetMapper에서 두 번째 컬럼을 처리하다 실패했을 때보다 더 일찍 실패하고, 더 많은 정보를 담을 수 있다. 하지만 라인 길이가 항상 같지 않은 파일도 있다. 이때는 아래 예제처럼 ‘strict’ 프로퍼티로 라인 길이를 검증하지 않게 만들 수도 있다:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

위 예제는 tokenizer.setStrict(false)를 호출한 부분만 빼면 이전 예제와 거의 동일하다. 이 설정은 토크나이저가 라인을 처리할 때 라인 길이를 강제하지 않게 만든다. 이제 FieldSet을 성공적으로 만들고 리턴할 수 있다. 그러나 나머지 값에 대해선 빈 토큰만 가지고 있다.

6.6.3. FlatFileItemWriter

플랫(flat)을 write할 때는 read할 때와 같은 이슈가 있다. step은 트랜잭션을 지원하면서 구분자(delimit) 형식이나 고정 길이 형식으로 write할 수 있어야 한다.

LineAggregator

LineTokenizer 인터페이스가 필요했던 것처럼, item을 String으로 바꿔 파일에 기록하려면 여러 필드를 하나의 string으로 만들 방법이 필요하다. 스프링 배치에선 아래 정의에 있는 LineAggregator가 그 역할을 한다:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregator은 논리적으로 LineTokenizer와 정 반대다. LineTokenizerString을 받아 FieldSet을 리턴하는 반면, LineAggregatoritem을 받아 String을 리턴한다.

PassThroughLineAggregator

가장 흔히 사용하는 LineAggregator 인터페이스의 구현체는 PassThroughLineAggregator로, 아래 코드처럼 객체가 이미 문자열이거나 객체의 문자열이 바로 쓸 수 있는 형태라는 전제가 있다:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

이 구현체는 문자열을 직접 만들어야 하지만 트랜잭션이나 재시작 지원 등의 이유로 FlatFileItemWriter가 필요할 때 유용하다.

Simplified File Writing Example

LineAggregator의 인페이스와 가장 기본적인 구현체 PassThroughLineAggregator를 정의했으니 이제 write의 기본 흐름을 이해할 수 있다:

  1. write할 객체를 LineAggregator로 넘겨 String을 리턴받는다.
  2. String을 설정해둔 파일에 쓴다.

다음 코드는 FlatFileItemWriter에서 가져온 건데, 이 흐름을 코드로 나타내고 있다:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

간단하게는 아래처럼 설정할 수 있다:

@Bean
public FlatFileItemWriter itemWriter() {
	return  new FlatFileItemWriterBuilder<Foo>()
           			.name("itemWriter")
           			.resource(new FileSystemResource("target/test-outputs/output.txt"))
           			.lineAggregator(new PassThroughLineAggregator<>())
           			.build();
}

FieldExtractor

앞에 나온 예제도 유용하지만, FlatFileItemWriter는 대부분 도메인 객체와 사용하며, 따라서 그 객체를 문자열로 바꿔야 한다. 파일을 읽을 땐 다음의 처리가 필요했다:

  1. 파일에서 한 줄을 읽는다.
  2. 문자열을 LineTokenizer#tokenize() method에 전달해서 FieldSet를 리턴받는다.
  3. 토큰화한 FieldSetFieldSetMapper에 전달해 ItemReader#read() 메소드 결과를 받는다.

파일에 기록할 때는 유사하지만 정 반대 단계를 거친다:

  1. item을 writer에 전달한다.
  2. item의 필드를 배열로 변환한다.
  3. 배열을 합쳐 문자열로 만든다.

프레임워크에선 객체의 어떤 필드를 write해야 할지 알 수 없으므로 FieldExtractor를 구현해서 item을 배열로 바꿔야 한다. 인터페이스 정의는 다음과 같다:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractor 인터페이스 구현체는 전달받은 객체의 필드를 보고 배열을 만들고, 덕분에 구분자 사이나 혹은 고정 길이 라인 일부에 필드를 쓸 수 있다.

PassThroughFieldExtractor

배열이나 Collection, FieldSet 같은 컬렉션을 쓰는 경우도 자주 있다. 이런 컬렉션에서 배열을 “추출”하기는 매우 쉽다. 컬렉션을 배열로 바꾸면 그만이다. 이런 경우 PassThroughFieldExtractor를 사용한다. 주의할 점이 있는데, 전달받은 객체가 컬렉션이 아니라면 PassThroughFieldExtractor는 해당 item을 하나만 담고 있는 배열을 리턴한다.

BeanWrapperFieldExtractor

파일 read를 설명할 때 다뤘던 BeanWrapperFieldSetMapper처럼 직접 도메인 객체를 변환하기보단 도메인 객체를 객체 배열로 바꾸게끔 설정하는 게 더 좋다. 아래 있는 BeanWrapperFieldExtractor로 그렇게 할 수 있다:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

이 구현체는 한 가지 프로퍼티, 즉 매핑할 필드들의 이름만 있으면 된다. BeanWrapperFieldSetMapperFieldSet의 필드를 객체의 setter와 매핑할 때 필드명이 필요한 것처럼 BeanWrapperFieldExtractor도 객체의 배열을 만들 때 getter와 매핑하기 위해 필드명이 필요하다. 이름 순서대로 배열 내 필드 순서가 결정된다는 점은 알아둘 필요가 있다.

Delimited File Writing Example

가장 흔한 플랫(flat) 파일은 모든 필드를 구분자로 나누는 파일이다. 이때는 DelimitedLineAggregator를 사용한다. 다음은 고객 계좌의 잔고를 나타내는 간단한 도메인 객체를 write하는 예제다:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

도메인 객체를 사용하므로 아래 예제처럼 FieldExtractor 인터페이스 구현체와 구분자가 필요하다:

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
	lineAggregator.setDelimiter(",");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

이 예제에선 앞에서 설명한 BeanWrapperFieldExtractor를 사용해 CustomerCredit의 name, credit 필드를 오브젝트 배열로 변환하고, 그 배열을 이용해서 각 필드를 쉼표로 구분해 파일에 쓴다.

아래 예시처럼 FlatFileItemWriterBuilder.DelimitedBuilder를 사용해서 BeanWrapperFieldExtractorDelimitedLineAggregator를 자동으로 생성할 수도 있다:

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.delimited()
				.delimiter("|")
				.names(new String[] {"name", "credit"})
				.build();
}

Fixed Width File Writing Example

구분자(delimited) 파일이 유일한 플랫(flat) 파일은 아니다. 각 컬럼 길이를 정해서 필드를 구분하는 걸 선호하는 사람도 많은데, 보통 이 포맷을 ‘고정 길이(fixed width)’라고 한다. 스프링 배치는 이를 위한 FormatterLineAggregator를 제공한다. 위에서 다룬 CustomerCredit 도메인 객체를 사용하면 아래처럼 설정할 수 있다:

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
	lineAggregator.setFormat("%-9s%-2.0f");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

위 예제는 대부분 익숙해 느껴질 것이다. 하지만 아래 보이는 format 프로퍼티는 처음 등장했다:

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

기본 구현은 자바 5에서 추가된 포맷터와 동일한 Formatter로 구현했다. 자바 Formatter는 C 프로그래밍 언어의 printf 기능을 기반으로 한다. 포맷터 구성 방법에 대한 자세한 내용은 대부분 Formatter Javadoc에서 확인할 수 있다.

아래 예시처럼 FlatFileItemWriterBuilder.FormattedBuilder를 사용해서 BeanWrapperFieldExtractorFormatterLineAggregator를 자동으로 생성할 수도 있다:

@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.formatted()
				.format("%-9s%-2.0f")
				.names(new String[] {"name", "credit"})
				.build();
}

Handling File Creation

FlatFileItemReader과 파일 리소스 관계는 매우 간단하다. reader가 초기화되면 파일을 열고 (존재하면), 파일이 없으면 예외를 던진다. 하지만 파일을 쓰는 경우라면 그렇게 간단하지 않다. 얼핏 생각하면 FlatFileItemWriter도 유사하게 간단한 규칙이 있을 것 같다: 파일이 이미 있다면 예외를 던지고, 없다면 생성해서 쓰는 것. 하지만 Job을 재시작하면 문제가 시작된다. 일반적인 재시작 시나리오라면 반대로 행동해야 한다: 파일이 있다면 마지막으로 썼던 위치에서부터 쓰고, 없다면 예외를 던진다. 하지만 job의 파일명의 항상 동일하다면 어떻게 될까? 이런 경우엔 재시작만 아니라면 이미 존재하는 파일을 지우고 싶을 것이다. 이런 경우를 대비해 FlatFileItemWritershouldDeleteIfExists라는 프로퍼티를 가지고 있다. 이 프로퍼티를 true로 바꾸면 writer가 열릴 때 같은 이름의 파일이 존재하면 삭제한다.


6.7. XML Item Readers and Writers

스프링 배치는 XML을 읽어 자바 객체로 매핑하고, 자바 객체를 XML로 쓸 수 있는 트랜잭션 구조를 지원한다.

XML 스트리밍 제약사항

다른 표준 XML 파싱 API는 배치 처리 요구사항을 충족하지 않으므로 StAX API로 I/O를 처리한다 (DOM 방식은 전체 XML을 한 번에 메모리에 로딩하고 SAX 방식은 콜백을 한 번밖에 사용할 수 없다).

스프링 배치에서 XML 입출력을 어떻게 처리하는지 알아둘 필요가 있다. 먼저, 파일을 읽고 쓸 때 달라지긴 하지만 스프링 배치 XML 처리에서 공통적으로 사용되는 몇 가지 개념이 있다. XML을 처리할 땐 레코드 라인을 토큰화하는 대신 (FieldSet 인스턴스) 아래 그림처럼 XML 리소스를 개별 레코드를 나타내는 ‘조각(fragment)’의 컬렉션으로 생각한다:

XML Input

위 그림에선 ‘trade’ 태그가 ‘루트 엘리먼트(root element)’로 정의돼 있다. ‘<trade>‘와 ‘</trade>’ 사이에 있는 것들은 전부 하나의 ‘조각(fragment)’을 구성한다. 스프링 배치는 Object/XML Mapping (OXM)을 사용해서 각 조각을 객체로 바인딩한다. 하지만 스프링 배치는 특정한 XML 바인딩 기술을 강요하지 않는다. 보통은 가장 많이 쓰이는 OXM 기술을 균일하게 추상화해 놓은 스프링 OXM에 위임한다. 스프링 OXM 의존성(dependency)은 선택이며 원하는 스프링 배치 인터페이스를 구현하면 된다. OXM이 지원하는 기술들의 관계는 아래 그림과 같다:

OXM Binding

OXM과 XML 조각(fragment)으로 레코드를 표현하는 법을 소개했으니 이제 reader와 writer를 자세히 살펴보겠다.

6.7.1. StaxEventItemReader

StaxEventItemReader는 XML 입력 스트림으로 레코드를 처리할 때 필요한 전형적인 설정을 지원한다. 먼저 StaxEventItemReader로 아래 XML을 처리한다고 생각해 보자:

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

XML을 처리하려면 다음이 필요하다:

아래 예제는 trade라는 루트 엘리먼트, org/springframework/batch/item/xml/domain/trades.xml 리소스, tradeMarshaller라는 언마샬러로 StaxEventItemReader를 구성하는 법을 보여준다.

@Bean
public StaxEventItemReader itemReader() {
	return new StaxEventItemReaderBuilder<Trade>()
			.name("itemReader")
			.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
			.addFragmentRootElements("trade")
			.unmarshaller(tradeMarshaller())
			.build();

}

이번 예제에서는 XStreamMarshaller를 사용하기로 했다. XStreamMarshaller은 맵으로 alias를 지정할 수 있는데, 이 맵의 첫 번째 키는 첫 번째 조각(fragment) 이름(즉 루트 엘리먼트), 값은 바인딩할 객체 타입이다. 따라서 FieldSet과 유사하게, 객체의 필드에 매핑할 각 엘리먼트 이름을 맵의 키/값으로 표현한다. 아래처럼 스프링 설정 유틸리티로 필요한 alias를 설정할 수 있다:

@Bean
public XStreamMarshaller tradeMarshaller() {
	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();

	marshaller.setAliases(aliases);

	return marshaller;
}

입력을 처리할 때 reader는 새 조각(fragment)를 만나기 전까지 XML 리소스를 읽는다. 기본적으로 reader는 엘리먼트 이름을 통해 새 조각(fragment)이 시작하는 지점을 알아낸다. reader는 조각(fragment)을 독립적인 XML 문서(document)로 만들어, XML을 자바 객체로 매핑해주는 deserializer (보통 스프링 OXM Unmarshaller를 감싸는 래퍼(wrapper) )에 넘긴다.

이 단계를 요약하면 다음 자바 코드와 유사한데, 아래서는 스프링 설정이 제공하는 주입(injection)을 사용한다:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

6.7.2. StaxEventItemWriter

출력 처리는 입력과는 대칭적이다. StaxEventItemWriterResource, 마샬러, rootTagName이 필요하다. 자바 객체는 마샬러 (일반적으로 표준 스프링 OXM Marshaller)로 전달되는데, 마샬러는 커스텀 이벤트 writer를 사용해 OXM 도구에서 각 조각(fragment)마다 발생시키는 StartDocument, EndDocument 이벤트를 필터링해서 Resource에 write한다. 다음은 StaxEventItemWriter를 사용하는 예제다:

@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
	return new StaxEventItemWriterBuilder<Trade>()
			.name("tradesWriter")
			.marshaller(tradeMarshaller())
			.resource(outputResource)
			.rootTagName("trade")
			.overwriteOutput(true)
			.build();

}

위에선 세 가지 프로퍼티와, 이 챕터 앞부분에서 언급했던 이미 존재하는 파일을 덮어쓸지를 결정하는 선택적인 속성 overwriteOutput=true를 설정했다. 아래 예제에서 writer가 사용하는 마샬러는 앞에서 read 때 사용한 마샬러와 동일하다:

@Bean
public XStreamMarshaller customerCreditMarshaller() {
	XStreamMarshaller marshaller = new XStreamMarshaller();

	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	marshaller.setAliases(aliases);

	return marshaller;
}

아래 자바 코드는 필요한 프로퍼티를 프로그래밍 방식으로 설정하고 있는데, 앞에서 설명한 내용을 모두 함축하고 있다:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
	new StaxEventItemWriterBuilder<Trade>()
				.name("tradesWriter")
				.marshaller(marshaller)
				.resource(resource)
				.rootTagName("trade")
				.overwriteOutput(true)
				.build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

6.8. JSON Item Readers And Writers

스프링 배치를 사용하면 아래 같은 JSON 리소스도 읽고 쓸 수 있다:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

JSON 리소스는 각 item을 나타내는 JSON 객체의 배열이다. 스프링 배치는 특정 JSON 라이브러리에 얽매이지 않는다.

6.8.1. JsonItemReader

JsonItemReader는 JSON 파싱과 바인딩을
org.springframework.batch.item.json.JsonObjectReader 인터페이스 구현체에 위임한다. 이 인터페이스는 JSON 오브젝트를 청크로 읽을 수 있는 스트리밍 API로 구현한다. 현재는 두 가지 구현체가 제공된다:

JSON으로 write하려면 다음이 필요하다:

다음은 JsonFileItemWriter를 정의하는 방법을 보여준다:

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

6.8.2. JsonFileItemWriter

JsonFileItemWriter는 마샬링을
org.springframework.batch.item.json.JsonObjectMarshaller 인터페이스에 위임한다. 이 인터페이스 역할은 객체를 받아 JSON String으로 마샬링하는 것이다. 현재는 두 가지 구현체가 제공된다:

JSON을 처리하려면 다음이 필요하다:

아래 예제는 앞에 나온 JSON 리소스 org/springframework/batch/item/json/trades.json와 Jackson 기반 JsonObjectReader를 사용하는 JsonItemReader를 정의한다:

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

6.9. Multi-File Input

Step 하나에서 여러 파일을 쓰는 경우도 흔하다. 모든 파일이 같은 포맷을 사용한다면 MultiResourceItemReader로 XML이나 플랫(flat) 파일을 처리할 수 있다. 한 디렉토리 안 아래 파일이 있다고 생각해보자:

file-1.txt  file-2.txt  ignored.txt

file-1.txtfile-2.txt는 같은 형식을 사용하며 함께 처리해야 비지니스 요구사항을 만족시킬 수 있다. 아래 보이는 예제처럼 와일드카드를 사용하면 MultiResourceItemReader로 두 파일을 함께 읽을 수 있다:

@Bean
public MultiResourceItemReader multiResourceReader() {
	return new MultiResourceItemReaderBuilder<Foo>()
					.delegate(flatFileItemReader())
					.resources(resources())
					.build();
}

참조된 위임 객체는 간단한 FlatFileItemReader다. 위 설정대로면 롤백과 재시작을 고려해서 두 파일을 읽을 수 있다. ItemReader로 입력을 추가한다면 (여기서는 파일) 재시작 할 때 문제가 발생할 수 있다는 점을 알아둬야 한다. 배치 job은 각자의 디렉토리에만 가지고 실행하는 게 좋다.

입력 리소스는 MultiResourceItemReader#setComparator(Comparator)로 정렬돼서 job이 재시작돼도 같은 순서로 실행된다.


6.10. Database

대부분의 엔터프라이즈 어플리케이션 스타일과 마찬가지로 배치에서도 데이터베이스가 중앙 스토리지를 담당한다. 하지만 배치는 시스템이 처리해야 하는 데이터 셋 사이즈가 다르다는 점에서 다른 어플리케이션과 구분된다. 백만 개의 로(row)를 리턴하는 SQL을 사용하면 결과셋이 모든 로(row)를 다 읽을 때까지 메모리에 유지된다. 스프링 배치는 이를 해결할 두 가지 솔루션을 제공한다:

6.10.1. Cursor-based ItemReader Implementations

데이터베이스 커서는 관계형 데이터를 ‘스트리밍’ 해주는 데이터베이스의 솔루션이기 때문에, 배치에서도 가장 일반적으로 사용하는 접근법이다. 자바 ResultSet 클래스는 본질적으로 커서를 조작하기 위한 객체다. ResultSet은 커서를 현재 로(row)에 유지한다. ResultSetnext를 호출하면 이 커서가 다음 로(row)를 가리킨다. 스프링 배치의 커서 기반 ItemReader 구현체는 초기화할 때 커서를 열고 read를 호출할 때마다 커서를 한 행씩 이동시켜서, 나중에 처리할 수 있는 매핑된 객체를 반환한다. 그다음 모든 리소스를 반환할 수 있게 close 메소드를 호출한다. 스프링 코어 JdbcTemplate은 콜백 패턴을 사용해서 ResultSet의 모든 로(row)를 매핑하고 제어가 호출부로 넘어가기 전 close시킨다. 하지만 배치에서는 step이 종료될 때까지 기다려야 한다. 아래 이미지는 커서 기반 ItemReader의 동작 원리를 표현하는 일반적인 다이어그램이다. 예시에선 SQL을 사용하지만 (SQL이 가장 잘 알려졌으므로), 구현은 어떤 기술로 해도 상관없다.

Cursor Example

이 예제는 기본적인 패턴을 보여준다. ID, NAME, BAR 필드를 가지고 있는 ‘FOO’ 테이블에서 ID가 1보다 크고 7보다 작은 모든 로(row)를 찾는다. 그러면 커서의 시작점(첫 번째 행)은 ID 2를 가리킨다. 이 로(row)는 Foo 객체에 매핑된다. read()를 다시 호출하면 커서는 ID가 3인 다른 로(row)로 이동한다. read를 호출할 때마다 바로 결과를 쓰기 때문에 가비지 컬렉터에 수집될 수 있다 (다른 인스턴스가 참조를 유지하지 않는다고 가정하면).

JdbcCursorItemReader

JdbcCursorItemReader는 커서 기반 테크닉을 구현한 JDBC 구현체다. ResultSet과 함께 동작하며, DataSource에서 커넥션을 얻어와 SQL을 실행한다. 다음은 예시로 사용할 데이터베이스 스키마다:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

대부분 각 로(row)를 도메인 객체로 사용하기 때문에 아래 예제에서는 RowMapper 인터페이스 구현체로 CustomerCredit 객체에 매핑했다.

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

JdbcCursorItemReaderJdbcTemplate과 주요 인터페이스를 공유하므로, 같은 데이터를 JdbcTemplate으로 읽는 예제를 ItemReader를 사용했을 때와 비교해보는 것도 좋다. 비교를 위해 CUSTOMER 데이터베이스에 1000개의 로(row)가 있다고 가정한다. 가장 먼저 JdbcTemplate을 사용한 예제다:

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

위 코드를 실행하고 나면 customerCredits리스트에는 1000개의 CustomerCredit 객체가 있을 것이다. query 메소드에서 DataSource로부터 커넥션을 얻어와 주어진 SQL을 실행하며, ResultSet안에 있는 각 로(row)마다 mapRow 메소드를 호출한다. 아래 코드를 보면 JdbcCursorItemReader가 담당하는 일을 알 수 있다:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

앞의 코드를 실행하고 나면 counter 값은 1000이 된다. 만약 위 코드에서 리턴된 customerCredit을 리스트에 넣었다면 JdbcTemplate 예제 결과와 완전히 같았을 것이다. 하지만 중요한 건 ItemReader는 아이템을 ‘스트림 처리(streamed)’ 해준다는 점이다. read 메소드를 한 번 호출한 다음 ItemWriter로 아이템을 쓸 수 있고, 다음 아이템을 다시 read할 수 있다. 이를 통해 아이템을 주기적으로 커밋하면서 ‘청크’단위로 읽고 쓸 수 있으며, 이는 고성능 배치 처리의 핵심이다. 게다가 아래에서 알 수 있듯이 스프링 Step에 주입하기도 매우 쉽다:

@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
	return new JdbcCursorItemReaderBuilder<CustomerCredit>()
			.dataSource(this.dataSource)
			.name("creditReader")
			.sql("select ID, NAME, CREDIT from CUSTOMER")
			.rowMapper(new CustomerCreditRowMapper())
			.build();

}

Additional Properties

자바에서 커서를 열 때는 매우 다양한 옵션이 있기 때문에 JdbcCursorItemReader에 설정할 수 있는 프로퍼티도 다양하다. 프로퍼티는 아래 테이블로 정리해놨다:

Table 16. JdbcCursorItemReader Properties

ignoreWarnings SQLWarnings를 로깅하고 넘어갈지 예외를 발생시킬지 결정한다. 디폴트는 true다 (warning을 로깅한다는 뜻).
fetchSize ItemReader에서 사용하는 ResultSet객체에 더 많은 로(row)가 필요한 경우 JDBC 드라이버에 데이터베이스에서 fetch해야 하는 로(row) 수에 대한 힌트를 제공한다. 디폴트는 힌트를 제공하지 않는다.
maxRows 한 번에 ResultSet으로 가져올 수 있는 로(row) 수를 제한한다.
queryTimeout 드라이버가 Statement 객체 실행을 얼마 동안 기다릴지 초 단위로 설정한다. 이 제한을 넘어가면 DataAccessException이 발생한다 (자세한 내용은 드라이버 벤더 문서를 참조하라).
verifyCursorPosition ItemReader는 동일한 ResultSetRowMapper에 전달하므로, 사용자가 ResultSet.next()를 직접 호출하면 reader 내부 count에 이슈가 생길 수 있다. 이 값을 true로 지정하면 RowMapper를 호출한 후 커서 위치가 이전과 달라졌을 때 예외를 발생시킨다.
saveState ItemStream#update(ExecutionContext) 메소드로 ExecutionContext에 reader의 상태를 저장할지 결정한다. 디폴트는 true다.
driverSupportsAbsolute JDBC 드라이버가 ResultSet 커서 강제 이동을 지원하는지를 나타낸다. ResultSet.absolute()를 지원하는 JDBC 드라이버를 사용한다면 성능을 위해 true로 설정하는 게 좋다. 특히 대규모 데이터셋을 다루는 step이 중간에 실패한다면 더 그렇다. 디폴트는 false다.
setUseSharedExtendedConnection 커서에 사용된 커넥션을 다른 프로세싱에서도 사용하고 트랜잭션을 공유할지를 나타낸다. 이 값이 false면 커서는 소유한 커넥션에서만 열리며, step 내 다른 곳에서 트랜잭션이 시작돼도 관여하지 않는다. true로 설정하면 커넥션이 매 커밋마다 닫혀 반환되지 않도록 반드시 데이터소스를 ExtendedConnectionDataSourceProxy로 감싸야 한다. true로 설정했을 땐 커서를 열 때 사용하는 statement를 ‘READ_ONLY’와 ‘HOLD_CURSORS_OVER_COMMIT’ 상태로 생성한다. 이를 통해 step에서 트랜잭션이 시작되고 커밋되는 동안 커서를 열린 채로 유지할 수 있다. 이 기능을 사용하려면 이를 지원하는 데이터베이스와 JDBC 3.0 이상을 지원하는 JDBC 드라이버가 필요하다. 디폴트는 false다.

HibernateCursorItemReader

일반 스프링을 사용할 때 ORM 솔루션을 사용할지 말지, 즉 JdbcTemplate, HibernateTemplate 중 어떤 걸 쓸지 고민하는 것처럼 스프링 배치에도 똑같은 옵션이 있다. HibernateCursorItemReader는 커서 테크닉을 사용하는 하이버네이트(Hibernate) 구현체다. 하이버네이트를 스프링에서 사용하는 것은 상당히 논란거리였다. 하이버네이트는 온라인 어플리케이션을 위해 개발됐기 때문이다. 하지만 배치에서 사용할 수 없다는 뜻은 아니다. 이 문제를 해결하는 가장 쉬운 방법은 표준 세션 대신 StatelessSession을 사용하는 것이다. 이걸 사용하면 배치에서 문제 될 수 있는 하이버네이트의 캐싱과 엔티티 변경 체크(dirty checking)를 모두 제거해 준다. 상태가 없는(stateless) 세션과 일반적인 하이버네이트 세션의 차이가 궁금하다면 하이버네이트 릴리즈 문서를 참고해라. HibernateCursorItemReader에 HQL 문을 선언하면 SessionFactory로 전달하고, JdbcCursorItemReader와 같은 방식으로 read를 호출할 때마다 아이템 하나를 돌려준다. 아래 예제는 JDBC reader에서 다룬 ‘고객 잔고’ 예제를 그대로 사용한다:

HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
    customerCredit = itemReader.read();
    counter++;
}
itemReader.close();

Customer 테이블을 위한 적절한 하이버네이트 매핑 파일이 있다면, 여기서 설정한 ItemReaderJdbcCursorItemReader에서 설명한 방법과 정확하게 일치하는 방법으로 CustomerCredit를 리턴한다. ‘useStatelessSession’ 프로퍼티는 디폴트값이 true지만 옵션을 끄고 킬 수 있다는 걸 보여주기 위해 추가했다. 기본 커서의 페치 사이즈는 setFetchSize로 설정할 수 있다는 점도 알아두자. 아래 보이는 것처럼 JdbcCursorItemReader 만큼이나 설정하기 쉽다:

@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
	return new HibernateCursorItemReaderBuilder<CustomerCredit>()
			.name("creditReader")
			.sessionFactory(sessionFactory)
			.queryString("from CustomerCredit")
			.build();
}

StoredProcedureItemReader

가끔은 저장 프로시저(stored procedure)를 실행해서 커서를 얻어와야 할 때도 있다. StoredProcedureItemReader는 커서를 얻기 위해 쿼리를 실행하는 게 아니라 커서를 리턴하는 저장 프로시저를 실행한다는 점만 빼면 JdbcCursorItemReader과 유사하다. 저장 프로시저는 세 가지 방식으로 커서를 리턴한다:

아래 예제에서도 위에서처럼 같은 ‘고객 잔고’ 예시를 사용한다:

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());

	return reader;
}

위 예제는 ResultSet을 리턴하는 (위의 옵션 1번) 저장 프로시저를 사용한다.

저장 프로시저가 ref-cursor를 리턴한다면 (옵션 2) ref-cursor를 리턴하는 out 파라미터 위치를 알려줘야 한다. 다음은 첫 번째 파라미터를 ref-cursor로 사용하는 예제다:

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setRefCursorPosition(1);

	return reader;
}

저장 함수에서 커서를 반환한다면 (옵션 3) “function” 프로퍼티를 true로 바꿔야 한다. 디폴트는 false다. 프로퍼티 설정법은 아래 코드에 나와 있다:

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("sp_customer_credit");
	reader.setRowMapper(new CustomerCreditRowMapper());
	reader.setFunction(true);

	return reader;
}

어떤 방식을 사용하더라도 RowMapper, DataSource와 실제 프로시저 이름을 정의해야 한다.

저장 프로시저나 펑션이 파라미터를 받는다면 역시 parameters 프로퍼티로 설정해줘야 한다. 오라클을 사용하는 아래 예제에서는 파라미터 세 가지를 선언했다. 첫 번째는 ref-cursor를 리턴하는 out 파라미터고 나머지는 INTEGER 타입을 받는 파라미터다.

@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
	List<SqlParameter> parameters = new ArrayList<>();
	parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
	parameters.add(new SqlParameter("amount", Types.INTEGER);
	parameters.add(new SqlParameter("custId", Types.INTEGER);

	StoredProcedureItemReader reader = new StoredProcedureItemReader();

	reader.setDataSource(dataSource);
	reader.setProcedureName("spring.cursor_func");
	reader.setParameters(parameters);
	reader.setRefCursorPosition(1);
	reader.setRowMapper(rowMapper());
	reader.setPreparedStatementSetter(parameterSetter());

	return reader;
}

파라미터 선언 외에도 호출에 필요한 파라미터 값을 설정하는 PreparedStatementSetter 구현체를 지정해야 한다. 이 reader는 위에서 설명한 JdbcCursorItemReader와 같은 방법으로 동작한다. Additional Properties에 있는 모든 프로퍼티는 StoredProcedureItemReader에도 동일하게 사용할 수 있다.

6.10.2. Paging ItemReader Implementations

데이터베이스 커서를 사용하는 다른 방법은 결과의 일부만 가져오는 쿼리를 여러 번 실행하는 것이다. 이 결과의 일부를 페이지(page)라고 한다. 각 쿼리는 로(row) 넘버와 페이지 안에 포함할 로(row) 갯수를 명시해야 한다.

JdbcPagingItemReader

JdbcPagingItemReaderItemReader의 페이징 처리 구현체다. JdbcPagingItemReaderPagingQueryProvider로부터 로(row)를 페이지로 구성해 돌려주는 SQL 쿼리를 제공받아야 한다. 데이터베이스마다 페이징 지원 전략이 다르기 때문에 각 데이터베이스 지원 유형마다 다른 PagingQueryProvider를 사용한다. 사용할 데이트베이스를 자동으로 감지하고 적절한 PagingQueryProvider 구현체를 결정해주는 SqlPagingQueryProviderFactoryBean이라는 것도 있다. 이는 설정을 단순화해주기 때문에 가장 권장하는 방법이다.

SqlPagingQueryProviderFactoryBean을 사용하려면 select 절과 from 절이 필요하다. where 절도 지원하지만 필수는 아니다. 이것들과 필수 값인 sortKey로 SQL 구문을 만든다.

sortKey에 유니크키 제약 조건(unique key constraint)이 있어야 실행 간에 데이터를 누락시키지 않는다.

reader가 열리고 나면 ItemReader의 기본 방식대로 read를 호출할 때마다 아이템을 하나씩 돌려준다. 페이징은 추가 로(row)가 필요할 때마다 뒷단에서 처리한다.

아래는 커서 기반 ItemReader에서 사용했던 익숙한 ‘고객 잔고’ 예시를 사용한 설정이다:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
	Map<String, Object> parameterValues = new HashMap<>();
	parameterValues.put("status", "NEW");

	return new JdbcPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.dataSource(dataSource)
           				.queryProvider(queryProvider)
           				.parameterValues(parameterValues)
           				.rowMapper(customerCreditMapper())
           				.pageSize(1000)
           				.build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
	SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

	provider.setSelectClause("select id, name, credit");
	provider.setFromClause("from customer");
	provider.setWhereClause("where status=:status");
	provider.setSortKey("id");

	return provider;
}

위의 ItemReaderCustomerCredit 객체를 리턴하는데 이를 위해서는 RowMapper를 반드시 지정해야 한다. ‘pageSize’ 프로퍼티는 쿼리를 실행할 때마다 데이터베이스에서 읽어올 엔티티 수를 의미한다.

‘parameterValues’ 프로퍼티를 이용하면 쿼리에서 사용할 파라미터 값을 Map으로 명시할 수 있다. where 절에 이름으로 접근하는 파라미터(named parameter)가 있다면 각 엔트리 키는 이 파라미터 이름과 일치해야 한다. 전통적인 ‘?’ 플레이스홀더를 사용하면 각 엔트리 키는 1부터 시작하는 플레이스홀더 숫자여야 한다.

JpaPagingItemReader

JpaPagingItemReader는 페이징 처리를 지원하는 또 하나의 ItemReader 구현체다. JPA에는 하이버네이트의 StatelessSession 같은 개념이 없기 때문에 JPA 명세에서 제공하는 다른 기능을 사용해야 한다. JPA는 원래 페이징을 지원하기 때문에, 배치에서 JPA를 사용할 때도 페이징이 자연스러운 선택이다. 각 페이지를 읽고 나면 엔티티는 준영속 상태(detached)가 되고 영속성 컨텍스트(persistence context)는 비워지므로, 해당 페이지를 다 처리하고 나면 가비지 컬렉션이 엔티티를 수집할 수 있다.

JpaPagingItemReader는 JPQL 문을 사용해서 EntityManagerFactory에 전달한다. ItemReader의 기본 방식대로 read를 호출할 때마다 아이템을 하나씩 돌려준다. 페이징은 추가 로(row)가 필요할 때마다 뒷단에서 처리한다. 다음은 JDBC reader에서 사용했던 ‘고객 잔고’ 예시를 사용한 설정이다:

@Bean
public JpaPagingItemReader itemReader() {
	return new JpaPagingItemReaderBuilder<CustomerCredit>()
           				.name("creditReader")
           				.entityManagerFactory(entityManagerFactory())
           				.queryString("select c from CustomerCredit c")
           				.pageSize(1000)
           				.build();
}

CustomerCredit에 적절한 JPA 어노테이션을 선언하거나 ORM 매핑 파일을 사용했다면 위 ItemReader는 위에서 설명한 JdbcPagingItemReader와 정확히 같은 방식으로 CustomerCredit 객체를 리턴한다. ‘pageSize’ 프로퍼티는 쿼리를 실행할 때마다 데이터베이스에서 읽어올 엔티티 수를 의미한다.

6.10.3. Database ItemWriters

플랫(flat) 파일과 XML 파일은 별도의 ItemWriter 인스턴스가 필요하지만 데이터베이스 세계에선 그렇지 않다. 데이터베이스가 트랜잭션이 필요한 모든 기능을 제공하기 때문이다. 파일에 쓸 때는 트랜잭션 처리를 위해 쓴 아이템을 추적하고, 적절하게 flush 또는 clean해줘야 하므로 별도의 ItemWriter구현체가 필요했다. 데이터베이스의 write는 이미 트랜잭션을 포함하고 있기 때문에 이런 기능이 필요 없다. ItemWriter 인터페이스를 구현하는 DAO를 만들거나 일반적인 문제를 처리하는 커스텀 ItemWriter를 사용하면 된다. 어느 쪽이든 처리에 문제가 없을 것이다. 하지만 데이터베이스 출력을 배치로 처리할 때는 성능과 에러 핸들링 능력에 주의해야 한다. 하이버네이트를 ItemWriter로 사용하는 경우가 제일 많지만 JDBC 배치 모드를 사용할 때와 동일한 이슈가 있다. 데이터에 문제가 없고 데이터를 주의해서 flush한다면 데이터베이스 출력 배치 처리에 내제된 결함은 없다. 하지만 아래 이미지에 나타낸 것처럼, 쓰는 동안 발생한 에러는 충돌을 일으킬 수 있다. 즉, 아이템 하나 때문에 문제가 발생했다 해도 어떤 아이템이 예외를 발생시켰는지 알 수 없다.

Error On Flush

아이템을 쓰기 전 버퍼에 넣는다면 커밋 직전 버퍼를 flush하기 전에 발생한 에러는 예외를 발생시키지 않는다. 예를 들어 아이템 20개 단위 청크를 write하고, 15번째 아이템에서 DataIntegrityViolationException이 발생했다고 가정해보자. Step에서는 실제로 write하기 전까진 에러가 발생한 것을 알 수 없으므로 20개 아이템 모두 성공적으로 write됐다고 생각한다. Session#flush()를 호출하면 버퍼가 비워지며 이때 exception이 발생한다. 하지만 이때는 Step이 할 수 있는 게 없다. 이 트랜잭션은 반드시 롤백돼야 한다. 일반적으로 예외가 발생하면 아이템을 스킵하고 (skip/retry 정책에 따라 다름) 다시 write하지 않는다. 하지만 배치 시나리오에서는 어떤 아이템이 문제인지 알 수 없다. exception이 발생하는 건 이미 전체 버퍼가 쓰여진 다음이다. 이 문제를 해결하려면 아래 그림처럼 아이템마다 flush하는 방법밖에 없다:

Error On Write

이 방법은 하이버네이트에서도 흔히 사용하는 방법이고 write()를 호출할 때마다 flush하는 식으로 ItemWriter를 구현하라고 가이드하고 있다. 이렇게 하면 스프링 배치 내부에서 에러가 발생한 ItemWriter 호출을 개별적으로 처리하기 때문에 아이템을 안정적으로 건너뛸 수 있다.


6.11. Reusing Existing Services

배치 시스템은 다른 어플리케이션과 함께 운영되는 경우가 많다. 온라인 시스템과의 조합이 가장 많긴 하지만, 각 어플리케이션이 사용하는 벌크 데이터를 이동시킴으로써 심지어 하드웨어와 운영체제를 갖춘 클라이언트(thick client) 어플리케이션과도 통합할 수 있다. 이런 이유로 배치 job 안에서 이미 있는 DAO나 다른 서비스를 사용하기도 한다. 이런 요구사항은 스프링 컨테이너가 필요한 클래스를 주입해주기 때문에 쉽게 달성할 수 있다. 하지만 다른 스프링 배치 클래스의 의존성 때문에 혹은 사용하려는 서비스 자체가 step의 메인 ItemReader인 경우, 이미 있는 서비스를 ItemReaderItemWriter으로 사용해야 할 때도 있다. 필요한 서비스를 감싸서 어댑터(adapter) 클래스를 만드는 건 사소한 일이지만 자주 사용하는 패턴이기 때문에 스프링 배치는 ItemReaderAdapterItemWriterAdapter 구현체를 제공한다. 두 클래스 모두 위임(delegate) 패턴으로 표준 스프링 메소드를 구현했으며 설정하기도 매우 쉽다. ItemReaderAdapter를 사용하는 예시는 다음과 같다:

@Bean
public ItemReaderAdapter itemReader() {
	ItemReaderAdapter reader = new ItemReaderAdapter();

	reader.setTargetObject(fooService());
	reader.setTargetMethod("generateFoo");

	return reader;
}

@Bean
public FooService fooService() {
	return new FooService();
}

여기서 중요한 점은 targetMethod의 역할이 read 메소드 역할과 같아야 한다는 것이다. 데이터가 더 이상 없다면 null을 리턴한다. 그렇지 않으면 Object를 리턴한다. 다른 값을 리턴하면 프레임워크가 처리가 끝나는 지점을 알 수 없어서 ItemWriter 구현에 따라 무한 루프에 빠지거나 의도하지 않게 실패로 끝날 수 있다. ItemWriterAdapter를 사용하는 예시는 다음과 같다:

@Bean
public ItemWriterAdapter itemWriter() {
	ItemWriterAdapter writer = new ItemWriterAdapter();

	writer.setTargetObject(fooService());
	writer.setTargetMethod("processFoo");

	return writer;
}

@Bean
public FooService fooService() {
	return new FooService();
}

6.12. Validating Input

이 첩터에서 입력을 파싱하기 위한 여러 가지 방법을 소개했다. 각 주요 구현체는 입력의 형식이 잘못됐으면 예외를 던진다. FixedLengthTokenizer는 데이터 범위가 누락되면 예외를 던진다. 유사하게 RowMapperFieldSetMapper에 존재하지 않는 인덱스에 접근하거나 형식이 다른 걸 사용하면 예외가 발생한다. 이럴 때 던져지는 모든 예외는 read 메소드가 리턴하기 전에 던진다. 하지만 리턴된 아이템이 유효한지 아닌지는 상관하지 않는다. 예를 들어 나이를 나타내는 필드가 있다면 절대 음수일 수 없다. 이 값은 존재하는 값이고 숫자이기 때문에 문제없이 파싱되며 예외를 던지지 않는다. 검증 프레임워크는 이미 넘치기 때문에 스프링 배치는 또 다른 검증 프레임워크를 제공하지 않는다. 대신에 Validator라는 간단한 인터페이스를 제공하며 어떤 프레임워크로도 구현할 수 있다. 인터페이스 정의는 다음과 같다:

public interface Validator<T> {

    void validate(T value) throws ValidationException;

}

validate의 기본 역할은 객체가 유효하지 않으면 예외를 던지고 유효하면 그대로 반환하는 것이다. 스프링 배치는 아래 빈 정의에 보이는 ValidatingItemProcessor를 제공한다.

@Bean
public ValidatingItemProcessor itemProcessor() {
	ValidatingItemProcessor processor = new ValidatingItemProcessor();

	processor.setValidator(validator());

	return processor;
}

@Bean
public SpringValidator validator() {
	SpringValidator validator = new SpringValidator();

	validator.setValidator(new TradeValidator());

	return validator;
}

Bean Validation API (JSR-303)를 선언한 아이템은 BeanValidatingItemProcessor로 검증할 수 있다. 아래 Person 객체로 예를 들면:

class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
     this.name = name;
    }

    public String getName() {
     return name;
    }

    public void setName(String name) {
     this.name = name;
    }

}

어플리케이션 컨텍스트에 BeanValidatingItemProcessor 빈을 정의하고 step의 프로세서로 등록하면 된다:

@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}

6.13. Preventing State Persistence

기본적으로 모든 ItemReaderItemWriter 구현체는 커밋 전에 현재 상태를 ExecutionContext에 저장한다. 하지만 이게 항상 바람직한 건 아니다. 예를 들어 많은 개발자가 처리 식별자를 사용해 데이터베이스 reader를 재사용할 수 있게 만든다. 이땐 입력 데이터가 처리됐는지를 식별하기 위한 별도 컬럼을 이용한다. 특정 레코드를 읽으면 (혹은 쓰면) 이 플래그를 false에서 true로 바꾼다. SQL 문에 where PROCESSED_IND = false 같은 구문을 추가하면 재시작해도 이미 처리된 레코드는 조회되지 않는다. 이런 경우라면 재시작할 때 현재 로(row) 넘버 같은 상태 값을 사용하지 않으므로 저장할 필요가 없다. 이런 이유로 아래 보이는 것처럼, 모든 reader와 writer는 ‘saveState’라는 프로퍼티가 있다:

@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
	return new JdbcCursorItemReaderBuilder<PlayerSummary>()
				.dataSource(dataSource)
				.rowMapper(new PlayerSummaryMapper())
				.saveState(false)
				.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
				  + "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
				  + "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
				  + "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
				  + "from games, players where players.player_id ="
				  + "games.player_id group by games.player_id, games.year_no")
				.build();

}

위에 있는 ItemReader는 몇 번을 실행해도 ExecutionContext에 엔트리를 저장하지 않는다.


6.14. Creating Custom ItemReaders and ItemWriters

지금까지 이 챕터에서 스프링 배치의 read, write의 기본 역할과 몇 가지 주요 구현체를 다뤘다. 하지만 이 구현체는 상당히 포괄적이어서 이 구현체만으로 해결하지 못하는 케이스도 많다. 이번 섹션에선 커스텀 ItemReader, ItemWriter 구현체를 만들고 올바르게 구현하는 법을 간단한 예제와 함께 설명하겠다. reader와 writer를 재시작할 수 있게 만드는 법을 보여주기 위해 ItemReaderItemStream도 구현했다.

6.14.1. Custom ItemReader Example

목표한 대로 이번 예제에서는 주어진 리스트를 읽는 간단한 ItemReader 구현체를 만든다. 먼저 ItemReader의 가장 기본적인 역할을 담당하는 read 메소드를 구현하는 것부터 시작한다:

public class CustomItemReader<T> implements ItemReader<T>{

    List<T> items;

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
       NonTransientResourceException, ParseException {

        if (!items.isEmpty()) {
            return items.remove(0);
        }
        return null;
    }
}

위에 있는 클래스는 아이템 리스트를 받아 한 번에 한 개씩 아이템을 지우면서 리턴한다. 아래 코드에서도 알 수 있듯, ItemReader의 가장 기본적인 요구사항대로 리스트가 비면 null을 리턴한다.

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");

ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());

Making the ItemReader Restartable

마지막 목표는 ItemReader를 재시작 가능한 구조로 만드는 것이다. 지금까지 작성한 코드로는 처리 중간에 실패해서 다시 시작한다면 ItemReader는 처음부터 다시 시작해야 한다. 이게 유효한 경우도 많지만 어떨 땐 배치 job을 처리를 중단했던 곳에서부터 다시 시작하고 싶을 때도 있다. 이는 reader가 상태가 있는지(stateful) 없는지(stateless)에 따라 갈린다. 상태가 없는 reader는 재시작을 걱정할 필요 없지만, 상태가 있다면 재시작할 때 알고 있는 마지막 상태로 재구성해야 한다. 이런 이유로 가능하다면 reader를 상태를 가지 않게 커스텀하길 권장한다.

그래도 상태를 저장하고 싶다면 ItemStream 인터페이스를 사용해야 한다:

public class CustomItemReader<T> implements ItemReader<T>, ItemStream {

    List<T> items;
    int currentIndex = 0;
    private static final String CURRENT_INDEX = "current.index";

    public CustomItemReader(List<T> items) {
        this.items = items;
    }

    public T read() throws Exception, UnexpectedInputException,
        ParseException, NonTransientResourceException {

        if (currentIndex < items.size()) {
            return items.get(currentIndex++);
        }

        return null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if(executionContext.containsKey(CURRENT_INDEX)){
            currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
        }
        else{
            currentIndex = 0;
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
    }

    public void close() throws ItemStreamException {}
}

ItemStreamupdate 메소드를 호출할 때마다 ItemReader의 현재 인덱스를 ‘current.index’ 키와 함께 ExecutionContext에 저장한다. ItemStreamopen 메소드를 호출하면 ExecutionContext에 이 키를 가진 엔트리가 있는지 검사한다. 이 키를 찾으면 현재 인덱스를 그 위치로 이동시킨다. 매우 간단한 예제지만 일반적인 요구사항을 충족한다:

ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);

List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);

((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());

ItemReader 대부분은 훨씬 정교한 재시작 로직을 가지고 있다. 예를 들어 JdbcCursorItemReader는 커서에 마지막으로 처리된 로(row)의 ID를 저장한다.

ExecutionContext에 사용할 키는 흔하지 않은 이름을 써야 한다는 점을 알아둘 필요가 있다. Step에 있는 모든 ItemStream이 같은 ExecutionContext을 사용하기 때문이다. 대부분 간단히 클래스명을 키로 사용하는 걸로 충분하다. 하지만 드물게 같은 step에서 같은 타입의 ItemStream을 두 번 사용하는 경우 (두 파일에 write한다면 가능한 일이다), 다른 유니크한 이름이 필요하다. 이런 이유로 많은 스프링 배치 ItemReader, ItemWriter 구현체가 이름을 재정의하기 위한 setName()를 제공한다.

6.14.2. Custom ItemWriter Example

커스텀 ItemWriter 구현체를 만드는 건 여러 가지 면에서 위 ItemReader와 비슷하지만, 다른 점도 있어서 별도 예제를 넣었다. 하지만 재시작 메커니즘은 완전히 동일하므로 이 예제에서 다시 다루지 않는다. ItemReader 예제처럼 최대한 단순화하기 위해 List를 사용했다:

public class CustomItemWriter<T> implements ItemWriter<T> {

    List<T> output = TransactionAwareProxyFactory.createTransactionalList();

    public void write(List<? extends T> items) throws Exception {
        output.addAll(items);
    }

    public List<T> getOutput() {
        return output;
    }
}

Making the ItemWriter Restartable

ItemWriter을 재시작 가능한 구조로 만들려면 ItemReader처럼 ItemStream를 추가로 구현해서 실행 컨텍스트를 동기화시키면 된다. 예를 들어 처리된 아이템 수를 세서 꼬리말 레코드를 추가한다고 생각해 보자. 그러려면 ItemWriter에서 ItemStream를 구현해서, 스트림이 다시 열리면 실행 컨텍스트로 카운터를 재구성하면 된다. 현실적으로 커스텀 ItemWriter는 재시작 가능한 다른 writer에게 위임하거나 (예를 들어 파일에 쓸 때), 트랜잭션 리소스에 write하기 때문에, 상태를 유지하거나 재시작을 고려할 필요가 없는 경우가 많다. writer에 상태가 있다면(stateful) 반드시 ItemStreamItemWriter를 함께 구현해야 한다. writer를 호출하는 쪽에서도 ItemStream의 존재를 알아야 하기 때문에 설정에서 stream으로 등록해줘야 한다는 것도 잊지 말자.


6.15. Item Reader and Writer Implementations

이번에는 이전 섹션에서 다루지 않은 reader와 writer를 다룬다.

6.15.1. Decorators

어떨 때는 이미 있는 ItemReader에 다른 기능을 추가하고 싶을 것이다. 스프링 배치는 ItemReader, ItemWriter 구현체에 다른 기능을 추가할 수 있는 데코레이터를 지원한다.

스프링 배치의 데코레이터는 아래와 같다:

SynchronizedItemStreamReader

thread safe하지 않은 ItemReader를 사용한다면 SynchronizedItemStreamReader 데코레이터를 사용해서 ItemReader를 thread safe하게 만들 수 있다. 스프링 배치는 SynchronizedItemStreamReader 인스턴스를 생성하는 SynchronizedItemStreamReaderBuilder를 제공한다.

SingleItemPeekableItemReader

스프링 배치는 ItemReader에 peak 메소드를 추가한 데코레이터를 제공한다. peek 메소드로 아이템 하나를 미리 볼 수 있다. peek 메소드는 계속 호출해도 같은 아이템을 리턴하며, 이 아이템은 다음 read 메소드에서 반환할 아이템이다. 스프링 배치는 SingleItemPeekableItemReader 인스턴스를 생성하는 SingleItemPeekableItemReaderBuilder를 제공한다.

SingleItemPeekableItemReader의 peak 메소드는 여러 쓰레드에게 미리보기를 제공하지 않으므로 thread safe하지 않다. peek를 호출한 여러 쓰레드 중 하나만 다음번 read에서 반환할 아이템을 받을 수 있다.

MultiResourceItemWriter

ResourceAwareItemWriterItemStream를 감싸고 있는 MultiResourceItemWriter는, 현재 리소스에 쓰여진 아이템 수가 itemCountLimitPerResource를 초과하면 새 리소스를 만들어 새 리소스에 쓴다. 스프링 배치는 MultiResourceItemWriter 인스턴스를 생성하는 MultiResourceItemWriterBuilder를 제공한다.

ClassifierCompositeItemWriter

ClassifierCompositeItemWriterClassifier로 라우터(router) 패턴을 구현해서 각 아이템마다 ItemWriter 중 하나를 호출한다. 모든 위임 객체(delegate)가 thread safe하면 이 구현체도 thread safe하다. 스프링 배치는 ClassifierCompositeItemWriter 인스턴스를 생성하는 ClassifierCompositeItemWriterBuilder를 제공한다.

ClassifierCompositeItemProcessor

ClassifierCompositeItemProcessorClassifier로 라우터 패턴을 구현해서 ItemProcessor 구현체 중 하나를 호출하는 ItemProcessor다. 스프링 배치는 ClassifierCompositeItemProcessor 인스턴스를 생성하는 ClassifierCompositeItemProcessorBuilder를 제공한다.

6.15.2. Messaging Readers And Writers

스프링 배치는 자주 사용하는 메세징 시스템을 위해 아래 reader와 writer를 제공한다:

AmqpItemReader

AmqpItemReaderAmqpTemplate으로 exchange에서 메세지를 읽거나 변환한다. 스프링 배치는 AmqpItemReader 인스턴스를 생성하는 AmqpItemReaderBuilder를 제공한다.

AmqpItemWriter

AmqpItemWriterAmqpTemplate으로 AMQP exchange에 메세지를 보내는 ItemWriter다. AmqpTemplate에 이름을 명시하지 않으면 이름이 없는(nameless) exchange에 메세지를 전송한다. 스프링 배치는 AmqpItemWriter 인스턴스를 생성하는 AmqpItemWriterBuilder를 제공한다.

JmsItemReader

JmsItemReaderJmsTemplate을 사용하는 JMS 전용 ItemReader다. 템플릿에 read() 메소드가 item을 읽을 때 사용할 디폴트 destination이 있어야 한다. 스프링 배치는 JmsItemReader 인스턴스를 생성하는 JmsItemReaderBuilder를 제공한다.

JmsItemWriter

JmsItemWriterJmsTemplate을 사용하는 JMS 전용 ItemWriter다. 템플릿에 write(List) 메소드가 item을 전송 때 사용할 디폴트 destination이 있어야 한다. 스프링 배치는 JmsItemWriter 인스턴스를 생성하는 JmsItemWriterBuilder를 제공한다.

KafkaItemReader

KafkaItemReader는 아파치 카프카 토픽을 읽는 ItemReader다. 토픽 한 개에서 여러 파티션의 메세지를 읽도록 설정할 수 있다. 재시작을 대비해 실행 컨텍스트에 메세지 오프셋을 저장한다. 스프링 배치는 KafkaItemReader 인스턴스를 생성하는 KafkaItemReaderBuilder를 제공한다.

KafkaItemWriter

KafkaItemWriterKafkaTemplate로 디폴트 토픽에 이벤트를 전송하는 아파치 카프카 전용 ItemWriter다. 스프링 배치는 KafkaItemWriter 인스턴스를 생성하는 KafkaItemWriterBuilder를 제공한다.

6.15.3. Database Readers

스프링은 아래 데이터베이스 reader를 제공한다:

Neo4jItemReader

Neo4jItemReader는 페이징 기법으로 그래프 데이터베이스 Neo4j에서 객체를 읽는 ItemReader다. 스프링 배치는 Neo4jItemReader 인스턴스를 생성하는 Neo4jItemReaderBuilder를 제공한다.

MongoItemReader

MongoItemReader는 페이징 기법으로 MongoDB에서 도큐먼트를 읽는 ItemReader다. 스프링 배치는 MongoItemReader 인스턴스를 생성하는 MongoItemReaderBuilder를 제공한다.

HibernateCursorItemReader (2)

HibernateCursorItemReader는 하이버네이트 위에서 데이터베이스 레코드를 읽는 ItemStreamReader다. HQL 쿼리를 실행하고 초기화되면, read() 메소드를 호출할 때마다 결과셋을 순회해서 현재 로(row)와 일치하는 객체를 리턴한다. 스프링 배치는 HibernateCursorItemReader 인스턴스를 생성하는 HibernateCursorItemReaderBuilder를 제공한다.

HibernatePagingItemReader

HibernatePagingItemReader는 하이버네이트 위에서 데이터베이스 레코드를 읽지만, 한 번에 아이템을 고정된 수만큼만 읽는 ItemReader다. 스프링 배치는 HibernatePagingItemReader 인스턴스를 생성하는 HibernatePagingItemReaderBuilder를 제공한다.

RepositoryItemReader

RepositoryItemReaderPagingAndSortingRepository로 레코드를 읽는 ItemReader다. 스프링 배치는 RepositoryItemReader 인스턴스를 생성하는 RepositoryItemReaderBuilder를 제공한다.

6.15.4. Database Writers

스프링은 아래 데이터베이스 writer를 제공한다:

Neo4jItemWriter

Neo4jItemWriter는 Neo4j 데이터베이스에 write하는 ItemWriter 구현체다. 스프링 배치는 Neo4jItemWriter 인스턴스를 생성하는 Neo4jItemWriterBuilder를 제공한다.

MongoItemWriter

MongoItemWriter는 스프링 데이터의 MongoOperations 구현체를 사용해 MongoDB에 write하는 ItemWriter 구현체다. 스프링 배치는 MongoItemWriter 인스턴스를 생성하는 MongoItemWriterBuilder를 제공한다.

RepositoryItemWriter

RepositoryItemWriter는 스프링 데이터의 CrudRepository를 감싸고 있는 ItemWriter다. 스프링 배치는 RepositoryItemWriter 인스턴스를 생성하는 RepositoryItemWriterBuilder를 제공한다.

HibernateItemWriter

HibernateItemWriter는 하이버네이트 세션을 사용해서 현재 하이버네이트 세션에 속하지 않은 엔티티를 저장하거나 업데이트하는 ItemWriter다. 스프링 배치는 HibernateItemWriter 인스턴스를 생성하는 HibernateItemWriterBuilder를 제공한다.

JdbcBatchItemWriter

JdbcBatchItemWriterNamedParameterJdbcTemplate의 배치 기능을 사용해 모든 아이템에 배치 명령을 실행하는 ItemWriter다. 스프링 배치는 JdbcBatchItemWriter 인스턴스를 생성하는 JdbcBatchItemWriterBuilder를 제공한다.

JpaItemWriter

JpaItemWriter는 JPA EntityManagerFactory로 영속성 컨텍스트에 속하지 않은 엔티티를 병합하는 ItemWriter다. 스프링 배치는 JpaItemWriter 인스턴스를 생성하는 JpaItemWriterBuilder를 제공한다.

GemfireItemWriter

GemfireItemWriterGemfireTemplate으로 아이템을 키/값 쌍으로 GemFire에 저장하는 ItemWriter다. 스프링 배치는 GemfireItemWriter 인스턴스를 생성하는 GemfireItemWriterBuilder를 제공한다.

6.15.5. Specialized Readers

스프링 배치는 아래 특화된 reader를 제공한다:

LdifReader

LdifReaderread 메소드를 호출할 때마다 Resource에서 LDIF (LDAP Data Interchange Format) 레코드를 읽어 파싱하고 LdapAttribute 객체로 반환한다. 스프링 배치는 LdifReader 인스턴스를 생성하는 LdifReaderBuilder를 제공한다.

MappingLdifReader

MappingLdifReaderResource에서 LDIF (LDAP Data Interchange Format) 레코드를 읽어 각 LDIF 레코드를 파싱하고 POJO (Plain Old Java Object)로 매핑한다. read를 호출할 때마다 POJO를 반환한다. 스프링 배치는 MappingLdifReader 인스턴스를 생성하는 MappingLdifReaderBuilder를 제공한다.

AvroItemReader

AvroItemReaderResource에서 직렬화된 Avro 데이터를 읽는다. read 메소드를 호출할 때마다 자바 클래스나 Avro 스키마로 명시한 타입의 인스턴스를 반환한다. 입력 데이터의 Avro 스키마를 사용하도록 설정할 수 있지만 필수는 아니다. 스프링 배치는 AvroItemReader 인스턴스를 생성하는 AvroItemReaderBuilder를 제공한다.

6.15.6. Specialized Writers

스프링 배치는 아래 특화된 writer를 제공한다:

SimpleMailMessageItemWriter

SimpleMailMessageItemWriter는 메일을 보낼 수 있는 ItemWriter다. 실제 메세지 전송은 MailSender에 위임한다. 스프링 배치는 SimpleMailMessageItemWriter 인스턴스를 생성하는 SimpleMailMessageItemWriterBuilder를 제공한다.

AvroItemWriter

AvroItemWrite는 주어진 객체 타입이나 스키마를 사용해서 자바 객체를 WriteableResource로 직렬화한다. 출력 데이터에 Avro 스키마를 포함하도록 설정할 수 있지만 필수는 아니다. 스프링 배치는 AvroItemWriter 인스턴스를 생성하는 AvroItemWriterBuilder를 제공한다.

6.15.7. Specialized Processors

스프링 배치는 아래 특화된 processor를 제공한다:

ScriptItemProcessor

ScriptItemProcessor는 현재 아이템을 스크립트로 전달해서 처리하고 스크립트의 결과를 리턴하는 ItemProcessor다. 스프링 배치는 ScriptItemProcessor 인스턴스를 생성하는 ScriptItemProcessorBuilder를 제공한다.


Next :
Scaling and Parallel Processing
스프링 배치 병렬 처리 한글 번역

전체 목차는 여기에 있습니다.

<< >>

TOP