토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 인티그레이션 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.


목차


10.1. Message Endpoints

이 챕터에선 제일 먼저 배경 이론 몇 가지를 다루고 Spring Integration의 다양한 메시지 처리 구성 요소들을 구동하는 내부 API를 깊게 파해쳐본다. 여기에서 다루는 내용을 알아두면 뒷단에서 어떤 일이 일어나는지 이해할 수 있을 거다. 하지만 간단한 네임스페이스로 다양한 요소들을 설정하고 바로 실행해보고 싶다면 당장은 엔드포인트 네임스페이스 지원으로 건너뛰어도 좋다.

Spring Integration을 처음 소개하면서도 언급했지만, 메시지 엔드포인트는 메시지 처리에 필요한 구성 요소들을 채널에 연결해주는 일을 맡고있다. 이어지는 챕터에선 메시지를 컨슘하는 다양한 구성 요소들을 다룬다. 그 중 일부는 응답 메시지를 전송하기도 한다. 메시지를 전송하는 것은 꽤 간단하다. 앞서 메시지 채널에서 설명한 것처럼, 메시지 채널로 메시지를 전송하면 된다. 하지만 메시지를 받는 건 조금 복잡한데, 폴링 컨슈머이벤트 기반event-driven 컨슈머라는 두 가지 유형의 컨슈머가 있는 것이 주원인이다.

이 둘 중에는 이벤트 기반 컨슈머가 훨씬 간단하다. 이벤트 기반 컨슈머는 별도 폴러 스레드를 관리하거나 스케줄링할 필요가 없으며, 사실상 콜백 메소드를 하나 가지고 있는 리스너라고 볼 수 있다. Spring Integration이 제공하는 subscribable 채널 중 하나에 연결할 때는 이벤트 기반 컨슈머를 사용하면 되서 간단하다. 하지만 버퍼링(pollable) 채널에 연결할 땐, 특정 구성 요소를 이용해 폴링 스레드를 스케줄링하고 관리해야만 한다. Spring Integration은 이 두 가지 유형의 컨슈머를 모두 지원할 수 있도록, 두 종류의 엔드포인트 구현체를 제공한다. 따라서 컨슈머 자체는 콜백 인터페이스만 구현하면 된다. 폴링이 필요한 경우엔 이 엔드포인트가 컨슈머 인스턴스의 컨테이너 역할을 담당할 거다. 이점은 메시지 기반 빈들을 호스팅하기 위해 컨테이너를 사용하는 것과 유사하지만, 이 컨슈머들은 ApplicationContext 내에서 실행되는 스프핑 관리 객체이기 때문에 스프링 자체의 MessageListener 컨테이너에 더 가깝다.

10.1.1. Message Handler

프레임워크 내에는 Spring Integration의 MessageHandler 인터페이스를 구현한 구성 요소가 많이 있다. 다른 말로 하면, 이 인터페이스는 public API가 아니며, 일반적으로 MessageHandler를 직접 구현할 일은 없다는 뜻이다. 그렇지만 메시지 컨슈머가 실제로 컨슘한 메시지를 처리할 때 사용하는 전략 인터페이스이기 때문에, 이 인터페이스를 알아두면 컨슈머의 전반적인 역할을 이해하는 데 도움이 될 거다. 이 인터페이스는 다음과 같이 정의돼있다:

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

인터페이스는 단순하지만, 다음 챕터에서 다루는 대부분의 구성 요소들(라우터, 트랜스포머, 스플리터, aggregator, 서비스 activator 등)의 토대가 될 인터페이스다. 이 구성 요소들은 저마다 메시지를 다르게 처리하지만, 실제로 메시지를 수신하기 위한 요구 사항은 동일하며, 폴링과 이벤트 기반 동작을 선택하는 것 또한 동일하다. Spring Integration은 이 콜백 기반 핸들러들을 호스팅하고 메시지 채널에 연결해주는 두 가지 엔드포인트 구현체를 제공한다.

10.1.2. Event-driven Consumer

두 가지 중 더 간단한 이벤트 기반 컨슈머 엔드포인트를 먼저 다뤄보자. SubscribableChannel 인터페이스는 subscribe() 메소드를 제공하고, 이 메소드는 MessageHandler 파라미터를 받는다는 점을 기억할 거다 (SubscribableChannel 참고). 아래 코드를 보면 subscribe 메소드의 정의를 알 수 있다:

subscribableChannel.subscribe(messageHandler);

채널을 구독하는 핸들러는 구독 중인 채널을 능동적으로 폴링할 필요가 없기 때문에, 여기서는 이벤트 기반 컨슈머를 사용한다. 아래 예제에서 볼 수 있듯이, Spring Integration이 제공하는 구현체는 SubscribableChannelMessageHandler를 받는다:

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

10.1.3. Polling Consumer

Spring Integration은 PollingConsumer도 제공하는데, 아래 예제에서 알 수 있듯이 PollableChannel을 구현한 채널이어야 한다는 점만 빼면 같은 방법으로 인스턴스화할 수 있다:

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);

폴링 컨슈머에 대한 자세한 내용은 폴러채널 어댑터를 참고해라.

폴링 컨슈머엔 다양한 옵션을 설정할 수 있다. 필수 프로퍼티 중에서 예를 들면 트리거가 있다. 다음은 트리거를 설정하는 방법을 보여주는 예시다:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS));

PeriodicTrigger를 정의할 땐 보통 간단히 인터벌을 지정하지만 (밀리세컨드 단위), initialDelay와 boolean 프로퍼티 fixedRate도 지원한다 (디폴트는 false다 — 즉, 딜레이를 고정하지 않는다). 아래 예제에선 두 가지 프로퍼티를 모두 세팅하고 있다:

PeriodicTrigger trigger = new PeriodicTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);

위 예제에서 세 가지 설정으로 만들어지는 트리거는 5초를 대기했다가 1초 간격으로 태스크를 트리거한다.

CronTrigger는 유효한 cron 표현식이 있어야 한다. 자세한 내용은 Javadoc을 확인해봐라. 다음은 CronTrigger를 새로 세팅하는 예시다:

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

위 예시에서 정의한 트리거는 월요일부터 금요일까지 10초 간격으로 태스크를 트리거한다.

트리거 외에도, 폴링 관련 설정 프로퍼티로는 maxMessagesPerPoll, receiveTimeout을 지정할 수 있다. 다음은 이 두 가지 프로퍼티를 설정하는 예시다:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 프로퍼티는 한 번의 폴링 사이클에서 최대로 수신할 메시지 수를 지정한다. 즉, 폴러는 null을 반환받거나 최대값에 도달할 때까지 대기 없이 receive()를 계속해서 호출한다. 예를 들어, 폴러가 10초 간격 트리거를 가지고 있고 maxMessagesPerPoll25로 설정돼있다고 가정하면, 큐 안에 100개의 메시지가 들어있는 채널을 폴링하는 경우, 이 100개의 메시지는 40초 안에 전부 조회할 수 있다. 25개를 수신한 다음 10초를 기다렸다가 그 다음 25개를 수신하는 식이다. maxMessagesPerPoll을 음수로 설정한다면 null을 반환받을 때까지 하나의 폴링 사이클 안에서 MessageSource.receive()를 계속 호출한다. 5.5 버전부턴 0이 특별한 의미를 지니는데, MessageSource.receive() 호출을 전부 건너뛴다. 즉, 나중에 컨트롤 버스 등을 통해 maxMessagesPerPoll이 0이 아닌 값으로 바뀌기 전까진 이 폴링 엔드포인트를 일시 중단하는 것으로 보면 된다.

receiveTimeout 프로퍼티는 receive 연산을 실행할 때 사용 가능한 메시지가 없다면 폴러가 대기해야 하는 시간을 지정한다. 예를 들어서, 겉보기엔 비슷해 보여도 실제로는 상당히 다른 두 가지 세팅을 비교해보자: 첫 번째는 5초의 인터벌을 지닌 트리거를 가지고 있으며, receive 타임아웃은 50ms다. 또 다른 설정은 50ms 인터벌의 트리거를 가지고 있으며, receive 타임아웃이 5초다. 첫 번째 폴러는 메시지가 채널에 도착한 시간보다 최대 4950ms까지 늦게 메시지를 받을 수 있다 (폴링 사이클 하나가 끝난 직후에 메시지가 도착한다면). 반면 두 번째 설정에서 메시지를 수신하는 시간은 50ms 이상으로 뒤쳐지지 않는다. 차이점이 있다면 두 번째 옵션에선 스레드가 대기하고 있어야 한다는 점이다. 하지만 결과적으로 보면 메시지가 도착할 시 훨씬 더 빠르게 응답할 수 있다. “long polling”으로 알려져있는 이 기술은 폴링해오는 소스에서 이벤트 기반 동작을 시뮬레이션할 때 활용할 수 있다.

폴링 컨슈머는 다음과 같이 스프링 TaskExecutor에 동작을 위임할 수도 있다:

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

더 나아가면 PollingConsumeradviceChain이란 프로퍼티를 가지고 있다. 이 프로퍼티를 사용하면 트랜잭션을 포함한 그밖의 횡단 관심사cross cutting concern를 위한 AOP 어드바이스 List를 지정할 수 있다. 이 어드바이스들은 doPoll() 메소드 전후에 적용된다. 좀더 상세한 내용은 엔드포인트 네임스페이스 지원 밑에 있는 AOP 어드바이스 체인과 트랜잭션 지원에 관한 섹션을 참고해라.

앞에 있는 예제들은 의존성 lookup을 사용한다. 하지만 이런 컨슈머는 대부분 스프링 빈으로 정의한다는 점을 기억해두자. 실제로 Spring Integration은 채널 유형에 따라 적당한 컨슈머 타입을 생성해주는 ConsumerEndpointFactoryBean이란 FactoryBean도 제공하고 있다. 뿐만 아니라, Spring Integration은 이런 세부 사항은 감출 수 있도록 XML 네임스페이스를 전폭적으로 지원하고 있다. 이 가이드 문서에선 각 컴포넌트 타입을 소개할 때 네임스페이스 기반 설정을 함께 다루고 있다.

많은 MessageHandler 구현체들이 응답 메시지를 생성할 수 있다. 앞서 언급했듯이 메시지를 전송하는 것은 메시지를 받는 것에 비하면 꽤나 간단하다. 그렇다고 하더라도, 응답 메시지를 언제 얼마나 전송할지는 핸들러 타입에 따라 달라진다. 예를 들어서 aggregator는 많은 메시지가 도착할 때까지 기다리며, 메시지 하나로 여러 가지 응답을 생성할 수 있는 splitter의 다운스트림 컨슈머로 구성되는 경우가 많다. 네임스페이스 설정을 사용한다면 이런 세부 사항을 정확히 알지 못해도 괜찮다. 하지만 일부 구성 요소들은 공통 기본 클래스 AbstractReplyProducingMessageHandler를 통해 setOutputChannel(..) 메소드를 제공한다는 사실을 알아두면 좋다.

10.1.4. Endpoint Namespace Support

라우터, 트랜스포머, 서비스 activator 등의 엔드포인트 요소를 설정하는 상황별 예제는 이 레퍼런스 매뉴얼 곳곳에서 찾을 수 있다. input-channel 속성은 대부분이 지원하며, output-channel 속성을 지원하는 엔드포인트도 다양하다. 이 엔드포인트 요소를 파싱하고 나면 참조하는 input-channel 유형에 따라, PollableChannel이라면 PollingConsumer를, SubscribableChannel이라면 EventDrivenConsumer 인스턴스를 생성한다. pollable 채널을 참조할 땐 엔드포인트 요소의 하위 요소 poller와 그 속성을 기반으로 폴링 동작이 정의된다.

다음은 poller에 설정할 수 있는 모든 옵션을 나타낸 예시다:

<int:poller cron=""                                  <!-- (1) -->
            default="false"                          <!-- (2) -->
            error-channel=""                         <!-- (3) -->
            fixed-delay=""                           <!-- (4) -->
            fixed-rate=""                            <!-- (5) -->
            id=""                                    <!-- (6) -->
            max-messages-per-poll=""                 <!-- (7) -->
            receive-timeout=""                       <!-- (8) -->
            ref=""                                   <!-- (9) -->
            task-executor=""                         <!-- (10) -->
            time-unit="MILLISECONDS"                 <!-- (11) -->
            trigger="">                              <!-- (12) -->
            <int:advice-chain />                     <!-- (13) -->
            <int:transactional />                    <!-- (14) -->
</int:poller>

(1) 폴러는 Cron 표현식을 사용해 설정할 수 있다.
내부 구현체는 org.springframework.scheduling.support.CronTrigger를 사용한다.
이 속성을 설정한다면 fixed-delay, trigger, fixed-rate, ref 속성은 지정해선 안 된다.

(2) 이 속성을 true로 설정하면 단 하나의 글로벌 디폴트 폴러만 정의할 수 있다.
애플리케이션 컨텍스트에 디폴트 폴러를 둘 이상 정의하면 예외가 발생한다.
PollableChannel에 연결된 엔드포인트(PollingConsumer)나 SourcePollingChannelAdapter 중 명시적으로 폴러를 설정하지 않은 엔드포인트는 이 글로벌 디폴트 폴러를 사용한다.
디폴트는 false다.
생략할 수 있다.

(3) 이 폴러를 실행하다 에러가 발생하면, 에러 메시지를 전송할 채널.
예외가 발생한 것을 숨기려면 nullChannel을 참조하면 된다.
생략할 수 있다.

(4) fixed delay 트리거는 내부적으로 PeriodicTrigger를 사용한다.
time-unit 속성을 사용하지 않는다면 밀리세컨드로 단위로 값을 지정해야 한다.
이 속성을 설정한다면 fixed-rate, trigger, cron, ref 속성은 지정해선 안 된다.

(5) fixed rate 트리거는 내부적으로 PeriodicTrigger를 사용한다.
time-unit 속성을 사용하지 않는다면 밀리세컨드로 단위로 값을 지정해야 한다.
이 속성을 설정한다면 fixed-delay, trigger, cron, ref 속성은 지정해선 안 된다.

(6) 폴러의 내부 빈 정의를 참조하는 ID (org.springframework.integration.scheduling.PollerMetadata).
디폴트 폴러가 아니라면 (default="true") 최상위 poller 요소엔 이 id 속성이 있어야 한다.

(7) 자세한 내용은 인바운드 채널 어댑터 설정하기를 참고해라.
따로 지정하지 않았을 때 사용하는 기본값은 컨텍스트에 따라 다르다.
PollingConsumer를 사용한다면 이 속성의 기본값은 -1이다.
하지만 SourcePollingChannelAdapter를 사용하는 경우 max-messages-per-poll 속성은 1이 디폴트다.
생략할 수 있다.

(8) 이 값은 내부에서 사용하는 클래스 PollerMetadata에 세팅된다.
따로 지정하지 않으 1000이 디폴트다 (밀리세컨드).
생략할 수 있다.

(9) 또 다른 최상위 poller 빈에 대한 참조.
이 최상위 poller 요소엔 ref 속성이 있어선 안 된다.
이 속성을 설정한다면 fixed-rate, trigger, cron, fixed-delay 속성은 지정해선 안 된다.

(10) 커스텀 태스크 executor를 참조하는 기능을 제공한다.
자세한 내용은 TaskExecutor 지원을 참고해라.
생략할 수 있다.

(11) 이 속성은 내부 org.springframework.scheduling.support.PeriodicTrigger에서 사용할 java.util.concurrent.TimeUnit enum 값을 지정한다.
따라서 이 속성은 fixed-delayfixed-rate과 함께 사용해야 한다.
cron이나 참조 속성 trigger와 함께 사용하면 오류가 발생한다.
PeriodicTrigger는 최대 밀리세컨드까지 지원한다.
그렇기 때문에 밀리세컨드와 세컨드만 사용할 수 있다.
값을 지정하지 않으면 fixed-delay 혹은 fixed-rate 값은 밀리세컨드로 해석된다.
기본적으로 이 enum은 초 단위 인터벌을 사용하는 트리거를 세팅할 때 사용하기 좋다.
hourly, daily, monthly 설정이 필요하다면 이 속성 대신 cron 트리거를 사용하는 것을 권장한다.

(12) org.springframework.scheduling.Trigger 인터페이스를 구현한 스프링 빈을 참조할 수 있다.
하지만 이 속성을 설정한다면 fixed-delay, fixed-rate, cron, ref 속성 중엔 어떤 것도 지정해선 안 된다.
생략할 수 있다.

(13) 그밖의 횡단 관심사cross-cutting concern를 위한 AOP 어바이스들을 지정할 수 있다.
자세한 내용은 트랜잭션 지원을 참고해라.
생략할 수 있다.

(14) 폴러에 트랜잭션을 적용할 수 있다.
자세한 내용은 AOP 어드바이스 체인을 참고해라.
생략할 수 있다.

Examples

1초 간격으로 트리거하는 간단한 폴러는 다음과 같이 설정할 수 있다:

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

fixed-rate 대신 fixed-delay 속성을 사용하는 방법도 있다.

Cron 표현식을 기반으로 동작하는 폴러의 경우, 다음 예제와 같이 cron 속성을 사용한다:

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

입력 채널이 PollableChannel일 땐 폴러 설정이 필요하다. 좀더 직접적으로 말하면, 앞서 말한대로 triggerPollingConsumer 클래스의 필수 프로퍼티다. 따라서 폴링 컨슈머 엔드포인트 설정에서 하위 요소 poller를 생략하면 예외가 발생할 수 있다. 반대로, pollable이 아닌 채널에 연결된 요소에 폴러를 설정하려고 해도 예외가 발생할 수 있다.

아래 예제처럼 최상위 레벨에도 폴러를 만들 수 있다. 이 경우엔 ref 속성만 있으면 된다:

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>

ref 속성은 내부 폴러 정의에서만 허용한다. 이 속성을 최상위 레벨 폴러에 정의하면 애플리케이션 컨텍스트를 초기화하는 중에 설정 예외가 발생한다.

Global Default Poller

글로벌 디폴트 폴러를 정의하면 여기서 설정이 더 간단해진다. XML DSL 안에 최상위 레벨 폴러가 하나라면 default 속성을 true로 설정할 수 있다. 자바 설정이었다면 반드시 PollerMetadata.DEFAULT_POLLER란 이름으로 PollerMetadata 빈을 선언해야 한다. 이렇게 설정하면, 같은 ApplicationContext에 정의된 PollableChannel을 입력 채널로 가지고 있으면서, 명시해둔 poller 설정이 없는 엔드포인트는 전부 이 디폴트 폴러를 사용한다. 다음은 디폴트 폴러와, 이 폴러를 사용하는 트랜스포머를 보여주는 예시다:

Java DSL Java Kotlin DSL XML
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlows.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
        transform(transformer) // No 'poller' attribute because there is a default global poller
        channel("output")
}
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>
Transaction Support

Spring Integration은 폴러에 트랜잭션을 적용할 수 있게 지원하고 있기 때문에, 각각의 receive-and-forward 연산을 하나의 원자적인 작업 단위로 수행할 수 있다. 폴러에 트랜잭션을 설정하려면 하위 요소 <transactional/>을 추가해라. 다음은 사용 가능한 속성들을 보여주는 예시다:

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

좀 더 자세한 정보는 폴러 트랜잭션 지원을 참고해라.

AOP Advice chains

스프링은 프록시 메커니즘을 통해 트랜잭션을 지원한다. 이 메커니즘에선 TransactionInterceptor(AOP 어드바이스)가 폴러가 시작한 메시지 플로우의 트랜잭션 관련 동작을 처리해준다. 그렇기 때문에 반드시 폴러와 관련된 다른 횡단 관심사cross cutting behavior를 위한 별도 어드바이스들을 제공해야 할 때가 있다. 이럴 땐 polleradvice-chain 요소를 사용하면 된다. 이 요소를 이용하면 MethodInterceptor 인터페이스 구현 클래스에 다른 어드바이스들을 더 추가할 수 있다. 다음은 polleradvice-chain을 정의하는 예시다:

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

MethodInterceptor 인터페이스를 구현하는 자세한 방법은 스프링 프레임워크 레퍼런스 가이드에 있는 AOP 섹션을 참고해라. 꼭 트랜잭션 설정이 있는 게 아니더라도, 폴러에 어드바이스 체인을 적용하면 메시지 플로우 동작을 여러 가지로 개선할 수 있다.

어드바이스 체인을 사용할 땐, 하위 요소 <transactional/>을 지정할 수 없다. 그대신 <tx:advice/> 빈을 선언하고 <advice-chain/>에 추가해라. 전체 설정에 관한 상세 정보는 폴러 트랜잭션 지원을 참고해라.

TaskExecutor Support

폴링 스레드는 스프링의 TaskExecutor 인스턴스로도 실행할 수 있다. 이렇게 하면 엔드포인트 혹은 엔드포인트 그룹에 동시성을 부여하게 된다. 스프링 3.0부터 코어 스프링 프레임워크는 task 네임스페이스를 지원하는데, 여기 있는 <executor/> 요소는 간단한 스레드 풀 executor를 생성해준다. 이 요소는 풀 사이즈와 큐 용량같은 동시성 설정을 위한 공통 속성들을 받는다. 스레드 풀링 executor를 설정하면 부하 속에서 엔드포인트가 동작하는 방식에 상당한 차이를 만들어낼 수 있다. 엔드포인트의 성능은 반드시 고려해야 하는 중요 인자 중 하나이기 때문에 (또 다른 중요 인자로는 엔드포인트가 구독하는 채널의 예상 볼륨이 있다), 이런 설정들은 엔드포인트별로 따로 이용할 수 있다. XML 네임스페이스를 이용해 설정한 폴링 엔드포인트에서 동시성 효과를 누리려면, <poller/> 요소에 task-executor 참조를 제공하고, 아래 예시에 보이는 프로퍼티들 중 원하는 것들을 지정해라:

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

task-executor를 제공하지 않으면 컨슈머의 핸들러는 호출자의 스레드에서 실행된다. 일반적인 상황에서 호출자는 보통 디폴트 TaskScheduler다 (태스크 스케줄러 설정하기 참고). task-executor 속성은 스프링의 TaskExecutor 인터페이스를 구현한 빈의 이름을 참조할 수 있다는 점도 알아두면 좋다. 위 예제에 보이는 executor 요소는 편의상 표기했으니 참고해라.

폴링 컨슈머의 배경 이론에 대해 설명하면서 언급했듯이, 폴링 컨슈머는 이벤트 기반 동작을 시뮬레이션할 때에도 활용할 수 있다. receive 타임아웃은 길고 인터벌은 짧은 트리거를 이용하면, 메시지 소스를 폴링하는 방식이더라도 메시지가 도착하면 매우 빠르게 반응할 수 있다. 단, 이 테크닉은 소스를 호출하면 타임아웃되기 전까지 블로킹되는 경우에만 가능하다. 예를 들면 파일 폴러는 블로킹되지 않는다. receive()를 호출할 때마다 즉시 반환되며, 새 파일이 있을 수도 없을 수도 있다. 따라서 파일 폴러에 receive-timeout을 길게 설정하더라도, 블로킹되지 않기 때문에 타임아웃을 이용할 수 없다. 한편 Spring Integration의 자체 큐 기반 채널을 사용한다면 타임아웃을 적절히 활용할 수 있다. 다음 예제는 폴링 컨슈머로 메시지를 거의 도착하는 즉시 수신하는 방법을 보여준다:

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

이 테크닉은 내부에서 정해진 시간 동안 대기하는 스레드를 하나 사용하는 것이 전부기 때문에 오버헤드가 많이 따라오지 않는다. 이 스레드는 (예를 들어) 스래싱thrashing, 무한 while 루프에 비하면 CPU 리소스를 거의 잡아먹지 않는다.

10.1.5. Changing Polling Rate at Runtime

폴러를 fixed-delayfixed-rate 속성으로 설정할 땐, 디폴트 구현체는 PeriodicTrigger 인스턴스를 사용한다. PeriodicTrigger는 코어 스프링 프레임워크에 들어있다. 이 클래스는 인터벌을 생성자 인자로만 받기 때문에 런타임에 변경이 불가능하다.

하지만 org.springframework.scheduling.Trigger 인터페이스는 직접 구현할 수 있다. 물론, 처음엔 PeriodicTrigger를 사용해보는 것도 좋다. 이후에 인터벌(period)을 위한 setter를 추가하거나, 트리거 자체에 스로틀링throttling 로직을 직접 넣을 수도 있다. nextExecutionTime을 호출해서 다음번 폴링을 스케줄링할 때마다 이 period 프로퍼티를 사용하는 거다. 이 커스텀 트리거를 폴러 안에서 사용하려면 애플리케이션 컨텍스트에 커스텀 트리거 빈을 정의하고, trigger 속성에서 이 인스턴스를 참조해서 폴러 설정에 의존성을 주입해줘라. 그러면 트리거 빈에 대한 참조를 가져와서 폴링 인터벌을 변경할 수 있다.

예제가 필요하다면 Spring Integration Samples 프로젝트를 확인해봐라. dynamic-poller라는 샘플에서 커스텀 트리거를 사용해서 런타임에 폴링 인터벌을 변경하는 것을 확인할 수 있다.

이 샘플에선 org.springframework.scheduling.Trigger 인터페이스를 구현한 커스텀 트리거를 사용한다. 이 샘플의 트리거는 스프링의 PeriodicTrigger 구현체를 기반으로 만들었다. 하지만 커스텀 트리거의 필드들은 final이 아니며, 프로퍼티들엔 명시적인 getter와 setter가 있어 런타임에 폴링 인터벌을 동적으로 변경할 수 있다.

그렇지만 Trigger 메소드는 nextExecutionTime()이기 때문에, 다이나믹 트리거를 변경하더라도 다음번 폴링 전까진 기존 설정을 그대로 사용한다는 점을 이해해야 한다. 다음번 실행 시간으로 설정된 시간이 되기 전에 트리거를 강제로 동작시킬 수 있는 방법은 없다.

10.1.6. Payload Type Conversion

이 문서에선 메시지나 임의의 Object를 입력 파라미터로 받는 다양한 엔드포인트들의 상황별 설정과 구현 예시도 만날 수 있다. Object를 파라미터로 받을 땐, Object를 메시지 페이로드에 매핑하거나, 페이로드 혹은 헤더에 있는 특정 필드에 매핑시킨다 (SpELSpring Expression Language 사용 시). 하지만 엔드포인트 메소드의 입력 파라미터 타입이 페이로드나 하위 필드 타입과 항상 일치하는 것은 아니다. 이럴 땐 타입을 변환해줘야 한다. Spring Integration을 사용하면 타입 컨버터를 손쉽게 등록할 수 있다 (스프링 ConversionService를 이용해서). Spring Integration은 integrationConversionService라는 자체 변환 서비스 빈을 가지고 있다. 이 빈은 컨버터를 하나라도 정의하면 Spring Integration 인프라에서 즉시 자동으로 생성한다. 컨버터를 등록하려면 org.springframework.core.convert.converter.Converter나, org.springframework.core.convert.converter.GenericConverter 또는 org.springframework.core.convert.converter.ConverterFactory를 구현하면 된다.

Converter 구현체는 무엇보다도 간결한데, 어떤 타입을 다른 타입으로 변환하는 일을 담당한다. 클래스 계층 구조로 변환하는 것 같이 더 정교한 조작이 필요할 땐 GenericConverter를 사용하면 되고, 어쩌면 ConditionalConverter를 구현할 수도 있다. 이 인터페이스들은 from, to 타입 descriptor를 사용하므로 좀더 복잡한 변환 로직을 만들 수 있다. 예를 들어 변환 대상이 Something이라는 추상 클래스이고 (파라미터 타입, 채널 데이터 타입 등), Thing1Thing이라는 구현체가 있으며, 입력 타입에 따라 둘 중 하나로 변환하려 한다면 GenericConverter를 사용하면 된다. 자세한 내용은 아래 인터페이스들의 Javadoc을 참고해라:

컨버터를 구현했다면, 아래 예제와 같이 네임스페이스를 이용해 간편하게 등록할 수 있다:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

아니면 아래 예제처럼 내부 빈으로 사용해도 된다:

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

Spring Integration 4.0부터는 다음 예제처럼 어노테이션을 이용해 위와 동일한 설정을 만들 수 있다:

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

아니면 아래처럼 @Configuration 어노테이션을 사용해도 된다:

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

스프링 프레임워크에서 애플리케이션 컨텍스트를 설정할 땐 직접 conversionService 빈을 추가할 수 있다 (ConversionService 설정하기 챕터 참고). 이 서비스는 bean을 생성하고 설정할 때 변환이 필요한 경우에 사용한다.

반면, integrationConversionService는 런타임 변환에 사용한다. 이들의 용도는 완전히 다르다. 데이터 타입 채널, 페이로드 타입 트랜스포머 등에서, bean 생성자 인자와 프로퍼티들을 연결하는 용도로 만든 컨버터를 사용해, 런타임에 메시지로 스프링 인티그레이션 표현식을 평가하면 의도치 않은 결과가 나올 수 있다.

하지만 그래도 스프링 ConversionService를 Spring Integration IntegrationConversionService로 사용하고 싶다면, 아래 예제와 같이 애플리케이션 컨텍스트 안에 alias를 설정해주면 된다:

<alias name="conversionService" alias="integrationConversionService"/>

이렇게 하면 ConversionService에서 제공하는 컨버터들을 Spring Integration의 런타임 변환에 사용할 수 있다.

10.1.7. Content Type Conversion

5.0 버전부터 기본적으로 메소드 실행 메커니즘은 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 인프라를 이용한다. 이때 HandlerMethodArgumentResolver 구현체는 (ex. PayloadArgumentResolver, MessageMethodArgumentResolver) MessageConverter 인터페이스를 사용해 전달받은 payload를 타겟 메소드의 인자 타입으로 변환할 수 있다. 변환 자체는 메시지 헤더 contentType을 가지고 수행할 수 있다. Spring Integration은 ConfigurableCompositeMessageConverter를 제공한다. 이 구현체는 등록된 컨버터 목록에 로직을 위임하는 데, 컨버터 중 하나가 null이 아닌 값을 반환할 때까지 등록된 컨버터를 실행해본다. ConfigurableCompositeMessageConverter는 기본적으로 다음과 같은 컨버터들을 제공한다 (등록한 순서대로 실행한다):

  1. MappingJackson2MessageConverter (클래스패스에 Jackson 프로세서가 있다면)
  2. ByteArrayMessageConverter
  3. ObjectStringMessageConverter
  4. GenericMessageConverter

각각의 용도나 변환에 사용할 적절한 contentType 값 등은 Javadoc을 확인해봐라 (바로 위에 링크를 걸어뒀다). ConfigurableCompositeMessageConverter를 사용하는 이유는 앞에 있는 디폴트 컨버터들을 사용하든 안 하든, 다른 MessageConverter 구현체들과 함께 제공할 수 있기 때문이다. 아래 예제처럼 디폴트 컨버터를 재정의해서 애플리케이션 컨텍스트에 적절한 빈으로 등록해줄 수도 있다:

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

위에 보이는 두 가지 컨버터는 디폴트 컨버터들보다 앞에 등록된다. 뿐만 아니라, ConfigurableCompositeMessageConverter를 사용하지 않고 자체 MessageConverterintegrationArgumentResolverMessageConverter란 이름으로 빈을 등록해서 사용할 수도 있다 (IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 프로퍼티를 설정해서).

SpEL 메소드를 실행하는 경우엔 (contentType 헤더를 비롯한) MessageConverter 기반 변환은 이용할 수 없다. 이 경우 페이로드 타입 변환에서 설명한 일반적인 클래스 간 변환만 가능하다.

10.1.8. Asynchronous Polling

메시지를 비동기로 폴링하려면, 폴러의 task-executor 속성이 기존 TaskExecutor 빈 인스턴스를 가리키도록 만들어주면 된다 (스프링 3.0에선 task 네임스페이스를 이용해 손쉽게 설정할 수 있다). 하지만 폴러에 TaskExecutor를 설정할 땐 반드시 이해하고 넘어가야 하는 것들이 몇 가지 있다.

폴러와 TaskExecutor라는 두 가지 설정이 있다는 점에서 문제가 시작된다. 이 둘의 조화가 서로 맞아야 한다. 그렇지 않으면 인위적인 메모리 누수memory leak가 발생할 수 있다.

아래 설정을 한 번 생각해보자:

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

위 예시는 장단이 맞지 않는 설정이다.

기본적으로 태스크 executor는 무한한unbounded 태스크 큐를 가진다. 폴러는 모든 스레드가 블로킹돼 새 메시지가 도착하길 기다리거나 타임아웃되서 만료되길 기다리고 있더라도 계속해서 새 태스크를 예약한다. 타임아웃을 5초로 두고 태스크를 실행하는 20개의 스레드가 있다고 가정하면, 초당 4개 꼴로 태스크가 실행된다. 하지만 초당 20개 꼴로 새 태스크가 예약되므로, 태스크 executor 안에 있는 내부 큐는 초당 16개의 속도로 증가하기 때문에 (프로세스는 유휴 상태idle인데도) 메모리 누수memory leak가 발생한다.

이 문제를 해결하는 방법 중 하나로는, 태스크 executor의 queue-capacity 속성을 설정하는 방법이 있다. 0이라도 상관 없다. 태스크 Executor의 rejection-policy 속성을 설정해서 (DISCARD 등으로) 큐에 메시지를 넣을 수 없을 때 수행할 작업을 지정하는 식으로 관리하는 방법도 있다. 다시 말하지만, TaskExecutor를 설정한다면 깊게 이해하고 넘어가야 하는 부분이 있다. 이 주제에 대해 자세히 알고 싶다면 스프링 레퍼런스 매뉴얼에 있는 “태스크 실행과 예약”을 참고해라.

10.1.9. Endpoint Inner Beans

엔드포인트 중에는 composite 빈으로 사용하는 엔드포인트가 꽤 있다. 컨슈머들과 폴링 인바운드 채널 어댑터들도 모두 마찬가지다. 컨슈머는 (폴링, 이벤트 기반 컨슈머 모두) 동작을 MessageHandler에 위임한다. 폴링 어댑터는 MessageSource에 위임해서 메시지를 가져온다. 한번 씩 테스트를 진행할 때나, 운영 중 런타임에 위임 대상 객체delegate의 빈 참조를 얻어와 설정을 변경할 수 있다면 유용할 거다. 이 빈들의 이름은 정해진 규칙을 따르고 있기 때문에, 이 이름을 통해 ApplicationContext에서 꺼내올 수 있다. MessageHandler 인스턴스는 someConsumer.handler같은 빈 ID로 애플리케이션 컨텍스트에 등록된다 (여기서 ‘consumer’는 엔드포인트의 id 속성 값이다). MessageSource 인스턴스의 빈 ID는 somePolledAdapter.source와 같은 모양새이며, 여기서 ‘somePolledAdapter’는 어댑터의 ID다.

위에서 설명한 내용은 프레임워크의 자체 컴포넌트에만 해당하는 이야기다. 하지만 스프링을 이용할 땐 자체 컴포넌트 대신 다음 예제와 같이 내부 빈 정의를 사용할 수도 있다:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

이 빈은 내부에 선언된 다른 빈들처럼 처리돼서 애플리케이션 컨텍스트에 등록되지 않는다. 이 빈에 다른 방법으로 액세스하고 싶다면 최상위 레벨에 id를 사용해 선언하고 대신 ref 속성으로 참조해라. 자세한 내용은 스프링 문서를 참고해라.


10.2. Endpoint Roles

4.2 버전부턴 엔드포인트를 role에 할당할 수 있다. 이 role을 통해 여러 엔드포인트들을 한 번에 시작하고 중지할 수 있다. 리더십이 부여되거나 회수될 때 엔드포인트 셋을 시작, 중지할 수 있는 리더십 선출leadership election을 사용한다면 활용도가 더 높다. 스프링은 애플리케이션 컨텍스트에 IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER라는 이름으로 SmartLifecycleRoleController 빈을 등록한다. 수명 주기를 제어해야 한다면 이 빈을 주입하거나 @Autowired하면 된다:

<bean class="com.some.project.SomeLifecycleControl">
    <property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>

엔드포인트를 role에 할당할 땐 XML 설정이나, 자바 설정을 이용해도 되고, 코드로 직접 할당할 수도 있다. 다음은 XML을 사용해 엔드포인트 role을 설정하는 예시다:

<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
        auto-startup="false">
    <int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>

다음은 자바 코드에서 생성하는 빈에 엔드포인트 role을 설정하는 방법을 보여주는 예시다:

@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
    return // some MessageHandler
}

아래 예제는 자바 메소드에 엔드포인트 role을 설정하고 있다:

@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
    return payload.toUpperCase();
}

다음은 자바에서 SmartLifecycleRoleController를 사용해 엔드포인트 role을 설정하는 방법이다:

@Autowired
private SmartLifecycleRoleController roleController;
...
    this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...

다음은 자바에서 IntegrationFlow를 이용해 엔드포인트 role을 설정하는 예시다:

IntegrationFlow flow -> flow
        .handle(..., e -> e.role("cluster"));

이 예시들은 모두 cluster role에 엔드포인트를 추가한다.

roleController.startLifecyclesInRole("cluster") 메소드를 호출하면 엔드포인트를 시작하고, stopLifecyclesInRole 메소드를 호출하면 중지된다.

SmartLifecycle을 구현한 객체라면 꼭 엔드포인트가 아니어도 코드로 직접 추가할 수 있다.

SmartLifecycleRoleControllerApplicationListener<AbstractLeaderEvent>를 구현하고 있으며, 리더십을 부여받거나 회수하면 (어떤 빈이 OnGrantedEventOnRevokedEvent를 게시하면) 그에 해당하는 SmartLifecycle 객체들을 자동으로 시작하고 중지해준다.

리더십 선출leadership election을 이용해 컴포넌트들을 시작하고 중지한다면 XML 속성 auto-startup(빈 프로퍼티 autoStartup)을 false로 설정하는 것이 중요한데, 이렇게 해야 컨텍스트 초기화 중에 애플리케이션 컨텍스트가 컴포넌트들을 시작하지 않는다.

SmartLifecycleRoleController는 4.3.8 버전부터 다음과 같은 여러 가지 상태 메소드들을 제공한다:

public Collection<String> getRoles() // (1)

public boolean allEndpointsRunning(String role) // (2)

public boolean noEndpointsRunning(String role) // (3)

public Map<String, Boolean> getEndpointsRunningStatus(String role) // (4)

(1) 관리 중인 role 리스트를 반환한다.
(2) 이 role에 해당하는 모든 엔드포인트가 실행 중이면 true를 반환한다.
(3) 이 role에 해당하는 엔드포인트들 중, 실행 중인 엔드포인트가 없으면 true를 반환한다.
(4) 컴포넌트 이름 : 실행 상태를 매핑한 맵을 반환한다. 컴포넌트 이름은 일반적으로 빈의 이름을 뜻한다.


10.3. Leadership Event Handling

엔드포인트 그룹은 리더십이 부여되거나 회수됨에 따라 시작, 중지할 수 있다. 클러스터가 하나의 리소스를 공유할 때, 클러스터 안에 있는 인스턴스 중에서 단 하나의 인스턴스만 이 리소스를 컨슘해야 하는 상황 등에 리더십 기능을 활용할 수 있다. 대표적인 예시는 공유 디렉토리를 폴링하는 파일 인바운드 채널 어댑터다. (파일 읽기 참고).

리더 선출에 참여하려면 “leader initiator”라는 컴포넌트를 애플리케이션 컨텍스트 안에 하나 생성하면 된다. 이 컴포넌트를 사용하면, 리더로 선출되었거나, 리더십이 회수됐거나, 리더가 되려면 필요한 리소스를 획득하지 못한 경우에 통지 받을 수 있다. 일반적으로 leader initiator는 SmartLifecycle이기 때문에, 컨텍스트가 시작할 때 시작되며 (선택 사항), 리더십이 변경되면 통지를 보낸다. 뿐만 아니라 5.0 버전부터는 publishFailedEventstrue로 설정하면 실패 통보를 받을 수 있어, 실패 발생 시 원하는 조치를 취할 수 있다. 컨벤션에 따라 이 콜백들을 수신할 Candidate를 하나 제공해야 한다. 추가로, 프레임워크에서 제공하는 Context 객체를 통해 리더십을 회수할 수도 있다. 원하는 코드에서 o.s.i.leader.event.AbstractLeaderEvent 인스턴스(OnGrantedEventOnRevokedEvent의 상위 클래스)를 수신listen하고 적절히 대응할 수도 있다 (ex. SmartLifecycleRoleController를 이용해서). 이 이벤트엔 Context 객체에 대한 참조가 담겨있다. 다음은 Context 인터페이스의 정의다:

public interface Context {

	boolean isLeader();

	void yield();

	String getRole();

}

5.0.6 버전부턴 이 컨텍스트를 통해 candidate의 role에 접근할 수 있다.

Spring Integration은 기초적인 leader initiator 구현체를 제공한다. 이 구현체는 LockRegistry 인터페이스를 기반으로 동작한다. 이 구현체를 사용하려면 다음과 같이 인스턴스를 만들어 빈으로 등록해줘야 한다:

@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
    return new LockRegistryLeaderInitiator(locks);
}

lock 레지스트리에 문제가 없다면, 리더는 최대 하나만 존재할 수 있다. lock 레지스트리에서 lock이 만료되거나 문제가 생겼을 때 예외를 던지기도 한다면 (InterruptedException을 던지면 가장 좋다), 리더가 없는 상태로 유지되는 시간은 lock 구현체 자체가 가진 대기 시간만큼만으로 최소화할 수 있다. 기본적으로, busyWaitMillis 프로퍼티로 인해 지연 시간이 좀 더 늘어나는데, 이는 lock이 만료된 사실을 lock을 다시 얻어오려고 해봐야만 알 수 있는 (좀 더 현실적인) 상황에서 CPU가 고갈되지 않게 하기 위한 장치다.

Zookeeper를 이용한 리더십 선출과 이벤트에 대한 자세한 내용은 Zookeeper 리더십 이벤트 핸들링을 참고해라.


10.4. Messaging Gateways

게이트웨이는 Spring Integration에서 제공하는 메시지 관련 API들을 보이지 않게 숨겨준다. 덕분에 Spring Integration API를 인식하지 않고 애플리케이션의 비지니스 로직을 작성할 수 있다. 범용 게이트웨이를 사용하면 간단한 인터페이스 하나와만 상호 작용하는 코드를 만들 수 있다.

10.4.1. Enter the GatewayProxyFactoryBean

앞에서도 언급했지만, Spring Integration API (게이트웨이 클래스도 포함해서)에 대한 의존성을 없앨 수 있다면 정말 좋을 거다. Spring Integration은 의존성을 없앨 수 있도록 GatewayProxyFactoryBean을 제공한다. 이 클래스는 원하는 인터페이스에 대한 프록시를 생성해주며, 내부적으론 아래 있는 것과 같은 게이트웨이 메소드들을 실행해준다. 비즈니스 메소드엔 의존성 주입을 통해 이 인터페이스를 연결해주면 된다.

다음은 Spring Integration과 상호 작용하는 데 사용할 인터페이스 예시다:

package org.cafeteria;

public interface Cafe {

    void placeOrder(Order order);

}

10.4.2. Gateway XML Namespace Support

네임스페이스 역시 지원하고 있다. 다음과 같이 원하는 인터페이스를 서비스로 설정할 수 있다:

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

이렇게 설정해주고 나면, 다른 빈에는 cafeService를 주입할 수 있으며, Cafe 인터페이스의 프록시 인스턴스를 통해 이 메소드를 호출하는 코드에선 Spring Integration API를 인지하지 못한다. 일반적으로 Spring Remoting(RMI, HttpInvoker 등)과 유사하게 처리할 수 있다. gateway 요소를 사용하는 예시는 부록에 있는 “Samples”를 참고해라 (Cafe 데모).

위 설정에 있는 디폴트 속성들은 게이트웨이 인터페이스에 있는 모든 메소드에 적용된다. reply 타임아웃을 지정하지 않으면 호출 스레드는 응답을 무한정 기다린다. 응답이 없을 때의 게이트웨이 동작을 참고해라.

기본값들은 메소드별로 재정의할 수 있다. 어노테이션 및 XML을 이용한 게이트웨이 설정을 참고해라.

10.4.3. Setting the Default Reply Channel

게이트웨이는 응답을 수신listen하는 익명 reply 채널을 임시로 자동 생성하기 때문에, 일반적으론 default-reply-channel을 지정할 필요가 없다. 하지만 상황에 따라 default-reply-channel(또는 HTTP, JMS 등과 같은 어댑터 게이트웨이에선 reply-channel)을 정의하라는 로그가 남을 때도 있다.

그 배경을 이해하기 위해 게이트웨이의 내부 동작을 간단하게 설명해보겠다. 게이트웨이는 reply 채널로 사용할 임시 point-to-point 채널을 생성한다. 이 채널은 익명 채널이며, 메시지 헤더 replyChannel에 추가된다. default-reply-channel(원격 어댑터 게이트웨이에선 reply-channel)을 직접 명시한다면 publish-subscribe 채널을 가리킬 수도 있다. publish-subscribe란 이름을 보면 알 수 있듯이, 이 채널은 구독자를 둘 이상 추가할 수 있는 채널이다. 그러면 Spring Integration은 내부적으로 임시 replyChannel과 명시적으로 정의한 default-reply-channel 사이에 브리지를 생성한다.

응답을 게이트웨이 뿐 아니라 다른 컨슈머에게도 전송하고 싶다고 가정해보자. 그러려면 두 가지 조건을 만족해야 한다:

게이트웨이의 디폴트 전략에서 헤더에 추가하는 reply 채널은 익명, point-to-point 채널이기 때문에 이런 요구 사항들을 충족하지 않는다. 다시 말해, 다른 구독자가 채널을 이용할 수 없고, 가능하다고 쳐도 해당 채널은 point-to-point로 동작하기 때문에 하나의 구독자만 메시지를 받을 수 있다. default-reply-channel을 정의하면 원하는 채널을 가리킬 수 있다. 즉, publish-subscribe-channel을 정의하면 된다. 그러면 게이트웨이는 이 채널에서 헤더에 저장된 임시 익명 reply 채널로 이어주는 브리지를 생성한다.

인터셉터를 통해 모니터링이나 감사audit를 진행하기 위한 (ex. wiretap) reply 채널을 명시적으로 제공하고 싶을 수도 있다. 채널 인터셉터를 설정할 때도 채널의 이름이 필요하다.

5.4 버전부터 게이트웨이 메소드의 리턴 타입이 void인 경우, 스프링은 replyChannel 헤더가 명시돼있지 않다면 replyChannel 헤더에 nullChannel 빈 참조를 채운다. 이렇게 하면 다운스트림 플로우에서 발생할 수 있는 응답을 전부 폐기하므로, 단방향 게이트웨이를 구현할 수 있다.

10.4.4. Gateway Configuration with Annotations and XML

아래 예제를 살펴보자. 앞에 있는 Cafe 인터페이스 예제를 가져와서 @Gateway 어노테이션을 추가했다:

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

다음 예제와 같이 @Header 어노테이션을 사용하면 메시지 헤더에 해당하는 값들을 추가할 수 있다:

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

XML을 이용해 게이트웨이 메소드를 설정하고 싶다면, 다음과 같이 게이트웨이 설정에 method 요소를 추가해주면 된다:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

메소드마다 실행할 때 사용할 헤더도 XML로 제공할 수 있다. 사실상 설정하려는 헤더가 정적이어서, 게이트웨이의 메소드 시그니처에 @Header 어노테이션을 사용해 헤더를 끼워 넣고 싶진 않을 때 유용하다. 예시로 대출을 중개한다고 가정해보면, 어떤 요청 타입이 들어왔는지에 따라 (견적을 하나만 요청했는지, 전부 요청했는지) 대출 견적을 집계하는 방식을 다르게 가져가려 한다. 어떤 게이트웨이 메소드를 실행했는지를 보고 요청 타입을 판별하는 것은 가능은 하더라도 관심사의 분리separation of concerns 패러다임에 맞지 않는다 (메소드는 자바 아티팩트다). 하지만 메시지 헤더에 의도(메타 정보)를 표현하는 것은, 메시징 아키텍처에선 자연스러운 일이다. 다음은 두 가지 메소드에 각각 다른 메시지 헤더를 추가하는 예시다:

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

위 예시에선 게이트웨이의 메소드에 따라 ‘RESPONSE_TYPE’ 헤더에 다른 값을 설정한다.

requestChannel 등을 <int:method/>@Gateway 어노테이션에 둘 다 지정하면 어노테이션에 있는 값을 사용한다.

XML에 인자를 받지 않는 게이트웨이를 지정했을 때, 인터페이스 메소드에 @Gateway@Payload 어노테이션이 모두 있는 경우 (payloadExpression이나 <int:method/> 요소의 payload-expression을 지정한 경우), @Payload 값은 무시된다.

Expressions and “Global” Headers

<header/> 요소에선 value 대신 expression을 사용할 수 있다. 여기 지정한 SpEL 표현식을 평가해서 헤더 값을 결정한다. 5.2 버전부터 평가 컨텍스트의 #root 객체는 getMethod()getArgs() 접근자를 가지고 있는 MethodArgsHolder다.

아래 있는 두 가지 표현식 평가 컨텍스트 변수는 5.2 버전부터 deprecated되었다:

java.reflect.Method는 직렬화가 불가능하다. 나중에 메시지를 직렬화하게 되면 method 표현식이 담겨있는 헤더는 손실된다. 그렇기 때문에 직렬화가 필요한 경우에는 method.name이나 method.toString()을 사용하는 게 좋다. toString() 메소드는 파라미터와 리턴 타입을 포함한 메소드의 정보를 String으로 표현해준다.

3.0 버전부터 <default-header/> 요소를 정의하면 어떤 메소드가 실행됐는지와는 무관하게 게이트웨이에서 생성한 모든 메시지에 헤더를 추가할 수 있다. 헤더를 메소드에 직접 정의하면 디폴트 헤더보다 우선시한다. 여기에서 메소드에 정의한 헤더는 서비스 인터페이스의 @Header 어노테이션을 재정의한다. 하지만 디폴트 헤더는 서비스 인터페이스의 @Header 어노테이션을 재정의하지 않는다.

게이트웨이는 이제 모든 메소드에 적용되는 (재정의하지만 않는다면) default-payload-expression도 지원한다.

10.4.5. Mapping Method Arguments to a Message

앞에서 보여준 방법대로 설정하면 메소드 인자를 어떤 메시지 요소(페이로드, 헤더)에 매핑할지를 직접 선택할 수 있다. 반면, 설정이 명시되어 있지 않으면 특정 컨벤션을 사용해 매핑을 수행하게 된다. 이 컨벤션은 상황에 따라서 어떤 인자가 페이로드이고, 어떤 인자를 헤더에 매핑해야 하는지 결정할 수 없을 때도 있다. 아래 예시를 살펴보자:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

첫 번째 케이스에선, 컨벤션에 따라 첫 번째 인자를 (Map만 아니라면) 페이로드에 매핑하고 두 번째 인자가 헤더가 된다.

두 번째 케이스에선 어떤 인자를 페이로드로 취급해야 하는지 프레임워크가 결정할 수 없다 (첫 번째 케이스에서 thing1 인자가 Map인 경우도 마찬가지다). 결과적으로 매핑에 실패한다. 이 문제는 일반적으로 payload-expression이나, @Payload 또는 @Headers 어노테이션을 사용하면 해결할 수 있다.

아니면 (컨벤션으로 해결할 수 없다면) 메소드를 실행할 때 메시지를 매핑하는 일을 전부 직접 구현하는 방법도 있다. MethodArgsMessageMapper를 구현해서 <gateway/>mapper 속성에 지정해주면 된다. 이 매퍼는 MethodArgsHolder를 메시지로 매핑하는 역할을 담당한다. MethodArgsHolderjava.reflect.Method 인스턴스와 인자들이 담겨있는 Object[]를 래핑한 간단한 클래스다. 커스텀 매퍼를 제공할 땐 게이트웨이의 default-payload-expression 속성이나 <default-header/> 요소는 사용할 수 없다. 마찬가지로 <method/> 요소에선 payload-expression 속성과 <header/> 요소를 허용하지 않는다.

Mapping Method Arguments

아래 코드에선 메소드 인자를 메시지에 매핑하는 방법과 함께, 잘못된 설정의 예시를 몇 가지 확인할 수 있다:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("#args[0] + #args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(#args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}

(1) SpEL 변수 #this는 인자를 나타낸다 — 이 경우 s의 값 .

같은 설정을 XML을 이용하면 약간 달라지는데, 이땐 메소드 인자에 대한 #this 컨텍스트가 없기 때문이다. 하지만 아래 예제와 같이 #args 변수를 사용하면 표현식으로 메소드 인자를 참조할 수 있다:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="#args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(#args[0])"/>
  <int:method name="send3" payload-expression="#method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="#args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

10.4.6. @MessagingGateway Annotation

4.0 버전부터 게이트웨이 서비스 인터페이스를 설정하기 위해 xml 요소 <gateway/>를 정의하는 대신 @MessagingGateway 어노테이션으로 간단히 마킹할 수 있다. 아래 있는 두 예제를 보면 동일한 게이트웨이를 설정하는 두 가지 방법을 비교해볼 수 있다:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}

Spring Integration은 XML 설정과 유사하게 컴포넌트 스캔 중에도 이런 어노테이션들을 발견하면, 메시징 인프라를 이용해 proxy 구현체를 생성한다. 여기서 말하는 스캔을 수행하고 애플리케이션 컨텍스트에 BeanDefinition을 등록하려면 @Configuration 클래스에 @IntegrationComponentScan 어노테이션을 추가해라. 표준 @ComponentScan 인프라에선 인터페이스들을 처리해주지 않는다. 그렇기 때문에 인터페이스 위에 있는 @MessagingGateway 어노테이션을 정제해서 관련 GatewayProxyFactoryBean 인스턴스를 등록할 수 있도록 커스텀 @IntegrationComponentScan 로직을 도입했다. 어노테이션 지원도 함께 참고해라.

서비스 인터페이스 위에 @MessagingGateway, @Profile 어노테이션을 함께 마킹해주면, 해당 프로파일이 활성화되었을 때에만 빈을 생성할 수 있다.

XML 설정을 전혀 사용하지 않는다면 @Configuration 클래스 중 최소 하나엔 @EnableIntegration 어노테이션을 선언해줘야 한다. 자세한 내용은 설정과 @EnableIntegration을 참고해라.

10.4.7. Invoking No-Argument Methods

게이트웨이 인터페이스에서 인자를 받지 않는 메소드를 호출할 땐, PollableChannel에서 Message를 수신하는 게 기본 동작이다.

하지만 간혹, 인자가 없는 메소드를 트리거해서, 사용자가 파라미터를 제공할 필요가 없는 다운스트림의 다른 구성 요소와 상호 작용하고 싶을 수도 있다. 예를 들면 인자가 없는 SQL 호출이나 저장 프로시저stored procedure를 트리거하고 싶을 수 있다.

send-and-receive 시맨틱스를 달성하려면 반드시 페이로드를 제공해야 한다. 페이로드 생성을 위해 인터페이스 메소드에 파라미터를 추가해야 하는 건 아니다. @Payload 어노테이션을 이용하거나 XML의 method 요소에서 payload-expression 속성을 사용하면 된다. 다음은 페이로드로 이용할 수 있는 몇 가지 예시다:

다음은 @Payload 어노테이션을 사용하는 예시다:

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

@Gateway 어노테이션을 이용할 수도 있다.

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

두 어노테이션이 모두 있다면 (그리고 payloadExpression을 지정했다면) @Gateway를 우선시한다.

어노테이션과 XML을 이용한 게이트웨이 설정도 함께 참고해라.

인자를 받지 않고, 값을 반환하지도 않는 메소드에 페이로드 표현식이 포함되어 있으면, send-only로 처리한다.

10.4.8. Invoking default Methods

게이트웨이 프록시에서 사용하는 인터페이스는 default 메소드를 가질 수 있다. 5.3 버전부턴 프록시 메커니즘 대신 java.lang.invoke.MethodHandle을 이용해 default 메소드를 호출할 수 있도록 프록시에 DefaultMethodInvokingMethodInterceptor를 주입한다. java.util.function.Function같이 JDK에 포함되어있는 인터페이스 역시 게이트웨이 프록시에 사용할 수 있지만, 여기 있는 default 메소드는 호출할 수 없다. JDK 클래스에 대한 MethodHandles.Lookup 인스턴스를 만드는 건 자바 내부 보안 정책에 걸리기 때문이다. 이런 메소드들은 메소드 위에 @Gateway 어노테이션을 명시하거나, @MessagingGateway 어노테이션이나 XML 요소 <gateway>에서 proxyDefaultMethods를 사용해도 앞에 프록시를 둘 수 있다 (메소드의 구현 로직이 없어짐과 동시에 이전 게이트웨이 프록시 방식으로 복귀한다).

10.4.9. Error Handling

게이트웨이를 실행하는 동안에도 에러가 발생할 수 있다. 기본적으로, 게이트웨이의 메소드를 실행하는 동안 다운스트림에서 발생하는 모든 에러는 “그 상태 그대로” 다시 던져진다. 예를 들어서 아래 있는 간단한 플로우를 생각해보자:

gateway -> service-activator

서비스 activator에 의해 호출된 서비스가 MyException을 던진다면 (예를 들어서), 프레임워크는 이를 MessagingException으로 감싸고 failedMessage 프로퍼티에 서비스 activator에 전달된 메시지를 첨부한다. 결과적으로 프레임워크에서 남기는 모든 로그엔 실패에 관한 전체 컨텍스트가 담기게 된다. 기본적으로 게이트웨이에서 예외를 catch하면 MyException을 감싸지 않은 채로 호출부에게 던진다. 게이트웨이 메소드 선언부에 throws 절을 구성하면 cause 체인에서 원하는 예외 타입을 매칭시킬 수 있다. 예를 들어 다운스트림에서 발생한 에러에 대한 모든 원인과 메시지 처리 정보가 포함된 MessagingException을 통으로 catch하고 싶다면, 다음과 유사한 게이트웨이 메소드를 사용하면 된다:

public interface MyGateway {

    void performProcess() throws MessagingException;

}

스프링은 POJO 프로그래밍을 권장하고 있기 때문에, 메시지 처리 인프라에 호출자를 노출하는 게 꺼려질 수 있다.

게이트웨이 메소드에 아무런 throws 절이 없다면 게이트웨이는 cause 트리를 탐색해서 MessagingException이 아닌 RuntimeException을 찾는다. 아무 것도 발견하지 못하면 프레임워크는 MessagingException을 던진다. 앞의 예시에서 MyExceptionSomeOtherException이란 원인을 가지고 있고 메소드가 throws SomeOtherException을 선언하고 있는 경우, 게이트웨이는 감싸진 예외를 언랩unwrap해서 SomeOtherException을 호출부에 던진다.

게이트웨이 선언에 service-interface가 없다면 내부 프레임워크 인터페이스인 RequestReplyExchanger를 사용한다.

아래 예제를 살펴보자:

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

exchange 메소드는 5.0 버전 전엔 throws 절이 없었기 때문에 예외를 언랩unwrap했었다. 이 인터페이스를 사용하면서 이전 언랩unwrap 동작으로 돌아가고 싶다면, 커스텀 service-interface를 사용하거나, 아니면 MessagingExceptioncause에 직접 액세스해야 한다.

하지만 에러를 전파하는 대신 로그로만 남기고 싶을 수도 있고, 예외를 유효한 응답으로 처리해야 할 수도 있다 (호출부가 이해할 수 있는 “에러 메시지”로 적당히 매핑하는 식으로). 게이트웨이는 이를 위한 에러 전용 메시지 채널과 error-channel 속성을 지원한다. 아래 예시에선 ‘transformer’가 Exception으로부터 응답 Message를 생성한다:

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer는 원하는 에러 응답 객체를 생성할 수 있는 간단한 POJO를 이용하면 된다. 이 에러 응답 객체가 바로 호출부에 다시 전송되는 페이로드가 된다. 필요하다면 “에러 플로우” 안에서 정교한 로직을 더 실행할 수 있다. 에러 플로우에선 (Spring Integration의 ErrorMessageExceptionTypeRouter를 비롯한) 라우터, 필터 등을 사용할 수 있다. 하지만 대부분의 경우는 간단한 ‘transformer’만으로도 충분할 거다.

아니면 예외를 로그로만 기록할 수도 있다 (혹은 어딘가에 비동기로 전송하거나). 단방향 플로우를 만든다면 호출부엔 아무 것도 전송되지 않는다. 예외를 완전히 숨기려면suppress 글로벌 nullChannel의 참조를 제공하면 된다 (사실상 /dev/null 처리 방식). 마지막으로, 위에서도 언급했지만, error-channel이 정의돼 있지 않으면 평소와 같이 예외를 전파한다.

@MessagingGateway 어노테이션을 이용한다면 (@MessagingGateway Annotation 참고), errorChannel 속성을 사용할 수 있다.

5.0 버전부터 리턴 타입이 void인 게이트웨이 메소드를 사용하면 (단방향 플로우), 전달되는 각 메시지의 표준 errorChannel 헤더에 error-channel 참조가 채워진다 (제공했다면). 이를 이용하면 표준 ExecutorChannel 설정(또는 QueueChannel) 기반의 다운스트림 비동기 플로우에서 디폴트 글로벌 errorChannel 예외 전송 동작을 재정의할 수 있다. 이전에는 @GatewayHeader 어노테이션이나 <header> 요소를 사용해서 수동으로 errorChannel 헤더를 지정해야 했다. 비동기 플로우의 void 메소드의 경우 error-channel 프로퍼티가 무시됐었고, 그대신 디폴트 errorChannel로 오류 메시지를 전송했었다.

메시징 시스템을 간단한 POJO 게이트웨이를 통해 노출하게 되면 좋은 점도 있지만, 실존하는 내부 메시징 시스템을 "숨기는 것"에는 대가가 따르기 때문에 몇 가지를 생각해봐야 한다. 우리는 자바 메소드가 최대한 빨리 반환되길 바라며, 호출부가 void, 리턴 값이나 예외가 던져지길 기다리며 무한정 멈춰있지기를hang 바라지 않는다. 메시징 시스템 앞에 일반적인 메소드를 프록시로 두는 경우엔, 내부 메시지 처리 동작이 잠재적으로 비동기 특성을 가질 수 있음을 고려해야 한다. 이는 게이트웨이에 의해 시작된 메시지가 필터에 걸려 버려져 응답 생성을 담당하는 구성 요소에 도달하지 못할 수도 있음을 의미한다. 서비스 activator 메소드에서 예외가 발생해 응답을 제공하지 않을 수도 있다 (우린 null 메시지를 생성하지 않기 때문에). 즉, 응답 메시지는 여러 가지 상황으로 인해 도착하지 않을 수 있다. 이는 메시징 시스템에선 지극히 자연스러운 일이다. 이번엔 게이트웨이 메소드로 인해 생길 수 있는 일을 생각해 보자. 게이트웨이 메소드의 입력 인자는 메시지에 통합돼 다운스트림으로 전송된다. 그 응답 메시지는 게이트웨이 메소드의 반환 값으로 변환된다. 따라서 게이트웨이를 호출할 때마다 매번 응답 메시지가 있길 바랄 수도 있다. 그렇지 않으면 게이트웨이 메소드는 영영 반환되지 않고 멈춰버릴hang 수 있다. 이 상황을 해결하는 한 가지 방법은 비동기 게이트웨이를 사용하는 거다 (뒤에서 설명한다). 또 다른 방법으론 reply-timeout 속성을 명시하는 방법이 있다. 게이트웨이는 reply-timeout으로 지정한 시간 이상으로 멈춰있지hang 않으며, 해당 시간만큼 경과하면 'null'을 반환한다. 마지막으로, 서비스 activator에서 'requires-reply'와 같은 다운스트림 플래그를 설정하거나 필터에서 'throw-exceptions-on-rejection'을 설정하는 걸 검토해보는 게 좋다. 이 옵션들은 이 챕터의 마지막 섹션에서 좀더 자세히 논한다.

다운스트림 플로우가 ErrorMessage를 반환하면 해당 payload(Throwable)는 일반적인 다운스트림 에러로 처리된다. error-channel이 설정돼 있으면 에러 플로우로 전송되며, 그렇지 않으면 게이트웨이 호출부로 페이로드를 던진다. 유사하게 error-channel의 에러 플로우에서 ErrorMessage를 반환하면 호출부에 해당 페이로드를 던진다. 이는 Throwable 페이로드를 가진 모든 메시지에 동일하게 적용된다. 비동기 상황에서 호출자에게 직접 Exception을 전파해야 할 때 유용할 거다. 이땐 Exception을 반환하거나 (특정 서비스의 reply로) 던지면 된다. 일반적으로, 비동기 플로우라 하더라도 다운스트림 플로우에서 발생한 예외는 프레임워크가 게이트웨이로 다시 전파해준다. TCP Client-Server Multiplex 샘플에선 호출자에게 예외를 반환하는 두 가지 테크닉을 모두 살펴볼 수 있다. 이 샘플에선 aggregatorgroup-timeout(Aggregator와 Group Timeout 참고)을 이용해 소켓 IO 오류를 발생시키며, discard 플로우에서 대기 중인 스레드에 MessagingTimeoutException 응답을 보낸다.

10.4.10. Gateway Timeouts

게이트웨이는 requestTimeoutreplyTimeout이라는 두 가지 타임아웃 프로퍼티를 가지고 있다. requestTimeout은 채널이 블로킹될 수 있는 경우에만 적용된다 (예를 들어 유한bounded QueueChannel이 가득 찼을 때). replyTimeout 값은 게이트웨이가 응답을 기다리는 시간으로, 이 시간이 지나면 null을 반환한다. 기본값은 무한대다.

이 타임아웃 값들은 게이트웨이의 모든 메소드나 (defaultRequestTimeout, defaultReplyTimeout) MessagingGateway 인터페이스 어노테이션에서 사용할 기본값으로 설정할 수 있다. 개별 메소드에선 자식 요소 <method/>@Gateway 어노테이션에서 이 기본값을 재정의할 수 있다.

5.0 버전부터 다음과 같이 표현식으로 타임아웃을 정의할 수 있다:

@Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

평가 컨텍스트는 BeanResolver를 가지고 있으며 (다른 빈을 참조하려면 @someBean을 사용해라), 배열 변수 #args를 이용할 수 있다.

XML로 설정할 땐, 다음 예제와 같이 타임아웃 속성에 long 값이나 SpEL 표현식을 사용할 수 있다:

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="#args[0]"
                      request-timeout="1000"
                      reply-timeout="#args[1]">
</method>

10.4.11. Asynchronous Gateway

메시징 게이트웨이는 하나의 패턴으로, 메시지 시스템의 기능은 전부 사용하면서도 메시지 처리와 관련된 코드는 숨길 수 있게 해준다. 앞서 언급한 GatewayProxyFactoryBean 덕분에 서비스 인터페이스를 통해 간편하게 프록시를 노출하고, 메시징 시스템에 POJO 기반으로 (자체 도메인 객체나, 프리미티브primitive/문자열, 기타 다른 객체 기반) 접근할 수 있다. 하지만 값을 반환하는 POJO 메소드에서 게이트웨이를 이용하는 경우, 각 요청 메시지(메소드를 실행할 때 생성되는)마다 응답 메시지(메소드를 반환하면 생성되는)가 반드시 있어야 함을 의미한다. 메시징 시스템은 본래 비동기이므로 “각 요청마다 항상 응답이 존재할 것”이라는 규약을 항상 보장하기 어렵다. Spring Integration 2.0은 비동기 게이트웨이를 도입해서, 응답을 기다릴지를 잘 모르겠거나 응답이 도착하기까지 걸리는 시간을 알 수 없는 상황에서도 플로우를 간편하게 시작할 수 있다.

이런 상황을 위해 Spring Integration은 java.util.concurrent.Future 인스턴스를 활용한 비동기 게이트웨이를 지원한다.

XML 설정에선 아무 것도 달라지지 않으며, 비동기 게이트웨이를 정의할 때도 아래 예시처럼 일반적인 게이트웨이를 정의할 때와 동일한 설정을 이용하면 된다:

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

하지만 게이트웨이 인터페이스(서비스 인터페이스)는 다음과 같이 약간 달라진다:

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

위 예제에 보이듯이, 이 게이트웨이 메소드의 반환 타입은 Future다. GatewayProxyFactoryBean은 게이트웨이 메소드의 반환 타입이 Future임을 확인하면 AsyncTaskExecutor를 사용해 즉시 비동기 모드로 전환한다. 비동기 게이트웨이는 딱 이 정도의 차이만 있다. 이런 메소드를 호출하면 항상 Future 인스턴스를 즉시 반환한다. 그런 다음 원하는 타이밍에 Future와 상호 작용해 결과를 얻거나 취소하는 등의 작업을 수행할 수 있다. 또한 평소 Future 인스턴스를 사용할 때와 마찬가지로 get()을 호출하면 타임아웃, 실행 예외 등이 발생할 수 있다. 다음 예제는 비동기 게이트웨이가 반환하는 Future를 사용하는 방법을 보여주는 예시다:

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

좀 더 상세한 예제는 Spring Integration samples 레포지토리에 있는 async-gateway를 참고해라.

ListenableFuture

비동기 게이트웨이 메소드는 4.1 버전부터 ListenableFuture(Spring Framework 4.0에서 도입했다)도 반환할 수 있다. 이 타입을 반환하면 실행 결과를 사용할 수 있는 시점에 (혹은 예외가 발생했을 때) 호출할 콜백을 제공할 수 있다. 게이트웨이가 이 리턴 타입을 감지하고 동시에 task executorAsyncListenableTaskExecutor라면, 이 executor의 submitListenable() 메소드를 실행한다. 다음은 ListenableFuture를 사용하는 방법을 보여주는 예시다:

ListenableFuture<String> result = this.asyncGateway.async("something");
result.addCallback(new ListenableFutureCallback<String>() {

    @Override
    public void onSuccess(String result) {
        ...
    }

    @Override
    public void onFailure(Throwable t) {
        ...
    }
});

AsyncTaskExecutor

GatewayProxyFactoryBean은 기본적으로 리턴 타입이 Future인 게이트웨이 메소드에서 내부 AsyncInvocationTask 인스턴스를 제출submit할 땐 org.springframework.core.task.SimpleAsyncTaskExecutor를 사용한다. 하지만 <gateway/> 요소에서 async-executor 속성을 이용하면 스프링 애플리케이션 컨텍스트 내에서 사용할 수 있는 java.util.concurrent.Executor 구현체를 참조시킬 수 있다.

(디폴트) SimpleAsyncTaskExecutor는 리턴 타입이 FutureListenableFuture일 때를 모두 지원하여며, 각각 FutureTaskListenableFutureTask를 반환한다. CompletableFuture를 참고해라. 디폴트 executor가 있더라도, 아래 예제처럼 외부 executor를 하나 제공하면 로그에서 스레드를 식별할 수 있어 유용하다 (XML을 사용할 땐 executor의 빈 이름을 기반으로 스레드 명이 정해진다):

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

다른 Future 구현체를 반환하고 싶다면, 커스텀 executor를 제공하거나, executor를 완전히 비활성화하고 Future를 다운스트림 플로우의 응답 메시지 페이로드 안에 넣어 반환하면 된다. executor를 비활성화하려면 GatewayProxyFactoryBean을 통해 null로 세팅해라 (setAsyncTaskExecutor(null)). XML로 게이트웨이를 설정할 땐 async-executor=""를 사용해라. @MessagingGateway 어노테이션을 이용해 설정하는 경우 아래와 유사한 코드를 사용하면 된다:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

리턴 타입이 설정한 executor가 지원하지 않는 Future의 특정 구현체이거나 다른 하위 인터페이스인 경우, 플로우는 호출자의 스레드에서 실행되며, 이땐 플로우에서 반드시 응답 메시지 페이로드에서 요구하는 타입을 반환해야 한다.

CompletableFuture

게이트웨이 메소드는 4.2 버전부터 CompletableFuture<?>를 반환할 수 있다. 이 타입을 반환할 땐 두 가지 모드로 실행할 수 있다:

사용 시나리오

아래 시나리오에선, 호출자 스레드는 CompletableFuture<Invoice>와 함께 즉시 반환된다. 이 feature는 다운스트림 플로우가 게이트웨이에 응답할 때 완료된다 (Invoice 객체와 함께).

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

아래 시나리오에선, 호출자 스레드는 다운스트림 플로우가 게이트웨이에 대한 응답 페이로드로 CompletableFuture<Invoice>를 제공할 때 반환된다. 송장invoice이 준비되면 다른 프로세스에서 future를 완료해야 한다.

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

아래 시나리오에선, 호출자 스레드는 다운스트림 플로우가 게이트웨이에 대한 응답 페이로드로 CompletableFuture<Invoice>를 제공할 때 반환된다. 송장invoice이 준비되면 다른 프로세스에서 future를 완료해야 한다. DEBUG 로그를 활성화했다면 이 시나리오엔 비동기 executor를 사용할 수 없다는 로그가 기록된다.

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture 인스턴스를 이용해 다음과 같이 응답을 추가로 조작할 수도 있다:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

GatewayProxyFactoryBean은 5.0 버전부터 게이트웨이 인터페이스 메소드에서 프로젝트 리액터를 사용할 수 있게 지원한다. Mono를 리턴 타입으로 사용하면 된다. 내부 AsyncInvocationTaskMono.fromCallable()로 감싸진다.

Mono는 나중에 결과를 조회할 때 사용할 수 있으며 (Future<?>와 유사하다), 또는 게이트웨이로 결과가 반환되면 Consumer를 실행하는 식으로 dispatcher와 함께 사용할 수 있다.

Mono는 프레임워크에 의해 곧바로 플러시되지 않는다. 그렇기 때문에 내부 메시지 플로우는 게이트웨이 메소드가 반환되기 전까진 시작되지 않는다 (Future<?> Executor 태스크에서와 같이). 플로우는 Mono가 구독되면 시작한다. 아니면 subscribe()가 전체 Flux를 구독한 것이었다면, Mono는 리액터 스트림의 일부일 수도 있다 ("Composable"). 다음은 프로젝트 리액터로 게이트웨이를 생성하는 방법을 보여주는 예시다:

@MessagingGateway
public static interface TestGateway {

	@Gateway(requestChannel = "promiseChannel")
	Mono<Integer> multiply(Integer value);

	}

	    ...

	@ServiceActivator(inputChannel = "promiseChannel")
	public Integer multiply(Integer value) {
			return value * 2;
	}

		...

    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(integers -> ...);

프로젝트 리액터를 사용하는 또 다른 예시로 아래 있는 간단한 콜백 시나리오를 들 수 있다:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

호출 스레드는 계속 이어지며, 플로우가 완료되면 handleInvoice()를 호출한다.

Downstream Flows Returning an Asynchronous Type

위에서 ListenableFuture를 설명하며 언급했듯이, 특정 다운스트림 구성 요소가 비동기 페이로드를 가진 (Future, Mono 등) 메시지를 반환하게 만들고 싶다면, 반드시 비동기 executor를 null로 명시해줘야 한다 (XML 설정을 사용하는 경우 ""). 그러면 호출자 스레드에서 플로우를 실행하며, 그 결과는 나중에 조회할 수 있다.

void Return Type

앞에서 언급했던 리턴 타입들과는 달리, 메소드 리턴 타입이 void인 경우엔, 호출자 스레드는 즉시 반환하고 비동기로 다운스트림 플로우를 실행하는 것을 의도한 건지를 프레임워크가 판단하기 어렵다. 이런 경우엔 반드시 아래 예제처럼 인터페이스 메소드 위에 @Async 어노테이션을 선언해줘야 한다:

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

Future<?> 타입을 반환할 때와는 달리, 특정 커스텀 TaskExecutor@Async 어노테이션과 연결해주지 않고서는 플로우에서 예외를 던졌는지를 호출자에게 알릴 수 있는 방법이 없다.

10.4.12. Gateway Behavior When No response Arrives

앞에서 언급한 대로 게이트웨이를 이용하면 간편하게 POJO 메소드를 실행해서 메시징 시스템과 상호 작용할 수 있다. 하지만 그냥 일반적인 메소드를 실행하면 언제나 반환되는 반면 (예외가 발생하더라도), 메시지를 교환하는 상황은 일반적인 메소드 실행과 항상 동일하게 흘러가진 않는다 (예를 들면, 응답 메시지가 도착하지 않을 수도 있다 — 이는 반환되지 않는 메소드와 동일하다).

여기서부터는 다양한 시나리오들과 함께 게이트웨이를 어떻게 하면 좀 더 예측 가능한 방향으로 동작시킬 수 있는지를 다룬다. 동기식으로 동작하는 게이트웨이는 특정 속성들을 이용해 예측이 가능한 방향으로 구성할 수 있지만, 속성에 따라 예상대로 동작하지 않는 상황도 벌어진다. 그 중 하나는 reply-timeout이다 (메소드 레벨. 게이트웨이 레벨에선 default-reply-timeout). 여기서는 다양한 시나리오를 통해 reply-timeout 속성이 동기식 게이트웨이의 동작에 영향을 줄 수 있는 것과 없는 것들에 대해 알아본다. 싱글 스레드 시나리오(다운스트림의 모든 구성 요소가 direct 채널을 통해 연결된다)와 멀티 스레드 시나리오(예를 들어, 다운스트림 어딘가에 싱글 스레드의 범위를 벗어나는 pollable 채널이나 executor 채널이 있을 수 있다)를 나눠서 생각해본다.

다운스트림 프로세스가 오랜 시간 실행 중일 때

다운스트림 구성 요소가 ‘null’을 반환할 때

다운스트림 구성 요소의 반환 타입은 ‘void’인데 게이트웨이 메소드 시그니처는 void가 아닐 때

다운스트림 구성 요소가 Runtime Exception으로 끝났을 때

reply-timeout은 디폴트가 무한unbounded이라는 점을 알아두는 게 좋다. 그렇기 때문에 reply-timeout을 명시하지 않으면 게이트웨이 메소드 실행부는 무기한으로 멈출 수도 있다hang. 따라서 이런 시나리오 중 하나라도 해당될 가능성이 희박하더라도, 플로우를 분석해서 reply-timeout 속성을 "안전한" 값으로 설정해놔야 한다. 다운스트림 구성 요소의 requires-reply 속성을 'true'로 설정하면 더 좋은데, 다운스트림 구성 요소가 내부적으로 null을 반환하면 즉시 예외를 던지기 때문에 응답을 보장할 수 있다. 하지만 reply-timeout이 아무런 소용이 없는 상황도 있다는 것도 함께 인지해야 한다 (첫 번째 시나리오 참고). 즉, 메시지 플로우를 분석해서 언제 비동기 게이트웨이가 아닌 동기식 게이트웨이를 사용할지를 결정하는 것 또한 중요한 일이다. 앞에서 설명한 것처럼 비동기 게이트웨이는 간단히 말하면 Future 인스턴스를 반환하는 게이트웨이 메소드를 정의하는 일이다. 이렇게 하면 값을 반환한다는 것을 보장받을 수 있으며, 실행 결과를 가지고 좀 더 세부적인 컨트롤이 가능하다. 또한 라우터를 다룰 때는 resolution-required 속성을 'true'로 설정하면 라우터가 특정 채널을 리졸브할 수 없는 경우엔 예외가 발생한다는 점을 기억해둬야 한다. 마찬가지로 필터를 다룰 땐 throw-exception-on-rejection 속성을 설정할 수 있다. 둘 모두 requires-reply 속성을 가진 서비스 activator가 있는 것처럼 동작하게 된다. 다른 말로 하면, 게이트웨이 메소드를 실행할 때 응답 받는 것을 보장하는 데 도움이 된다.

<gateway/> 요소에서 reply-timeout은 무한이다unbounded (GatewayProxyFactoryBean에서 만들어진다). 외부와의 통합을 위한 인바운드 게이트웨이(WS, HTTP 등)는 이런 게이트웨이들과 공통된 특징과 속성이 많이 있다. 하지만 이런 인바운드 게이트웨이의 경우 디폴트 reply-timeout은 1000 밀리세컨드다 (1초). 다운스트림에서 비동기 처리를 다른 스레드가 이어받는 경우, 게이트웨이 타임아웃이 발생하기 전에 플로우가 완료될 수 있도록 이 속성을 늘려야 할 수 있다.

게이트웨이로 스레드가 반환될 때 타이머가 시작된다는 것을 이해해야 한다. 즉, 플로우가 완료되거나 메시지가 다른 스레드로 전달될 때 말이다. 이 시점부터 호출 스레드는 응답을 기다리기 시작한다. 플로우가 완전히 동기식으로 동작한다면 호출 스레드에서 즉시 응답을 사용하면 된다. 비동기 플로우의 경우 스레드는 이 시간만큼 대기한다.

IntegrationFlows를 이용해 게이트웨이를 정의하는 방법은 Java DSL 챕터에 있는 IntegrationFlow as Gateway를 참고해라.


10.5. Service Activator

서비스 activator는 스프링이 관리하는 객체가 서비스의 역할을 수행할 수 있도록 입력 채널에 연결해주는 엔드포인트다. 서비스가 출력을 생성하는 경우엔 출력 채널에도 연결할 수 있다. 출력이 있는 서비스가 처리 파이프라인(또는 메시지 플로우) 상 맨끝에 위치할 수도 있는데, 이 경우엔 인바운드 메시지의 replyChannel 헤더를 활용할 수 있다. 출력 채널이 정의되지 않았을 때의 기본 동작이기도 하다. 여기에서 설명하는 설정 옵션들은 대부분 다른 구성 요소에서도 동일하게 동작한다.

10.5.1. Configuring Service Activator

서비스 activator를 생성할 땐 다음과 같이 ‘service-activator’ 요소와 ‘input-channel’, ‘ref’ 속성을 이용하면 된다:

<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>

위 설정에선 exampleHandler의 메소드들 중, 아래와 같은 요구 사항 중 하나를 충족하는 메소드들을 전부 선별한다:

호출할 타겟 메소드는 런타임에 각 요청 메시지에 있는 payload 타입에 따라 선택하거나, 타겟 클래스에 Message<?> 타입을 가진 메소드가 있다면 이 메소드로 폴백하기도 한다.

5.0 버전부터 서비스 메소드 하나를 @org.springframework.integration.annotation.Default로 마킹하면 매칭되는 케이스가 없을 때 폴백으로 사용할 수 있다. 컨텐츠 타입 변환을 진행한 후 타겟 메소드를 실행할 때 활용하기 좋다.

원하는 객체의 메소드를 직접 지정하고 싶다면 다음과 같이 method 속성을 추가해주면 된다:

<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>

두 케이스 모두 서비스 메소드가 null이 아닌 값을 반환하면, 서비스 activator는 적절한 응답 채널로 응답 메시지를 전송해본다. 응답 채널을 결정할 땐 가장 먼저 다음과 같이 엔드포인트 설정에 output-channel을 제공했는지를 확인한다:

<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
                       ref="somePojo" method="someMethod"/>

메소드는 결과를 반환하는데 output-channel이 정의되지 않았다면, 그땐 요청 메시지에 있는 replyChannel 헤더를 확인해본다. 이 헤더에 값이 들어있으면 그 타입을 체크해서, MessageChannel 타입이라면 그 채널로 응답 메시지를 전송한다. String 타입인 경우 서비스 activator는 해당 채널명을 채널 인스턴스로 리졸브해본다. 채널을 리졸브할 수 없다면 DestinationResolutionException을 던지고, 리졸브할 수 있다면 리졸브한 채널로 메시지를 전송한다. 요청 메시지에 replyChannel 헤더가 담겨있지 않고 reply 객체가 Message였다면 이 메시지의 replyChannel 헤더를 참조해서 다음 행선지를 결정한다. 이 테크닉은 Spring Integration에서 request-reply 메시지를 처리할 때도 사용하고 있으며, return address 패턴의 한 예시이기도 하다.

메소드가 결과를 반환하는데, 반환 결과를 폐기discard하고 플로우를 종료하고 싶다면, output-channelNullChannel로 설정해야 한다. NullChannelnullChannel이라는 이름으로 하나가 자동 등록된다. 자세한 내용은 특별한 채널들을 참고해라.

서비스 activator는 응답 메시지를 생성하지 않아도 되는 구성 요소 중 하나다. 메소드가 null을 반환하거나 리턴 타입이 void인 경우 서비스 activator는 메소드를 실행한 후 별도 신호 없이 종료한다. 이 동작은 AbstractReplyProducingMessageHandler.requiresReply 옵션으로 변경할 수 있으며, XML 네임스페이스를 이용할 땐 requires-reply로 설정해주면 된다. 이 플래그를 true로 설정했을 때 메소드가 null을 반환하면 ReplyRequiredException을 던진다.

서비스 메소드의 인자는 메시지일 수도, 임의의 타입일 수도 있다. 임의의 타입일 땐 메시지 페이로드인 것으로 가정하며, 메시지에서 페이로드를 추출해서 서비스 메소드에 주입해준다. 이렇게 하면 POJO 모델을 따를 수 있으므로, Spring Integration을 이용할 땐 일반적으로 임의의 타입을 사용하는 것을 권장한다. 어노테이션 지원에서 설명하는 것처럼 인자들은 @Header@Headers 어노테이션을 선언할 수도 있다.

서비스 메소드는 인자가 없어도 되기 때문에, 이벤트 스타일의 서비스 액티베이터를 구현할 수 있으며 (여기선 오로지 서비스 메소드를 실행하는 것에만 관심을 둔다) 메시지의 내용은 신경쓰지 않아도 된다. 메시지가 null JMS 메시지라고 생각해보자. 예를 들면 입력 채널에 보관된 메시지들을 모니터링하거나 간단한 카운터를 구현할 수 있다.

아래 보이는 것 처럼 4.1 버전부터는 메시지 프로퍼티(payload, headers)를 받는 POJO 메소드 파라미터에 Java 8 Optional을 사용해도 문제 없이 변환해준다:

public class MyBean {
    public String computeValue(Optional<String> payload,
               @Header(value="foo", required=false) String foo1,
               @Header(value="foo") Optional<String> foo2) {
        if (payload.isPresent()) {
            String value = payload.get();
            ...
        }
        else {
           ...
       }
    }

}

커스텀 서비스 activator 핸들러 구현체를 다른 <service-activator> 정의에서 재사용할 수 있다면 보통은 ref 속성을 사용하는 게 좋다. 하지만 하나의 <service-activator> 정의 안에서만 사용한다면, 다음과 같이 내부 빈 정의를 제공할 수도 있다:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="someMethod">
    <beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>

동일한 <service-activator> 설정에서 ref 속성과 내부 핸들러 정의를 모두 사용하는 것은 허용하지 않는다. 둘 다 사용하면 조건이 모호해져 예외가 발생한다.

ref 속성에서 AbstractMessageProducingHandler를 상속한 빈을 참조하는 경우 (프레임워크 자체에서 제공하는 핸들러 등), 이 설정은 출력 채널을 핸들러에 직접 주입하는 식으로 최적화된다. 이때는 각 ref 속성마다 별도 빈 인스턴스(또는 prototype 스코프 빈)를 참조하거나, 내부 <bean/> 설정을 이용해야 한다. 무심코 여러 빈에서 동일한 메시지 핸들러를 참조한다면 설정 예외를 만나게될 거다.

Service Activators and the Spring Expression Language (SpEL)

Spring Integration 2.0 이후로는 서비스 activator에서 SpEL 역시 활용할 수 있다.

예를 들면, 다음과 같이 ref 속성에서 빈을 가리키거나 내부 빈 정의를 추가하지 않아도 원하는 빈의 메소드를 호출할 수 있다:

<int:service-activator input-channel="in" output-channel="out"
	expression="@accountService.processAccount(payload, headers.accountId)"/>

	<bean id="accountService" class="thing1.thing2.Account"/>

위 설정에선 ref 속성이나 내부 빈을 이용해 ‘accountService’를 주입하지 않고, SpEL의 @beanId 표기법을 통해 메시지 페이로드와 호환되는 타입을 하나 받는 메소드를 실행한다. 이땐 헤더 값도 함께 전달한다. 유효한 SpEL 표현식이라면 메시지에 있는 어떤 내용이든지 사용해서 평가할 수 있다. 아래와 같이 전체 로직을 하나의 표현식으로 캡슐화할 수 있다면 서비스 activator는 빈을 참조하지 않아도 된다:

<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>

위 설정에선 페이로드 값에 2를 곱하는 것이 서비스 로직이다. SpEL을 사용하면 이런 로직은 쉽게 처리할 수 있다.

서비스 activator 설정에 관한 좀 더 자세한 내용은 Java DSL 챕터에 있는 서비스 액티베이터와 .handle() 메소드를 참고해라.

10.5.2. Asynchronous Service Activator

서비스 activator는 호출 스레드에서 실행된다. 입력 채널이 SubscribableChannel이라면 업스트림 스레드를, PollableChannel이라면 폴러 스레드를 뜻한다. 서비스가 ListenableFuture<?>를 반환하면 출력(또는 응답) 채널로 보내는 메시지의 페이로드에 이를 담아 보내는 게 기본 동작이다. 4.3 버전부터는 async 속성을 true로 설정할 수 있다 (자바 설정에선 setAsync(true)). async 속성을 true로 설정하면, 서비스가 ListenableFuture<?>를 반환했을 때 호출 스레드는 즉시 release되고 응답 메시지는 future를 완료하는 스레드로 전달된다. 이는 PollableChannel을 사용하는, 오랫동안 실행되는 서비스에서 특히 유리하다. 폴러 스레드를 release하면 프레임워크 내에서 다른 서비스를 수행할 수 있기 때문이다.

서비스에서 future를 Exception으로 완료하면 일반적인 에러 처리 프로세스를 진행한다. errorChannel 헤더가 있다면 이곳으로 ErrorMessage를 전송하고, 그 외는 디폴트 errorChannel(가능한 경우)로 ErrorMessage를 전송한다.

10.5.3. Service Activator and Method Return Type

서비스 메소드는 어떤 타입이든지 반환할 수 있으며, 반환 값은 응답 메시지 페이로드가 된다. 이 경우 새로운 Message<?> 객체가 만들어지며, 요청 메시지에 있는 헤더들이 전부 복사된다. 대부분의 Spring Integration MessageHandler 구현체들이 POJO 메소드를 통해 상호 작용할 땐 이런 방식으로 동작한다.

서비스 메소드에서 Message<?> 객체를 완성해서 반환할 수도 있다. 하지만 트랜스포머와 달리 Service Activator에선, 요청 메시지엔 있던 헤더가 반환된 메시지엔 없다면 요청 메시지의 헤더를 복사해와서 메시지를 수정한다. 따라서 메소드 파라미터가 Message<?>인 서비스 메소드에서 기존 헤더의 전부가 아닌 일부만 복사하더라도, 응답 메시지엔 전부 다시 나타나게 될 거다. 응답 메시지에서 헤더를 제거하는 일은 Service Activator의 책임이 아니며, 느슨한 결합이라는 원칙을 추구한다면 통합 플로우에 HeaderFilter를 추가해주는 게 좋다. 아니면 Service Activator대신 Transformer를 사용해도 좋지만, 이 경우 Message<?> 전체를 반환한다면 요청 메시지의 헤더를 복사하는 일(필요한 경우)을 포함해서, 메시지에 대한 모든 책임은 메소드에 있다. 프레임워크의 중요 헤더가 있다면 (e.g. replyChannel, errorChannel) 반드시 보존해줘야 한다.


10.6. Delayer

delayer는 메시지 플로우를 정해진 간격으로 지연시키는 간단한 엔드포인트다. 메시지가 지연될 땐 기존 sender를 블로킹하지 않는다. 그보단 지연된 메시지는 org.springframework.scheduling.TaskScheduler 인스턴스로 스케줄링되어 지연 시간이 지난 이후 출력 채널로 전송된다. 이 처리 방식은 sender 스레드를 대량 블로킹하는 것이 아니기 때문에, 지연 시간을 꽤 길게도 확장할 수 있다. 반면, 일반적인 케이스에서 메시지를 실제로 놓아줄release 땐 스레드 풀을 사용한다. 이 섹션에선 delayer를 설정하는 예제를 몇 가지 다뤄본다.

10.6.1. Configuring a Delayer

<delayer> 요소는 두 메시지 채널 사이의 메시지 플로우를 지연시킬 때 사용한다. 다른 엔드포인트들과 마찬가지로 ‘input-channel’과 ‘output-channel’ 속성을 제공할 수 있지만, delayer는 각 메시지를 지연시킬 시간(밀리세컨드)을 결정하는 ‘default-delay’와 ‘expression’ 속성(그리고 ‘expression’ 요소)도 가지고 있다. 다음은 모든 메시지를 3초씩 지연시키는 예시다:

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

메시지마다 지연 시간을 각자 결정해야 할 때는 다음과 같이 ‘expression’ 속성을 이용해 SpEL 표현식을 지정할 수도 있다:

Java DSL Kotlin DSL Java XML
@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from("input")
            .delay("delayer.messageGroupId", d -> d
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay("delayer.messageGroupId") {
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

위 예시에서 설정한 3초라는 시간은 주어진 인바운드 메시지에서 표현식이 null로 평가될 때에만 적용된다. 표현식을 평가한 결과가 유효한 메시지에만 지연 시간을 적용하려면 ‘default-delay’를 0(디폴트)으로 설정하면 된다. 지연 시간이 0(또는 그 이하)인 메시지는 호출 스레드에서 즉시 메시지를 전송한다.

XML 파서는 메시지 그룹 ID(<beanName>.messageGroupId)를 사용한다.

delay 핸들러에선 표현식으로 밀리세컨드 단위의 시간 간격을 나타낼 수 있으며 (toString() 메소드의 실행 결과를 Long으로 파싱할 수 있는 Object), 절대 시간을 나타내는 java.util.Date 인스턴스를 만들어도 된다. 첫 번째 케이스에선 현재 시간을 기준으로 밀리세컨드를 계산한다 (예를 들어 5000으로 평가된다면, delayer가 메시지를 수신한 시간으로부터 최소 5초 동안 메시지를 지연시킨다). Date 인스턴스를 사용하면 해당 Date 객체가 나타내는 시간이 될 때까지 메시지를 놓아주지release 않는다. 지연 시간이 양수가 아니거나 과거 날짜에 해당하는 값이라면 지연이 발생하지 않는다. 그대신 기존 sender의 스레드에서 출력 채널로 곧장 전송된다. 표현식을 평가한 결과가 Date도 아니고 Long으로 파싱할 수도 없는 경우엔 디폴트 지연 시간(있다면 — 디폴트는 0)을 적용한다.

표현식을 평가하는 중엔 잘못된 표현식을 사용하는 등의 이유로 evaluation exception이 발생할 수 있다. 이런 예외들은 기본적으로 무시되며 (DEBUG 레벨로 로그를 남기기는 한다), delayer는 디폴트 지연 시간(있다면)으로 폴백한다. 이 동작은 ignore-expression-failures 속성을 설정하면 수정할 수 있다. 이 속성은 기본적으로 true로 설정되며, delayer는 지금 설명한 그대로 동작한다. 반대로 evaluation exception을 무시하는 대신 delayer의 호출부로 던지고 싶다면 ignore-expression-failures 속성을 false로 설정해라.

위 예제에선 delay 표현식을 headers['delay']로 지정했다. 이 구문은 Map 요소에 액세스하기 위한 SpEL Indexer 구문이다 (MessageHeadersMap을 구현하고 있다). 이 구문은 headers.get("delay")를 호출한다. 맵 요소의 이름이 간단할 땐 (‘.’이 들어있지 않을 땐) SpEL “dot accessor” 구문도 사용할 수 있다. 그러면 이 헤더 표현식은 headers.delay로 명시할 수 있다. 하지만 헤더가 누락됐을 때의 결과는 달라진다. 첫 번째 케이스에서 표현식은 null로 평가된다. 두 번째 케이스에선 다음과 유사한 결과를 낳게 된다:

org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

결과적으로, 헤더가 생략될 가능성이 있고 디폴트 지연 시간으로 폴백하고 싶다면, 예외를 catch하는 것보단 null을 감지하는 것이 더 빠르기 때문에, dot property accessor 구문 보단 indexer 구문을 사용하는 것이 일반적으로 더 효율적이다 (권장하는 방법이기도 하다).

delayer는 동작을 스프링 TaskScheduler 인터페이스의 인스턴스에 위임한다. delayer에서 사용하는 디폴트 스케줄러는 Spring Integration에서 기동 시 제공하는 ThreadPoolTaskScheduler 인스턴스다. 자세한 내용은 태스크 스케줄러 설정하기를 참고해라. 다른 스케줄러에 위임하고 싶다면, 다음과 같이 delayer 요소의 ‘scheduler’ 속성을 통해 참조를 제공하면 된다:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>

외부 ThreadPoolTaskScheduler를 설정한다면 스케줄러 프로퍼티에 waitForTasksToCompleteOnShutdown = true를 설정할 수 있다. 이 설정을 이용하면 애플리케이션을 종료할 때 이미 실행 상태에 있는 ‘delay’ 태스크를 완전하게 완료할 수 있다 (메시지를 놓아주는release 등). Spring Integration 2.2 이전에는 DelayHandler가 백그라운드에서 자체 스케줄러를 생성할 수 있었기 때문에 <delayer> 요소에서 이 프로퍼티를 사용할 수 있었다. 2.2부터 delayer는 외부 스케줄러 인스턴스가 필요하며 waitForTasksToCompleteOnShutdown은 제거됐다. 이제 스케줄러의 자체 설정을 사용해야 한다.

ThreadPoolTaskSchedulererrorHandler라는 속성이 있는데, 이 속성엔 org.springframework.util.ErrorHandler의 구현체를 주입할 수 있다. 이 핸들러를 이용하면 지연된 메시지를 전송하는 예약 태스크 스레드에서 발생하는 Exception을 처리할 수 있다. 기본적으론 org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler를 사용하며, 로그를 통해 스택 트레이스를 확인할 수 있다. org.springframework.integration.channel.MessagePublishingErrorHandler 사용을 검토해보는 것도 좋다. 이 구현체는 ErrorMessage를 실패한 메시지의 헤더에 있는 error-channel이나 디폴트 error-channel로 전송해준다. 이때 에러 처리는 트랜잭션을 (있다면) 롤백한 이후에 진행한다. 릴리즈 실패를 참고해라.

10.6.2. Delayer and a Message Store

DelayHandler는 설정에 있는 MessageStore의 메시지 그룹에 지연된 메시지를 보관한다. (‘groupId’는 <delayer> 요소의 필수 속성 ‘id’를 기반으로 만들어진다.) 예약 태스크에선 DelayHandleroutput-channel로 메시지를 전송하기 직전에 MessageStore에서 지연된 메시지를 제거한다. 설정한 MessageStore가 영구적인persistent 스토어라면 (ex. JdbcMessageStore), 애플리케이션이 종료해도 메시지를 잃어버리지 않는다. DelayHandler는 애플리케이션이 기동되면 MessageStore의 메시지 그룹에서 메시지를 읽어와, 메시지의 기존 도착 시간을 기반으로 delay 태스크를 다시 예약한다 (delay가 숫자일 때). delay 헤더가 Date인 메시지에선 이 Date를 사용해 다시 스케줄링한다. 지연된 메시지가 ‘delay’ 값보다 더 오래 MessageStore에 남아 있었다면 기동 직후에 전송한다.

<delayer>는 두 가지 요소 <transactional><advice-chain> 중 하나로 보강할 수 있다 (이 둘은 함께 사용할 수 없다). 이 AOP 어드바이스의 ListDelayHandler.ReleaseMessageHandler로 적용돼 내부에서 프록시 처리된다. 이 핸들러는 예약된 태스크의 Thread에서 지연 시간이 지난 이후 메시지를 릴리즈하는 일을 담당한다. 예를 들면 다운스트림 메시지 플로우에서 예외가 발생해 ReleaseMessageHandler의 트랜잭션을 롤백하는 상황 등에 활용된다. 이 경우 지연된 메시지는 영구persistent MessageStore에 남게된다. <advice-chain> 안에선 원하는 커스텀 org.aopalliance.aop.Advice 구현체도 사용할 수 있다. <transactional> 요소는 트랜잭션 어드바이스만 가지고 있는 간단한 어드바이스 체인을 정의한다. 다음은 <delayer> 안에서 advice-chain을 사용하는 예제다:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler는 관리하는 오퍼레이션(getDelayedMessageCount, reschedulePersistedMessages)을 통해 JMX MBean으로 익스포트할 수 있다. 이를 통해 보관 중인persisted 지연 메시지를 런타임에 다시 스케줄링할 수 있다 (예를 들어 TaskScheduler가 이전에 중단된 상태인 경우). 이런 류의 오퍼레이션은 다음과 같이 Control Bus 명령을 통해 실행할 수 있다:

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);

메시지 스토어와 JMX, 컨트롤 버스에 관한 자세한 내용은 시스템 관리를 참고해라.

5.3.7 버전부터 메시지를 MessageStore에 저장할 때 트랜잭션이 활성화된 상태라면, TransactionSynchronization.afterCommit() 콜백에서 릴리즈 태스크를 예약한다. 이렇게 하는 이유는 트랜잭션이 커밋되기 전에 예약된 릴리즈 태스크가 실행돼 메시지를 찾을 수 없는 경합 상태에 놓일 수 있기 때문이다. 이 경우 메시지는 지연 시간이 지난 이후나 트랜잭션 커밋 이후 중 더 늦은 시점에 릴리즈된다.

10.6.3. Release Failures

5.0.8 버전부터 delayer엔 두 가지 프로퍼티가 새로 생겼다:

메시지를 놓아줄release 땐, 다운스트림 플로우가 실패했다면 retryDelay가 지난 이후에 릴리즈를 시도한다. maxAttempts에 도달하면 메시지를 폐기한다 (트랜잭션 내에서 릴리즈를 진행하는 경우만 아니면. 트랜잭션을 적용하면 메시지는 스토어에 남아 있지만, 애플리케이션을 재시작하거나 reschedulePersistedMessages() 메소드를 실행하기 전엔 더 이상 릴리즈 태스크를 예약하지 않는다).

게다가 delayedMessageErrorChannel을 설정할 수 있는데, 릴리즈에 실패하면 이 채널로 ErrorMessage를 전달한다. ErrorMessage의 페이로드엔 예외가 담기며, originalMessage란 프로퍼티를 가지고 있다. ErrorMessage에는 현재 카운트가 담겨있는 IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT 헤더가 들어 있다.

에러 플로우에서 에러 메시지를 컨슘하고 정상적으로 종료되면 별도로 다른 조치를 취하지 않는다. 트랜잭션 내에서 릴리즈를 진행한다면, 트랜잭션은 커밋되고 스토어에 있는 메시지를 삭제한다. 에러 플로우에서 예외가 발생하면 위에서 설명한 대로 최대 maxAttempts까지 릴리즈를 재시도한다.


10.7. Scripting Support

Spring Integration 2.1부터 Java 6에서 도입된 자바 사양을 위한 JSR223 스크립팅을 지원하기 시작했다. 따라서 다양한 통합 구성 요소들의 로직을, 지원 언어(Ruby, JRuby, Groovy, Kotlin 포함)로 작성한 스크립트로 제공할 수 있다. Spring Integration에서 SpELSpring Expression Language을 사용하는 방식과 유사하다. JSR223에 대한 자세한 내용은 이 문서를 읽어봐라.

Java 11부터 Nashorn JavaScript 엔진은 deprecated되었으며, Java 15에선 제거될 가능성이 있다. 지금부터는 다른 스크립팅 언어를 이용하는 쪽을 검토해보는 것이 좋다.

먼저 프로젝트에 아래 의존성을 추가해야 한다:

Maven Gradle
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-scripting</artifactId>
    <version>5.5.15</version>
</dependency>
compile "org.springframework.integration:spring-integration-scripting:5.5.15"

추가로, 스크립트 엔진 구현체를 추가해야 한다 (e.g. JRuby, Jython).

Spring Integration은 5.2 버전부터 Kotlin Jsr223을 지원한다. 이를 사용하려면 프로젝트에 다음 의존성을 추가해야 한다:

Maven Gradle
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-script-util</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-compiler-embeddable</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-scripting-compiler-embeddable</artifactId>
    <scope>runtime</scope>
</dependency>
runtime 'org.jetbrains.kotlin:kotlin-script-util'
runtime 'org.jetbrains.kotlin:kotlin-compiler-embeddable'
runtime 'org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable'

언어를 직접 kotlin으로 설정하거나, 확장자가 .kts인 스크립트 파일이 있으면 KotlinScriptExecutor를 사용한다.

JVM 스크립팅 언어를 사용하려면 반드시 클래스패스에 해당 언어에 대한 JSR223 구현체가 있어야 한다. GroovyJRuby 프로젝트는 표준 배포판에서 JSR233을 지원하고 있다.

써드 파티에서 구현한 JSR223 언어 구현체도 다양하다. Spring Integration과 특정 구현체와의 호환성은 해당 구현체를 개발한 개발자가 사양을 어떻게 이해하고 얼마나 잘 따랐는지에 따라 갈린다.

스크립팅 언어로 Groovy를 사용할 계획이라면, Groovy에 특화된 기능들을 추가로 제공하는 Spring-Integration의 Groovy 지원 모듈을 사용하는 게 좋다. 물론 현재 섹션에서도 관련 내용을 다루고 있다.

10.7.1. Script Configuration

가지고 있는 통합 요구 사항이 얼마나 복잡한지에 따라, XML 설정의 CDATA를 이용해 인라인으로 스크립트를 추가하거나, 스크립트가 들어있는 스프링 리소스를 참조시키면 된다. Spring Integration은 스크립팅 지원을 위해 ScriptExecutingMessageProcessor를 정의하는데, 이 클래스는 메시지 페이로드를 payload라는 변수에 바인딩하고 메시지 헤더를 headers라는 변수에 바인딩한다. 두 변수 모두 스크립트 실행 컨텍스트 내에서 접근할 수 있다. 개발자는 이 변수들을 사용해서 스크립트를 작성하기만 하면 된다. 아래 두 설정은 필터를 생성하는 예시다:

Java DSL XML
@Bean
public IntegrationFlow scriptFilter() {
    return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
	return new ByteArrayResource("headers.type == 'good'".getBytes());
}

@Bean
public IntegrationFlow scriptFilter() {
	return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}
<int:filter input-channel="referencedScriptInput">
   <int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>

<int:filter input-channel="inlineScriptInput">
     <int-script:script lang="groovy">
     <![CDATA[
     return payload == 'good'
   ]]>
  </int-script:script>
</int:filter>

위 예제에서 확인할 수 있듯이, 스크립트는 인라인으로 포함시킬 수도 있고, 리소스 위치를 참조시킬 수도 있다 (location 속성을 이용해서). 참고로, lang은 언어 이름(혹은 JSR223 alias)을 나타내는 속성이다.

스크립팅을 지원하는 또 다른 Spring Integration 엔드포인트로는 router, service-activator, transformer, splitter가 있다. 이 엔드포인트들의 스크립팅 설정도 위와 동일하다 (endpoint 요소는 제외하고).

스크립팅과 관련해서는 또 하나 유용한 기능을 제공하는데, 애플리케이션 컨텍스트를 재시작하지 않아도 스크립트를 업데이트(reload)할 수 있다. 그러려면 다음과 같이 script 요소에 refresh-check-delay 속성을 명시하면 된다:

Java DSL XML
Scripts.processor(...).refreshCheckDelay(5000)
}
<int-script:script location="..." refresh-check-delay="5000"/>

위 예시에선 5초 간격으로 스크립트가 있는 곳에서 업데이트 내역을 확인한다. 스크립트가 업데이트되었다면, 업데이트 이후 5초가 지난 이후엔 전부 새 스크립트를 실행하게 된다.

다음 예제를 살펴보자:

Java DSL XML
Scripts.processor(...).refreshCheckDelay(0)
}
<int-script:script location="..." refresh-check-delay="0"/>

위 예제에선 스크립트가 수정되는 즉시 컨텍스트를 업데이트한다. 이 트릭을 이용해 손쉽게 ‘실시간’ 동기화를 설정할 수 있다. 음수 값은 전부 애플리케이션 컨텍스트를 초기화한 이후엔 스크립트를 다시 로드하지 않는다는 걸 의미하며, 이 동작이 디폴트 동작이다. 다음은 절대 업데이트되지 않는 스크립트 예시다:

Java DSL XML
Scripts.processor(...).refreshCheckDelay(-1)
}
<int-script:script location="..." refresh-check-delay="-1"/>

인라인 스크립트는 다시 로드할 수 없다.

Script Variable Bindings

외부에서 스크립트의 실행 컨텍스트에 제공한 변수들을 스크립트에서 참조할 땐 변수 바인딩을 이용한다. 기본적으로 사용할 수 있는 바인딩 변수는 payloadheaders가 있다. 스크립트에 다른 변수도 함께 바인딩하고 싶다면 다음 예제와 같이 <variable> 요소(또는 ScriptSpec.variables() 옵션)를 사용하면 된다:

Java DSL XML
Scripts.processor("foo/bar/MyScript.py")
    .variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}
<script:script lang="py" location="foo/bar/MyScript.py">
    <script:variable name="var1" value="thing1"/>
    <script:variable name="var2" value="thing2"/>
    <script:variable name="date" ref="date"/>
</script:script>

위 예제에 보이는 것처럼, 스크립트 변수는 스칼라 값이나 스프링 빈 참조에 바인딩할 수 있다. 참고로, 따로 명시하진 않았지만 payloadheaders 역시 바인딩 변수로 사용할 수 있다.

Spring Integration 3.0에선 variable 요소와 더불어 variables 속성도 도입했다. 이 속성과 variable 요소는 상호 배타적이지 않아서 하나의 script 구성 요소 내에서 함께 조합해 쓸 수 있다. 하지만 변수는 정의된 위치에 관계없이 반드시 고유해야 한다. 추가로, Spring Integration 3.0부터는 다음과 같이 인라인 스크립트에도 변수 바인딩을 사용할 수 있다:

<service-activator input-channel="input">
    <script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
        <script:variable name="thing2" ref="thing2Bean"/>
        <script:variable name="thing3" value="thing2"/>
        <![CDATA[
            payload.foo = thing1
            payload.date = date
            payload.bar = thing2
            payload.baz = thing3
            payload
        ]]>
    </script:script>
</service-activator>

위 예제에선 인라인 스크립트, variable 요소, variables 속성을 조합해서 사용하고 있다. variables 속성은 값을 콤마로 구분해서 지정하며, 각 세그먼트는 변수와 그 값을 ‘=’로 구분해 나타낸다. 변수명은 위 예시의 date-ref와 같이 -ref를 suffix로 사용할 수 있다. 바인딩 변수의 이름은 date이지만, 애플리케이션 컨텍스트의 dateBean 빈 참조 값을 사용한다는 의미로 suffix를 사용했다. 프로퍼티 플레이스홀더 설정이나 커맨드라인 인자를 사용할 때 유용할 거다.

변수를 생성하는 방법을 하나하나 제어하고 싶다면, 자바 클래스를 구현해서 아래 인터페이스에서 정의하는 ScriptVariableGenerator 전략을 사용하면 된다:

public interface ScriptVariableGenerator {

    Map<String, Object> generateScriptVariables(Message<?> message);

}

이 인터페이스를 사용하려면 generateScriptVariables(Message) 메소드를 구현해야 한다. message 인자를 이용해 메시지 페이로드와 헤더에 있는 모든 데이터에 접근할 수 있으며, 바인딩된 변수들의 Map을 반환하면 된다. 메시지로 스크립트를 실행할 때마다 이 메소드를 호출한다. 다음은 ScriptVariableGenerator 구현체를 제공하고 script-variable-generator 속성으로 참조시키는 방법을 보여주는 예시다:

Java DSL XML
Scripts.processor("foo/bar/MyScript.groovy")
    .variableGenerator(new foo.bar.MyScriptVariableGenerator())
}
<int-script:script location="foo/bar/MyScript.groovy"
        script-variable-generator="variableGenerator"/>

<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>

script-variable-generator를 제공하지 않으면 스크립트 컴포넌트는 DefaultScriptVariableGenerator를 사용한다. 이 클래스는 generateScriptVariables(Message) 메소드에서 설정한 <variable> 요소들을 전부 Messagepayload, headers 변수와 함께 병합한다.

script-variable-generator 속성과 <variable> 요소를 모두 지정하는 것은 안 된다. 이 둘은 함께 사용할 수 없다.


10.8. Groovy support

Spring Integration 2.0부터는 Groovy를 지원하기 때문에, 다양한 통합 구성 요소들의 로직을 Groovy 스크립팅 언어로 제공할 수 있다. 라우팅, 변환 등 다양한 통합 로직에 SpELSpring Expression Language을 활용하는 방식과 유사하다. Groovy에 대한 자세한 내용은 프로젝트 웹 사이트에 있는 Groovy 문서를 참고해라.

먼저 프로젝트에 아래 의존성을 추가해야 한다:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-groovy</artifactId>
    <version>5.5.15</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-groovy:5.5.15"

10.8.1. Groovy Configuration

Spring Integration 2.1에서 제공하는 Groovy 전용 설정 네임스페이스는 Spring Integration의 스크립팅 지원을 확장한 것으로, 스크립팅 지원 섹션에 상세하게 다룬 핵심 설정과 동작을 공유한다. Groovy 스크립트는 범용 스크립팅 지원 모듈을 이용해도 되지만, Groovy 모듈은 스프링 프레임워크의 org.springframework.scripting.groovy.GroovyScriptFactory와 관련 컴포넌트들을 통해 Groovy 설정 네임스페이스를 따로 제공하며, Groovy 전용 기능들을 사용할 수 있게 해준다. 다음은 두 가지 설정 예시다:

Example 2. Filter

<int:filter input-channel="referencedScriptInput">
   <int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>

<int:filter input-channel="inlineScriptInput">
     <int-groovy:script><![CDATA[
     return payload == 'good'
   ]]></int-groovy:script>
</int:filter>

위 예시를 보면 일반 스크립팅 지원을 사용할 때와 거의 동일해 보인다. 유일한 차이점은 네임스페이스 프리픽스 int-groovy를 사용해서, Groovy 네임스페이스를 불러온다는 점이다. 이 네임스페이스에선 <script> 태그 위에 lang 속성을 지정할 수 없다는 점도 함께 알아두자.

10.8.2. Groovy Object Customization

Groovy 객체 자체를 커스텀해야 한다면 (단순히 변수를 세팅하는 것 이상으로), customizer 속성을 이용해 GroovyObjectCustomizer를 구현한 빈을 참조시키면 된다. 예를 들어, MetaClass를 수정해서 스크립트 내에서 사용할 수 있는 함수들을 등록하는 식으로 DSLdomain-specific language을 구현하는 상황 등에 활용할 수 있다. 그 방법은 아래 예시를 참고해라:

<int:service-activator input-channel="groovyChannel">
    <int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>

<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>

커스텀 GroovyObjectCustomizer<variable> 요소나 script-variable-generator 속성을 사용할 때도 설정할 수 있다. 인라인 스크립트를 정의할 때도 가능하다.

Spring Integration 3.0에선 variable 요소와 함께 조합해 쓸 수 있는 variables 속성을 도입했다. 또한 groovy 스크립트는, 해당 이름을 가진 바인딩 변수를 제공하지 않은 경우 BeanFactory 안에 있는 빈으로 변수를 리졸브하는 기능이 있다. 다음은 변수 하나(entityManager)를 활용하는 예시다:

<int-groovy:script>
    <![CDATA[
        entityManager.persist(payload)
        payload
    ]]>
</int-groovy:script>

애플리케이션 컨텍스트엔 위와 같이 사용할 수 있는 있는 entityManager 빈이 있어야 한다.

<variable> 요소, variables 속성, script-variable-generator 속성에 대한 자세한 내용은 스크립트 변수 바인딩을 참고해라.

10.8.3. Groovy Script Compiler Customization

Groovy 컴파일러의 커스텀 옵션 중 가장 많이 사용하는 건 어노테이션 힌트 @CompileStatic이다. 이 어노테이션은 클래스 레벨이나 메소드 레벨에 사용할 수 있다. 자세한 내용은 Groovy 레퍼런스 매뉴얼에서 @CompileStatic 부분을 찾아 읽어봐라. 간단한 스크립트에서 이 기능을 활용하려면 (통합 시나리오에서) 스크립트를 좀 더 자바 코드에 가깝게 변경해야 한다. 아래 <filter> 스크립트를 생각해보자:

headers.type == 'good'

위 스크립트는 Spring Integration에선 아래 메소드로 바뀐다:

@groovy.transform.CompileStatic
String filter(Map headers) {
	headers.type == 'good'
}

filter(headers)

이렇게 변경하고 나면 filter() 메소드는 getProperty() 팩토리와 CallSite 프록시같은 Groovy의 동적인 호출 단계는 건너뛰고, 정적인 자바 코드로 변환되고 컴파일된다.

4.3 버전부터는 Spring Integration Groovy 구성 요소를 설정할 때 boolean 옵션 compile-static을 사용할 수 있다. 이 옵션은 내부 CompilerConfiguration@CompileStatic을 사용하는 ASTTransformationCustomizer를 추가해준다. 그러면 스크립트 코드에서 @CompileStatic을 이용한 메소드 선언을 생략해도 순수 자바 코드로 컴파일할 수 있다. 위 스크립트는 다음과 같이 더 짧게 바뀌지만, 인터프리터로 실행하는 스크립트에 비하면 약간 더 장황한 편이다:

binding.variables.headers.type == 'good'

@CompileStatic을 사용하면 동적인 GroovyObject.getProperty()를 이용할 수가 없기 때문에 headerspayload에 접근할 땐 (다른 변수들도) 반드시 groovy.lang.Script binding 프로퍼티를 통해야 한다.

이와 함께 빈 참조를 지정할 수 있는 compiler-configuration 속성을 도입했다. 이 속성으로는 ImportCustomizer를 사용하는 등, Groovy 컴파일러를 다양하게 커스텀할 수 있다. 이 기능을 자세히 알아보려면 Groovy 문서에서 고급 컴파일러 설정을 읽어봐라.

compilerConfiguration을 사용하면 @CompileStatic 어노테이션을 위한 ASTTransformationCustomizer가 자동으로 추가되지 않으며, compileStatic 옵션을 재정의하게 된다. CompileStatic도 사용해야 한다면, 커스텀 compilerConfigurationCompilationCustomizersnew ASTTransformationCustomizer(CompileStatic.class)를 직접 추가해줘야 한다.

Groovy 컴파일러를 커스텀해도 refresh-check-delay 옵션엔 영향을 미치지 않으며, 다시 로드할 수 있는 스크립트 역시 정적으로 컴파일할 수 있다.

10.8.4. Control Bus

엔터프라이즈 통합 패턴에서 설명하는 것처럼, 컨트롤 버스에 깔려있는 아이디어는 “애플리케이션 수준”에서 메시지를 처리할 때 사용하는 메시징 시스템을 그대로 활용해서 프레임워크 내에 있는 구성 요소들을 모니터링하고 관리하겠다는 거다. Spring Integration은 앞서 다뤘던 어댑터들을 기반으로 움직이기 때문에, 정의된 작업을 호출하는 수단으로 메시지를 전송할 수 있다. Groovy 스크립트 또한 이런 작업에 해당할 수 있다. 다음은 컨트롤 버스를 위한 Groovy 스크립트를 설정하는 예시다:

<int-groovy:control-bus input-channel="operationChannel"/>

컨트롤 버스는 가지고 있는 입력 채널을 통해 애플리케이션 컨텍스트에 있는 빈을 호출한다.

Groovy 컨트롤 버스는 입력 채널에서 받은 메시지를 Groovy 스크립트로 실행한다. 메시지를 받아서 본문을 스크립트로 컴파일하고, GroovyObjectCustomizer로 커스텀한 뒤 실행한다. 컨트롤 버스의 MessageProcessor는 애플리케이션 컨텍스트 내 빈들 중 @ManagedResource를 선언한 빈과 스프링의 Lifecycle 인터페이스나 기반 클래스 CustomizableThreadCreator를 확장한 빈들을 변수로 노출해준다 (ex. TaskExecutorTaskScheduler 구현체들).

컨트롤 버스의 커맨드 스크립트에서 커스텀 스코프(ex. 'request')로 관리하는 빈을 사용한다면 주의가 필요하다. 특히 비동기 메시지 플로우라면 더욱 더 주의해야 한다. 컨트롤 버스의 MessageProcessor는 애플리케이션 컨텍스트에 있는 빈을 노출할 수 없는 경우 커맨드 스크립트를 실행하는 중에 BeansException이 발생할 수 있다. 예를 들어 커스텀 스코프의 컨텍스트가 설정되지 않았다면, 해당 스코프 내 빈을 가져오려하면 BeanCreationException을 유발하게 된다.

Groovy 객체를 좀 더 커스텀해야 한다면 다음 예제와 같이 customizer 속성을 이용해 GroovyObjectCustomizer를 구현한 빈을 참조하는 것도 가능하다:

<int-groovy:control-bus input-channel="input"
        output-channel="output"
        customizer="groovyCustomizer"/>

<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>

10.9. Adding Behavior to Endpoints

Spring Integration 2.2 이전에는 폴러의 <advice-chain/> 요소에 AOP Advice를 추가해서 전반적인 통합 플로우에 원하는 동작을 추가할 수 있었다. 하지만 다운스트림 엔드포인트를 전부 다시 실행하는 게 아니라, 단순히 REST 웹 서비스만 다시 호출하고 싶다고 가정해보자.

아래 있는 플로우를 한 번 생각해보자:

inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter

폴러의 어드바이스 체인에 재시도 로직을 구성했다면, 네트워크 결함으로 인해 http-gateway2 호출에 실패한 경우, 재시도로 인해 http-gateway1http-gateway2는 두 번 호출된다. 마찬가지로 jdbc-outbound-adapter에서 일시적인 에러가 발생했을 때에도 jdbc-outbound-adapter를 다시 실행하기 전 두 HTTP 게이트웨이를 또 다시 호출한다.

Spring Integration 2.2부턴 개별 엔드포인트에 동작을 추가할 수 있다. 다양한 엔드포인트에 <request-handler-advice-chain/> 요소를 추가해주면 된다. 다음은 outbound-gateway 내에서 <request-handler-advice-chain/> 요소를 사용하는 예시다:

<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

이 경우 myRetryAdvice는 이 게이트웨이에만 국소적으로 적용되며, 응답을 nextChannel로 전송하고 난 뒤에 다운스트림에서 일어나는 작업들엔 적용되지 않는다. 즉, 어드바이스 스코프는 엔드포인트 자체로 제한된다.

현재로썬 어드바이스는 전체 엔드포인트들을 아우르는 <chain/>엔 적용할 수 없다. <request-handler-advice-chain>은 chain 자체의 자식 요소로는 사용할 수 없다.

하지만 <request-handler-advice-chain><chain/> 요소 안에 있는 엔드포인트들 중, 응답을 생성하는 엔드포인트들에 개별적으로 추가할 수 있다. 한 가지 예외가 있다면, 응답을 생성하지 않는 체인에선, 체인의 마지막 요소가 outbound-channel-adapter이기 때문에 마지막 요소에는 어드바이스를 적용할 수 없다는 점이다. 이런 요소에 어드바이스를 적용해야 한다면 해당 어댑터를 체인 외부로 이동시켜야 한다 (체인의 output-channel이 어댑터의 input-channel이 된다). 그러면 평소대로 어댑터에 어드바이스를 적용할 수 있다. 응답을 생성하는 체인의 경우 모든 자식 요소에 어드바이스를 적용할 수 있다.

10.9.1. Provided Advice Classes

일반적인 메커니즘을 이용해 AOP 어드바이스 클래스를 적용하는 방법도 있지만, Spring Integration은 다음과 같은 어드바이스 구현체들을 기본으로 제공하고 있다:

Retry Advice

retry 어드바이스(o.s.i.handler.advice.RequestHandlerRetryAdvice)는 Spring Retry 프로젝트에서 제공하는 풍부한 재시도 메커니즘을 활용하는 어드바이스다. spring-retry의 핵심 컴포넌트는 RetryTemplate이다. 이 클래스를 이용하면 재시도 횟수를 전부 소진했을 때 취할 조치를 결정하는 RecoveryCallback 전략과, RetryPolicy, BackoffPolicy 전략(다양한 구현체들도 함께) 등, 재시도 시나리오를 정교하게 구성할 수 있다.

spring-retry에 대한 자세한 내용은 해당 프로젝트의 Javadocspring-retry의 시초였던 스프링 배치의 레퍼런스 문서를 참고해라.

디폴트 백오프 동작은 백오프를 사용하지 않는 거다. 즉, 곧바로 재시도한다. 다시 시도할 때마다 스레드를 멈추는pause 백오프 정책을 사용하면, 과도한 메모리 사용 및 스레드 고갈을 비롯한 성능 이슈가 발생할 수 있다. 대용량 환경에선 백오프 정책을 주의해서 사용해야 한다.

Configuring the Retry Advice

이번 섹션에서 다루는 예제들은 다음과 같이 언제나 예외를 던지는 <service-activator>를 사용한다:

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}

이 경우엔 사용자가 정의한 예외를 MessagingException으로 감쌀 수 있기 때문에 traverseCauses가 필요하다.

Circuit Breaker Advice

서킷 브레이커 패턴은 현재 이용할 수 없는 서비스가 있다면 그 서비스에 시간과 리소스를 낭비하지 않겠다는 개념이다. o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice는 이 패턴을 구현한 클래스다. 서킷 브레이커가 닫혀 있으면 엔드포인트는 서비스 호출을 시도한다. 서킷 브레이커는 연속으로 특정 횟수만큼 실패하면 열린다. 열린 상태일 때 새로 요청이 들어오면 “빠르게 실패”하고 정해진 시간 동안은 서비스 호출을 시도하지 않는다.

정해진 시간이 지나면 서킷 브레이커는 half-open 상태로 세팅된다. 이 상태에선 한 번이라도 실패하면 서킷 브레이커는 즉시 열린 상태로 돌아간다. 호출에 성공했다면 서킷 브레이커는 다시 닫히며, 이때는 설정한 횟수만큼 다시 연속해서 실패하지 않는다면 열리지 않는다. 서킷 브레이커를 언제 다시 열지를 결정할 수 있도록 호출에 성공할 때마다 실패 횟수는 0으로 리셋된다.

일반적으로 서킷 브레이커 어드바이스는 실패하는 데 시간이 좀 걸리기도 하는 (네트워크 연결을 시도하는 중 타임아웃이 발생하는 등) 외부 서비스에 활용하곤 한다.

RequestHandlerCircuitBreakerAdvicethresholdhalfOpenAfter라는 두 가지 프로퍼티가 있다. threshold 프로퍼티는 서킷 브레이커를 열으려면 필요한 연속 실패 횟수를 나타낸다. 디폴트는 5다. halfOpenAfter 프로퍼티는, 마지막으로 실패한 시간을 기준으로 서킷 브레이커가 다시 요청을 보내보기 전에 대기하는 시간이다. 디폴트는 1000밀리세컨드다.

다음은 서킷 브레이커를 설정하는 예제와, DEBUG, ERROR로 출력된 로그다:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

위 예제에선 threshold를 2로 설정하고 halfOpenAfter12초로 설정하고 있다. 요청은 5초간격으로 새로 도착한다. 처음 두 번은 서비스를 호출했다. 세 번째와 네 번째 시도에선 서킷 브레이커가 열려있음을 나타내는 예외와 함께 실패했다. 다섯 번째 요청은 마지막으로 호출에 실패한지 15초가 지난 다음 들어온 요청이기 때문에 실제로 서비스 호출을 시도했다. 서킷 브레이커는 즉시 열리고, 여섯 번째 시도 또한 즉시 실패한다.

Expression Evaluating Advice

그 다음으로 제공하는 어드바이스 클래스는 o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice다. 이 어드바이스는 앞에서 다룬 어드바이스들보단 좀 더 범용적인데, 엔드포인트로 전송된 기존 인바운드 메시지로 표현식을 평가할 수 있다. 이땐 성공했을 때와 실패했을 때를 나눠서 별도의 표현식을 평가할 수 있다. 원한다면 입력 메시지와 평가 결과를 함께 담은 메시지를 특정 메시지 채널로 전송할 수도 있다.

이 어드바이스의 대표적인 사용 사례는 <ftp:outbound-channel-adapter/>를 사용해, 전송에 성공하면 파일을 특정 디렉토리로 이동시키고 실패하면 또 다른 디렉토리로 이동시키는 케이스다.

이 어드바이스는 성공/실패 시 사용할 표현식과, 각각의 채널을 설정할 수 있는 프로퍼티를 가지고 있다. 성공한 케이스에서 successChannel로 전송하는 메시지는 AdviceMessage로, 표현식을 평가한 결과를 페이로드로 가지고 있다. AdviceMessage는 핸들러로 전송된 원본 메시지를 저장하는 inputMessage라는 또 다른 프로퍼티도 가지고 있다. failureChannel에 전송되는 (핸들러에서 예외를 던지면) 메시지는 ErrorMessage로, 페이로드에 MessageHandlingExpressionEvaluatingAdviceException을 담고있다. MessagingException 인스턴스가 전부 그렇듯, 이 페이로드는 failedMessage, cause 프로퍼티를 가지고 있으며, 추가로 표현식의 평가 결과가 들어있는 evaluationResult라는 프로퍼티도 존재한다.

5.1.3 버전부터 채널은 설정했지만 표현식을 제공하지 않은 경우, 디폴트 표현식을 사용해 메시지의 payload를 평가한다.

어드바이스 스코프 내에서 예외가 발생하면 기본적으로 failureExpression을 평가한 뒤 호출부로 해당 예외를 던진다. 예외를 던지는 게 싫다면 trapException 프로퍼티를 true로 설정해라. 다음은 Java DSL을 사용해 어드바이스를 설정하는 예제다:

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

Rate Limiter Advice

Rate Limiter 어드바이스(RateLimiterRequestHandlerAdvice)를 사용하면 엔드포인트가 요청으로 인해 과부하에 걸리지 않도록 보호할 수 있다. 속도 제한을 어기면 요청을 차단한다.

Rate Limiter 어드바이스는 분당 n개 이상의 요청을 허용하지 않는 외부 서비스 provider에 사용하는 케이스가 대표적이다.

RateLimiterRequestHandlerAdvice 구현체는 완벽하게 Resilience4j 프로젝트를 기반으로 동작하며, RateLimiterRateLimiterConfig를 주입받아야 한다. 디폴트값이나 커스텀 이름도 함께 설정할 수 있다.

다음은 1초당 하나의 요청만 허용하도록 rate limiter 어드바이스를 설정하는 예시다:

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

Caching Advice

5.2 버전부터는 CacheRequestHandlerAdvice를 사용할 수 있다. 이 클래스는 스프링 프레임워크가 추상화해놓은 캐시 인프라를 사용하며, @Caching 어노테이션 패밀리에서 제공하는 개념과 기능들을 그대로 사용한다. 내부 로직에선 CacheAspectSupport를 상속해서, AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage 메소드를 중심으로 요청 Message<?>를 인자로 넘겨 캐시 연산을 프록시 처리한다. 이 어드바이스는 캐시 키를 평가할 SpEL 표현식이나 Function을 지정할 수 있다. 요청 Message<?>는 SpEL 평가 컨텍스트의 루트 객체나 Function 입력 인자를 통해 접근할 수 있다. 기본적으론 요청 메시지의 payload를 캐시 키로 사용한다. CacheRequestHandlerAdvice는 디폴트 캐시 연산이 하나의 CacheableOperation이거나 임의의 CacheOperation 셋을 사용하는 경우, 반드시 cacheNames를 설정해줘야 한다. 모든 CacheOperation은 별도로 설정할 수도 있고, CacheManager, CacheResolver, CacheErrorHandler같은 옵션들을 공유할 수 있으며, CacheRequestHandlerAdvice 설정에서 재사용할 수 있다. 스프링 프레임워크의 @CacheConfig@Caching 어노테이션 설정을 조합하는 것과 유사하다. CacheManager를 제공하지 않으면 기본적으로 CacheAspectSupport에서 BeanFactory를 사용해 빈 하나를 리졸브한다.

다음 예제에선 서로 다른 캐시 연산 셋을 갖는 두 가지 어드바이스를 설정하고 있다:

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}

10.9.2. Reactive Advice

5.3 버전부터 Mono 응답을 생성하는 요청 메시지 핸들러에는 ReactiveRequestHandlerAdvice를 사용할 수 있다. 리액티브 어드바이스를 사용할 땐 BiFunction<Message<?>, Mono<?>, Publisher<?>>를 지정해야 하며, handleRequestMessage() 메소드를 가로채 받은 응답에서 Mono.transform() 연산자를 호출하고, 그 안에서 이 BiFunction을 호출한다. 일반적으로 이런 식의 Mono 커스텀은 timeout(), retry() 등과 유사한 연산자를 통해 들쭉날쭉한 네트워크 상황을 대응해야 할 때 필요하다. 예를 들어 WebFlux 클라이언트를 통해 HTTP 요청을 보낼 때는, 아래와 같이 설정해주면 응답을 5초 이상 기다리지 않도록 만들 수 있다:

.handle(WebFlux.outboundGateway("https://somehost/"),
                       e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));

message 인자는 메시지 핸들러로 도착한 요청 메시지이며, 이 인자를 사용해 요청 스코프에 있는 속성들을 알아낼 수 있다. mono 인자는 이 메시지 핸들러의 handleRequestMessage() 메소드를 실행해서 얻은 결과다. 이 함수 안에서 Mono.transform()을 중첩하면 리액티브 서킷 브레이커 등을 적용할 수도 있다.

10.9.3. Custom Advice Classes

앞서 설명한 기본 제공 어드바이스 클래스들 외에도, 자체 어드바이스 클래스를 구현하 것도 가능하다. org.aopalliance.aop.Advice의 구현체를 제공할 수도 있지만 (보통 org.aopalliance.intercept.MethodInterceptor), 일반적으로 o.s.i.handler.advice.AbstractRequestHandlerAdvice의 하위 클래스를 만드는 것을 권장한다. 이 클래스를 상속하면 저수준 AOPaspect-oriented programming 코드를 작성하지 않아도 되며, 통합 환경에 딱 맞게 개발을 시작할 수 있다.

하위 클래스에선 doInvoke() 메소드를 구현해야 하며, 메소드 정의는 다음과 같다:

/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
 * invokes the handler method and returns its result, or null).
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;

콜백 파라미터가 있는 덕분에 하위 클래스에선 AOP를 직접 처리하지 않아도 된다. callback.execute() 메소드를 호출하면 메시지 핸들러가 실행된다.

target 파라미터는 특정 핸들러의 상태를 유지해야 할 때를 대비해 제공한다. 보통은 타겟에서 키를 지정한 Map 안에 상태를 유지해야 할 거다. 이를 잘 활용하면 여러 핸들러에 동일한 어드바이스를 적용할 수 있다. RequestHandlerCircuitBreakerAdvice는 이 어드바이스를 사용해 각 핸들러의 서킷 브레이커 상태를 유지한다.

message 파라미터는 핸들러로 전달하는 메시지다. 어드바이스는 핸들러를 호출하기 전에 메시지를 수정할 수 없지만, 페이로드는 수정할 수 있다 (변경 가능한mutable 프로퍼티가 있다면). 보통 어드바이스에선 이 메시지를 사용해, 핸들러를 호출하기 전이나 후에 메시지를 기록하거나 어딘가에 메시지 복사본을 전송하곤 한다.

보통은 callback.execute()가 반환한 값을 그대로 리턴한다. 하지만 이 어드바이스는 반환 값을 수정할 수 있다. 단, AbstractReplyProducingMessageHandler 인스턴스만 값을 반환한다는 점을 주의하자. 다음은 AbstractRequestHandlerAdvice를 상속한 커스텀 어드바이스 클래스의 예시다:

public class MyAdvice extends AbstractRequestHandlerAdvice {

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}

ExecutionCallbackexecute() 외에 cloneAndExecute() 메소드도 함께 제공하고 있다. RequestHandlerRetryAdvice와 같이 doInvoke()를 한 번 실행하는 동안 콜백을 여러 번 실행할 수 있다면 반드시 cloneAndExecute() 메소드를 사용해야 한다. 스프링 AOP org.springframework.aop.framework.ReflectiveMethodInvocation 객체가 체인에서 마지막으로 호출된 어드바이스를 추적해서 상태를 유지하기 때문이다. 이 상태는 호출이 일어날 때마다 리셋되어야 한다.

자세한 내용은 Javadoc ReflectiveMethodInvocation을 참고해라.

10.9.4. Other Advice Chain Elements

위에선 간편한 추상 클래스를 알아보았지만, 트랜잭션 어드바이스를 비롯한 모든 Advice를 체인에 추가할 수 있다.

10.9.5. Handling Message Advice

이번 섹션을 소개하면서도 설명했지만, request handler advice chain 안에 있는 어드바이스 객체들은 다운스트림 플로우(있다면)가 아닌 현재 엔드포인트에만 적용된다. 응답을 생성하는 MessageHandler 객체라면 (AbstractReplyProducingMessageHandler를 상속한 객체 등), 어드바이스는 내부 메소드 handleRequestMessage()에 적용된다 (MessageHandler.handleMessage() 안에서 호출한다). 다른 메시지 핸들러의 경우 MessageHandler.handleMessage()에 어드바이스가 적용된다.

하지만 상황에 따라 메시지 핸들러가 AbstractReplyProducingMessageHandler이더라도 handleMessage 메소드에 어드바이스를 적용해야 할 때도 있다. 예를 들어, idempotent receivernull을 반환할 수 있는데, 이때 핸들러의 replyRequired 프로퍼티가 true로 설정돼있으면 예외가 발생한다. 또 다른 예시로 BoundRabbitChannelAdvice도 있다 (정확한 메시지 순서 유지하기 참고).

4.3.1 버전에선 새롭게 HandleMessageAdvice 인터페이스와 이 인터페이스의 기본 구현체(AbstractHandleMessageAdvice)를 도입했다. HandleMessageAdvice를 구현한 Advice 객체는 핸들러 타입에 관계없이 항상 handleMessage() 메소드에 적용된다.

HandleMessageAdvice의 구현체는 (idempotent receiver 등) 응답을 반환하는 핸들러에 적용되면, adviceChain에서 분리돼서 적절히 MessageHandler.handleMessage() 메소드에 적용된다는 점을 이해하고 넘어가야 한다.

체인에서 분리되기 때문에 어드바이스 체인의 순서는 지켜지지 않는다.

아래 설정을 한 번 살펴보자:

<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>

위 예제에서 <tx:advice>AbstractReplyProducingMessageHandler.handleRequestMessage()에 적용된다. 하지만 myHandleMessageAdviceMessageHandler.handleMessage()에 적용된다. 따라서 myHandleMessageAdvice<tx:advice>보다 먼저 실행된다. 순서를 유지하고 싶다면 표준 Spring AOP 설정 방식을 따르고, 엔드포인트 id.handler suffix를 사용해 타겟 MessageHandler 빈을 가져와야 한다. 이 경우 전체 다운스트림 플로우가 트랜잭션 범위 내에 들어가게 된다.

응답을 반환하지 않는 MessageHandler에선 어드바이스 체인의 순서가 유지된다.

5.3 버전부터 HandleMessageAdviceAdapter를 제공해서, 기존 MethodInterceptor 중 원하는 것을 MessageHandler.handleMessage()에, 즉 전체 하위 플로우에 적용할 수 있다. 예를 들면 특정 엔드포인트에서 시작하는 전체 하위 플로우에 RetryOperationsInterceptor를 적용시킬 수 있다. 원래대로면 컨슈머 엔드포인트에선 AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()에만 어드바이스가 적용되기 때문에 불가능한 일이었다.

10.9.6. Transaction Support

5.0 버전부터 HandleMessageAdvice를 구현한 덕분에 전체 다운스트림 플로우에 트랜잭션을 적용할 수 있는 TransactionHandleMessageAdvice를 새로 도입했다. <request-handler-advice-chain> 요소에서 일반 TransactionInterceptor를 사용했을 땐 (ex. <tx:advice> 설정을 통해), 트랜잭션을 시작해도 내부 AbstractReplyProducingMessageHandler.handleRequestMessage()에만 적용되며 다운스트림 플로우로는 전파되지 않는다.

<request-handler-advice-chain>과 더불어 XML 설정을 단순하게 만들 수 있도록, 모든 <outbound-gateway>, <service-activator>와 관련 구성 요소들에 <transactional> 요소가 추가됐다. 다음은 <transactional>을 사용하는 예시다:

<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
    request-channel="good" reply-channel="reply" port="#{@port}">
        <int-rmi:transactional/>
</int-rmi:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

JPA 통합 컴포넌트에 익숙하다면 별로 새로운 설정은 아니겠지만, 이제 <poller>JMS같은 메시지 기반 채널 어댑터 뿐 아니라, 플로우의 어느 지점에서나 트랜잭션을 시작할 수 있다.

자바 설정은 TransactionInterceptorBuilder를 이용해 단순화할 수 있으며, 다음과 같이 메시지 처리 어노테이션adviceChain 속성에 등록한 빈의 이름을 지정하면 된다:

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

TransactionInterceptorBuilder 생성자에 true를 넘긴 점에 주목해라. 이 파라미터 덕분에 일반 TransactionInterceptor가 아닌 TransactionHandleMessageAdvice를 생성한다.

Java DSL에선 엔드포인트를 설정하면서 .transactional() 옵션을 이용하면 Advice를 등록할 수 있다:

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}

10.9.7. Advising Filters

Filter에 어드바이스를 적용할 땐 추가로 생각해봐야 하는 것이 있다. 기본적으로, 모든 discard 동작은 (필터가 false를 반환할 때를 의미한다) 어드바이스 체인의 스코프 내에서 수행된다. discard 채널의 다운스트림 플로우가 전부 어드바이스 체인에 들어갈 수도 있다. 그렇기 때문에, 예를 들어 discard 채널의 다운스트림에서 예외가 발생했고 retry 어드바이스가 하나 있는 경우엔 이 프로세스를 재시도하게 된다. throwExceptionOnRejectiontrue로 설정된 경우도 마찬가지다 (어드바이스 스코프 내에서 예외를 던진다).

이 동작은 discard-within-advicefalse로 설정하면 변경할 수 있으며, 어드바이스 체인을 실행한 이후에 discard(또는 exception)를 진행하게 된다.

10.9.8. Advising Endpoints Using Annotations

특정 엔드포인트를 어노테이션을 사용해 설정할 땐 (@Filter, @ServiceActivator, @Splitter, @Transformer), adviceChain 속성에 어드바이스 체인의 빈 이름을 명시할 수 있다. 추가로, @Filter 어노테이션은 필터에 어드바이스 적용하기에서 설명한 것처럼 discard 동작을 설정할 수 있는 discardWithinAdvice 속성도 가지고 있다. 다음은 어드바이스를 적용한 이후에 discard를 진행하도록 만드는 설정 예시다:

@MessageEndpoint
public class MyAdvisedFilter {

    @Filter(inputChannel="input", outputChannel="output",
            adviceChain="adviceChain", discardWithinAdvice="false")
    public boolean filter(String s) {
        return s.contains("good");
    }
}

10.9.9. Ordering Advices within an Advice Chain

어드바이스를 여러 개 사용할 땐 다른 어드바이스를 “감싸는around” 식으로 동작해 중첩된다. 첫 번째 어드바이스가 가장 바깥쪽에 위치하고, 마지막 어드바이스는 가장 안쪽에 위치한다 (즉, 어드바이스는 적용하는 핸들러에 가장 가깝게 배치된다). 원하는 기능을 구현하려면 어드바이스 클래스들을 올바른 순서로 배치하는 게 중요하다.

예를 들어서 retry 어드바이스와 트랜잭션 어드바이스를 추가한다고 생각해보자. retry 어드바이스를 가장 앞에 배치하고 그 뒤에 트랜잭션 어드바이스를 배치하고 싶다고 해보자. 즉, 재시도할 때마다 새 트랜잭션을 시작해야 한다. 반면, 복구 작업을 비롯해서 (retry RecoveryCallback) 모든 호출 시도를 하나의 트랜잭션 내에서 진행하고 싶다면 트랜잭션 어드바이스를 앞에 배치하면 된다.

10.9.10. Advised Handler Properties

간혹 어드바이스 내에서 핸들러 프로퍼티에 접근할 수 있으면 유용할 때가 있다. 예를 들어, 대부분의 핸들러는 NamedComponent를 구현하고 있기 때문에 컴포넌트 이름에 접근할 수 있다.

타겟 객체는 target 인자나 (AbstractRequestHandlerAdvice를 상속했을 때), invocation.getThis()(org.aopalliance.intercept.MethodInterceptor를 구현했을 때)를 통해 액세스할 수 있다.

어드바이스가 핸들러 전체에 적용될 때는 (핸들러가 응답을 생성하지 않는 핸들러이거나, 어드바이스가 HandleMessageAdvice를 구현하고 있는 경우), 다음 예제와 같이 타겟 객체를 NamedComponent 등의 인터페이스로 타입 캐스팅할 수 있다:

String componentName = ((NamedComponent) target).getComponentName();

MethodInterceptor를 직접 구현한다면 타겟 객체를 다음과 같이 캐스팅할 수 있다:

String componentName = ((NamedComponent) invocation.getThis()).getComponentName();

handleRequestMessage() 메소드에만 어드바이스를 적용하는 경우 (응답을 생성하는 핸들러) 핸들러 자체, 즉 AbstractReplyProducingMessageHandler에 접근해야 한다:

AbstractReplyProducingMessageHandler handler =
    ((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();

String componentName = handler.getComponentName();

10.9.11. Idempotent Receiver Enterprise Integration Pattern

Spring Integration 4.1 버전부터 엔터프라이즈 통합 패턴 Idempotent Receiver의 구현체를 제공한다. 이 패턴은 함수형 패턴으로, 멱등성idempotency을 위한 로직은 전부 애플리케이션에서 구현해야 한다. 하지만 Spring Integration은 좀 더 쉽게 패턴을 구현할 수 있도록 IdempotentReceiverInterceptor라는 컴포넌트를 제공한다. 이 클래스는 MessageHandler.handleMessage() 메소드에 적용되는 AOP Advice이며, 설정에 따라 요청 메시지를 필터링하거나 중복으로 마킹할 수 있다.

이전에는 <filter/>에서 커스텀 MessageSelector를 사용하는 식으로 이 패턴을 구현할 수 있었다 (Filter 참고). 하지만 이 패턴은 엔드포인트 자체라기 보단, 엔드포인트의 동작을 정의하는 패턴에 가깝기 때문에, idempotent receiver 구현체는 엔드포인트 구성 요소를 제공하지 않는다. 그보단 오히려 애플리케이션에 정의돼있는 다른 엔드포인트들에 적용된다고 볼 수 있다.

IdempotentReceiverInterceptor는 설정해둔 MessageSelector를 기반으로 동작하며, 이 셀렉터가 메시지를 수락하지 않으면 메시지에 duplicateMessage 헤더를 true로 저장한다. 타겟 MessageHandler에선 (또는 다운스트림 플로우) 이 헤더를 보고 멱등성idempotency을 위한 로직을 올바르게 구현할 수 있다. IdempotentReceiverInterceptordiscardChannel이나 throwExceptionOnRejection = true를 설정하면 중복 메시지는 타겟 MessageHandler.handleMessage()로 전송하지 않고 버린다discard. 중복 메시지를 삭제하려면discard (아무 로직도 실행하지 않으려면), discardChannel을 디폴트 nullChannel 빈과 같은 NullChannel로 설정해야 한다.

Spring Integration은 여러 메시지를 처리하면서 상태를 유지하고 메시지를 비교해 멱등성idempotency을 지원할 수 있도록 MetadataStoreSelector를 제공하고 있다. MetadataStoreSelector는 생성자에서 MessageProcessor 구현체와 (Message를 가지고 lookup 키를 생성한다), ConcurrentMetadataStore를 받는다 (메타데이터 스토어). ConcurrentMetadataStore는 생략할 수 있다. 자세한 내용은 MetadataStoreSelector Javadoc을 참고해라. MessageProcessor를 하나 더 넘겨서 ConcurrentMetadataStorevalue를 커스텀할 수도 있다. MetadataStoreSelector는 기본적으로 메시지 timestamp 헤더를 사용한다.

이 셀렉터는 일반적으로 키에 해당하는 값이 기존에 없다면 메시지를 수락한다. 상황에 따라서는 키에 해당하는 현재의 값과 새 값을 비교해서 메시지 수락 여부를 결정해야 할 때도 있다. 5.3 버전부터 BiPredicate<String, String>을 참조하는 compareValues란 프로퍼티를 제공한다. 첫 번째 파라미터로는 이전 값을 넘기며, true를 반환해 메시지를 수락하면 MetadataStore의 이전 값을 새로운 값으로 교체한다. 이 프로퍼티를 이용하면 키의 개수를 줄일 수 있다. 예를 들어서 파일에 담겨있는 라인들을 처리한다면, 키에 파일명을, 값에 현재 라인 넘버를 저장할 수 있다. 재시작 이후엔 이미 처리한 라인들을 건너뛸 수 있다. 예제가 필요하다면 파일을 분할한 뒤 다운스트림에서 멱등성을 지켜가며 처리하기를 확인해봐라.

MetadataStoreSelector 옵션들은 <idempotent-receiver> 위에 바로 설정할 수 있어 편리하다. 아래 설정엔 지원하는 모든 속성을 나타냈다:

<idempotent-receiver
        id=""  <!-- (1) -->
        endpoint=""  <!-- (2) -->
        selector=""  <!-- (3) -->
        discard-channel=""  <!-- (4) -->
        metadata-store=""  <!-- (5) -->
        key-strategy=""  <!-- (6) -->
        key-expression=""  <!-- (7) -->
        value-strategy=""  <!-- (8) -->
        value-expression=""  <!-- (9) -->
        compare-values="" <!-- (10) -->
        throw-exception-on-rejection="" />  <!-- (11) -->

(1) IdempotentReceiverInterceptor 빈의 ID.
생략할 수 있다.

(2) 이 인터셉터를 적용할 컨슈머 엔드포인트의 이름 혹은 패턴 (여러 개 지정할 수 있다).
endpoint=”aaa, bbb*, ccc, *ddd, eee*fff”와 같이 콤마(,)로 이름(패턴)들을 구분한다.
그러면 이 패턴과 일치하는 엔드포인트 빈 이름들로 타겟 엔드포인트의 MessageHandler 빈을 조회하고 (.handler suffix 사용), 해당 빈들에 IdempotentReceiverInterceptor를 적용한다.
필수 값이다.

(3) MessageSelector 빈에 대한 참조.
metadata-storekey-strategy (key-expression)와는 함께 사용할 수 없다.
selector를 지정하지 않는다면 key-strategykey-strategy-expression 중 하나를 지정해야 한다.

(4) IdempotentReceiverInterceptor가 수락하지 않은 메시지를 전송할 채널.
생략하면 중복 메시지는 duplicateMessage 헤더와 함께 핸들러로 전달된다.
생략할 수 있다.

(5) ConcurrentMetadataStore에 대한 참조.
내부 MetadataStoreSelector에서 사용한다.
selector와는 함께 사용할 수 없다.
생략할 수 있다.
디폴트 MetadataStoreSelector는 애플리케이션을 재실행하면 상태가 유지되지 않는 SimpleMetadataStore를 사용한다.

(6) MessageProcessor에 대한 참조.
내부 MetadataStoreSelector에서 사용한다.
요청 메시지를 가지고 idempotentKey를 결정한다.
selectorkey-expression과는 함께 사용할 수 없다.
selector를 지정하지 않는으면 key-strategykey-strategy-expression 중 하나를 지정해야 한다.

(7) ExpressionEvaluatingMessageProcessor에서 사용할 SpEL 표현식.
내부 MetadataStoreSelector에서 사용한다.
요청 메시지를 평가 컨텍스트 루트 객체로 사용해서 idempotentKey를 평가한다.
selectorkey-strategy와는 함께 사용할 수 없다.
selector를 지정하지 않으면 key-strategykey-strategy-expression 중 하나를 지정해야 한다.

(8) MessageProcessor에 대한 참조.
내부 MetadataStoreSelector에서 사용한다.
요청 메시지를 가지고 idempotentKey에 대한 value를 평가한다.
selectorvalue-expression과는 함께 사용할 수 없다.
기본적으로 ‘MetadataStoreSelector’는 메시지 헤더 ‘timestamp’를 메타데이터 ‘value’로 사용한다.

(9) ExpressionEvaluatingMessageProcessor에서 사용할 SpEL 표현식.
내부 MetadataStoreSelector에서 사용한다.
요청 메시지를 평가 컨텍스트 루트 객체로 사용해서 idempotentKey에 대한 value를 평가한다.
selectorvalue-strategy와는 함께 사용할 수 없다.
기본적으로 ‘MetadataStoreSelector’는 메시지 헤더 ‘timestamp’를 메타데이터 ‘value’로 사용한다.

(10) BiPredicate<String, String> 빈에 대한 참조.
특정 키에 해당하는 이전 값과 새 값을 비교해서 메시지를 선택할 수 있다.
생략할 수 있으며, 디폴트는 null이다.

(11) IdempotentReceiverInterceptor가 메시지를 거절했을 때 예외를 던질지 여부.
디폴트는 false다.
discard-channel 설정 여부와는 관계없이 적용된다.

자바 설정의 경우, Spring Integration은 메소드 레벨 어노테이션 @IdempotentReceiver를 제공하고 있다. 메시지 처리 어노테이션(@ServiceActivator, @Router 등)을 가지고 있는 method를 마킹해주면, 해당 엔드포인트에 적용할 IdempotentReceiverInterceptor 객체를 지정할 수 있다. 다음은 @IdempotentReceiver 어노테이션을 사용하는 예시다:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

자바 DSL을 사용할 때는 다음과 같이 엔드포인트의 어드바이스 체인에 인터셉터를 추가할 수 있다:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

IdempotentReceiverInterceptorMessageHandler.handleMessage(Message<?>) 메소드 전용으로 설계한 인터셉터다. 4.3.1 버전부터는, 어드바이스 체인과 분리할 수 있도록 AbstractHandleMessageAdvice를 기본 클래스로 HandleMessageAdvice를 구현하고 있다. 자세한 내용은 메시지 어드바이스 처리하기를 참고해라.


10.10. Logging Channel Adapter

Wire Tap 섹션에서 말했듯이 <logging-channel-adapter>는 Wire Tap과 자주 사용하는 경우가 많다. 하지만 원하는 플로우의 최종 컨슈머로도 사용할 수 있다. 예를 들어 플로우가 결과를 반환하는 <service-activator>로 끝나지만, 그 결과를 폐기discard하고 싶다고 생각해 보자. 그러려면 그 결과를 NullChannel로 전송하면 된다. 그게 아니라면, INFO 레벨 <logging-channel-adapter>로 라우팅해도 된다. 이렇게 하면 INFO 레벨로 로깅할 때 버려진 메시지를 볼 수 있지만 (예를 들어) WARN 레벨로 로깅할 땐 로그를 조회할 수 없다. NullChannel을 사용한다면 버려지는 메시지들은 DEBUG 레벨로 로깅할 때만 조회할 수 있다. 다음은 logging-channel-adapter 요소에서 사용할 수 있는 속성들을 전부 나타낸 예시다:

<int:logging-channel-adapter
    channel="" <!-- (1) -->
    level="INFO" <!-- (2) -->
    expression="" <!-- (3) -->
    log-full-message="false" <!-- (4) -->
    logger-name="" /> <!-- (5) -->

(1) 이 로깅 어댑터를 업스트림 컴포넌트에 연결해주는 채널.
(2) 이 어댑터로 전송된 메시지를 로그로 남길 때 사용할 로그 레벨.
디폴트: INFO.

(3) 메시지의 정확히 어떤 부분을 로그로 남길지를 나타내는 SpEL 표현식.
디폴트: payload — 페이로드만 기록한다.
log-full-message를 지정했다면 이 속성은 사용할 수 없다.

(4) true면 헤더를 포함한 전체 메시지를 로깅한다.
디폴트: false — 페이로드만 기록한다.
expression을 지정했다면 이 속성은 사용할 수 없다.

(5) 로거의 name을 지정한다 (log4j에서는 category라고 부른다).
이 어댑터가 생성하는 로그 메시지를 식별하는 데 사용한다.
이 속성을 사용하면 어댑터에 개별적으로 로그 이름(하위 로그 시스템 내)을 설정할 수 있다.
기본적으로 모든 어댑터는 org.springframework.integration.handler.LoggingHandler라는 이름 아래 로그를 남긴다.

10.10.1. Using Java Configuration

다음은 자바 설정을 통해 LoggingHandler를 구성하는 스프링 부트 애플리케이션 예시다:

@SpringBootApplication
public class LoggingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
             new SpringApplicationBuilder(LoggingJavaApplication.class)
                    .web(false)
                    .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToLogger("foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "logChannel")
    public LoggingHandler logging() {
        LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
        adapter.setLoggerName("TEST_LOGGER");
        adapter.setLogExpressionString("headers.id + ': ' + payload");
        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "logChannel")
    public interface MyGateway {

        void sendToLogger(String data);

    }

}

10.10.2. Configuring with the Java DSL

다음은 Java DSL을 이용해 로깅 채널 어댑터를 설정하는 스프링 부트 애플리케이션 예시다:

@SpringBootApplication
public class LoggingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
             new SpringApplicationBuilder(LoggingJavaApplication.class)
                    .web(false)
                    .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToLogger("foo");
    }

    @Bean
    public IntegrationFlow loggingFlow() {
        return IntegrationFlows.from(MyGateway.class)
                     .log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
                           m -> m.getHeaders().getId() + ": " + m.getPayload());
    }

    @MessagingGateway
    public interface MyGateway {

        void sendToLogger(String data);

    }

}

10.11. java.util.function Interfaces Support

Spring Integration은 5.1 버전부터 java.util.function 패키지의 인터페이스들을 본격적으로 지원하기 시작했다. 모든 메시지 처리 엔드포인트들은 (Service Activator, Transformer, Filter 등) 이제 Function(또는 Consumer) 빈을 참조할 수 있다. 일반적인 MessageHandler 정의와 유사하게, 이런 빈에도 메시지 처리 어노테이션을 바로 적용할 수 있다. 예를 들어 아래와 같은 Function 빈을 정의했다면:

@Configuration
public class FunctionConfiguration {

    @Bean
    public Function<String, String> functionAsService() {
        return String::toUpperCase;
    }

}

XML 설정 파일에서 이 빈을 간단하게 참조할 수 있다:

<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>

메시지 처리 어노테이션들로 플로우를 구성하는 코드는 다음과 같이 꽤 직관적이다:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
    return String::toUpperCase;
}

배열이나 Collection(사실상 모든 Iterable), Stream, 리액터 Flux를 반환하는 함수라면, 이 빈 위에 @Splitter를 사용해 결과를 순회하도록 만들 수 있다.

java.util.function.Consumer 인터페이스는 <int:outbound-channel-adapter>에 사용하거나 @ServiceActivator 어노테이션과 함께 사용해 플로우의 마지막 단계를 이어갈 수 있다:

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
    // Has to be an anonymous class for proper type inference
    return new Consumer<Message<?>>() {

        @Override
        public void accept(Message<?> e) {
            collector().add(e);
        }

    };
}

위 코드에 있는 주석에도 주목해라. Function/Consumer에서 전체 메시지를 처리하려면 람다 정의는 사용할 수 없다. 자바 타입이 사라지기 때문에 apply()/accept() 메소드 호출에 사용할 타입을 결정할 수 없다.

java.util.function.Supplier 인터페이스는 간단히 @InboundChannelAdapter 어노테이션과 함께 사용하거나, <int:inbound-channel-adapter>ref로 사용할 수 있다:

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
    return () -> "foo";
}

Java DSL을 이용할 땐 엔드포인트 정의에서 함수 빈에 대한 참조를 사용하기만 하면 된다. 한편 Supplier 인터페이스의 구현체는 일반적인 MessageSource 정의로 사용할 수 있다:

@Bean
public Function<String, String> toUpperCaseFunction() {
    return String::toUpperCase;
}

@Bean
public Supplier<String> stringSupplier() {
    return () -> "foo";
}

@Bean
public IntegrationFlow supplierFlow() {
    return IntegrationFlows.from(stringSupplier())
                .transform(toUpperCaseFunction())
                .channel("suppliedChannel")
                .get();
}

함수 지원 기능은 Spring Cloud Function 프레임워크와 함께 사용할 때 더 유용하다. 이 프레임워크는 function catalog를 제공하며, 통합 플로우 정의에서 해당 멤버 함수를 참조할 수 있다.

10.11.1. Kotlin Lambdas

코틀린 람다 또한 지원하도록 프레임워크를 개선했기 때문에, 이제 코틀린 언어를 사용해 Spring Integration 플로우를 정의할 수 있다:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Next :
Java DSL
Java DSL을 이용한 Spring Integration 설정 가이드

전체 목차는 여기에 있습니다.

<< >>

TOP