토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 인티그레이션 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.


이 챕터에선 Spring Integration을 사용해 메시지를 라우팅하는 방법을 자세히 설명한다.

목차


8.1. Routers

이번 섹션에선 라우터가 어떻게 동작하는지를 설명하면서, 다음과 같은 주제를 다룬다:

8.1.1. Overview

라우터는 수많은 메시징 아키텍처에서 없어서는 안 되는 요소다. 라우터는 메시지 채널에서 메시지를 컨슘하고, 가지고 있는 조건 셋에 따라 컨슘한 각 메시지를 하나 이상의 다른 메시지 채널로 전달해준다.

Spring Integration은 다음과 같은 라우터를 제공한다:

라우터 구현체들은 다양한 설정 파라미터를 공통으로 사용하고 있다. 하지만 라우터마다 몇 가지 차이점이 존재한다. 게다가 설정 파라미터를 사용할 수 있는지는 라우터를 체인 안에서 사용하는지, 바깥에서 사용하는지에 따라 달라진다. 바로 바로 참고할 수 있도록 사용 가능한 모든 속성들은 아래 두 테이블에 정리해뒀다.

다음은 체인 바깥에 있는 라우터에서 사용할 수 있는 설정 파라미터들을 정리한 테이블이다:

Table 4. Routers Outside of a Chain

Attribute router header value router xpath router payload type router recipient list route exception type router
apply-sequence tickmark tickmark tickmark tickmark tickmark tickmark
default-output-channel tickmark tickmark tickmark tickmark tickmark tickmark
resolution-required tickmark tickmark tickmark tickmark tickmark tickmark
ignore-send-failures tickmark tickmark tickmark tickmark tickmark tickmark
timeout tickmark tickmark tickmark tickmark tickmark tickmark
id tickmark tickmark tickmark tickmark tickmark tickmark
auto-startup tickmark tickmark tickmark tickmark tickmark tickmark
input-channel tickmark tickmark tickmark tickmark tickmark tickmark
order tickmark tickmark tickmark tickmark tickmark tickmark
method tickmark          
ref tickmark          
expression tickmark          
header-name   tickmark        
evaluate-as-string     tickmark      
xpath-expression-ref     tickmark      
converter     tickmark      

다음은 체인 안에 있는 라우터에서 사용할 수 있는 설정 파라미터들을 정리한 테이블이다:

Table 5. Routers Inside of a Chain

Attribute router header value router xpath router payload type router recipient list router exception type router
apply-sequence tickmark tickmark tickmark tickmark tickmark tickmark
default-output-channel tickmark tickmark tickmark tickmark tickmark tickmark
resolution-required tickmark tickmark tickmark tickmark tickmark tickmark
ignore-send-failures tickmark tickmark tickmark tickmark tickmark tickmark
timeout tickmark tickmark tickmark tickmark tickmark tickmark
id            
auto-startup            
input-channel            
order            
method tickmark          
ref tickmark          
expression tickmark          
header-name   tickmark        
evaluate-as-string     tickmark      
xpath-expression-ref     tickmark      
converter     tickmark      

모든 라우터 구현체는 Spring Integration 2.1부터 라우터 파라미터를 좀더 표준화해서 사용하기 시작했다. 그렇기 때문에 사소한 변경이지만 몇 가지는 이전 Spring Integration 기반 애플리케이션과 호환이 안 될 수도 있다.

Spring Integration 2.1부터 ignore-channel-name-resolution-failures 속성은 제거되었으며, 관련 기능은 resolution-required 속성으로 통합됐다. 뿐만 아니라 resolution-required 속성은 이제 true가 기본값이다.

이렇게 변경되기 전엔 resolution-required 속성은 false가 기본값이었으며, 어떤 채널로도 리졸브되지 않고 default-output-channel 또한 설정돼있지 않으면 아무런 경고 없이 메시지가 버려졌었다. 통합 이후에는 최소한 채널 하나로는 리졸브되어야 하며, 채널을 결정하지 못하면 (혹은 전송에 성공하지 못하면) 기본적으로 MessageDeliveryException이 발생한다.

에러를 발생시키는 대신 메시지를 버리는 동작을 선호한다면 default-output-channel="nullChannel"을 설정하면 된다.

8.1.2. Common Router Parameters

이 섹션에선 모든 라우터에서 공통으로 사용하는 파라미터들을 설명한다 (이전 챕터에서 보여준 두 테이블에서 모든 칸에 체크되어 있는 파라미터들).

Inside and Outside of a Chain

다음 파라미터들은 체인 내부와 바깥에 있는 모든 라우터에서 유효하다.

Top-Level (Outside of a Chain)

다음 파라미터들은 체인 바깥에 있는 최상위 라우터에서만 유효하다.

8.1.3. Router Implementations

메시지를 컨텐츠 기반으로 라우팅할 땐 종종 도메인에 특화된 로직이 필요하기 때문에, 대부분의 유스 케이스에선 Spring Integration의 XML 네임스페이스나 어노테이션을 이용해 POJO에 동작을 위임할 수 있는 옵션이 필요할 거다. 이 두 가지는 모두 뒤에서 논한다. 여기서는 먼저, 흔히 있는 요구 사항을 충족하는 몇 가지 구현체들을 소개한다.

PayloadTypeRouter

아래 보이는 PayloadTypeRouter는 페이로드 타입에 매핑돼있는 채널로 메시지를 전송한다:

<bean id="payloadTypeRouter"
      class="org.springframework.integration.router.PayloadTypeRouter">
    <property name="channelMapping">
        <map>
            <entry key="java.lang.String" value-ref="stringChannel"/>
            <entry key="java.lang.Integer" value-ref="integerChannel"/>
        </map>
    </property>
</bean>

PayloadTypeRouter는 Spring Integration에서 제공하는 네임스페이스로도 설정할 수 있다 (Namespace Support 참고). Spring Integration의 네임스페이스를 이용하면 사실상 <router/> 설정과 그에 따른 구현체(<bean/> 요소로 정의한)를 하나의 요소로 정의할 수 있어 더 간단해진다. 다음 예제는 위와 동일한 설정이지만, 네임스페이스 지원을 이용한 PayloadTypeRouter 설정을 보여준다:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String" channel="stringChannel" />
    <int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>

다음은 자바를 이용한 동일한 라우터 설정 예시다:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

자바 DSL을 사용할 땐 두 가지 옵션이 있다.

먼저, 위에 있는 예제에서처럼 라우터 객체를 정의하는 방법이 있다:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public PayloadTypeRouter router() {
    PayloadTypeRouter router = new PayloadTypeRouter();
    router.setChannelMapping(String.class.getName(), "stringChannel");
    router.setChannelMapping(Integer.class.getName(), "integerChannel");
    return router;
}

라우터는 @Bean이어도 되지만 반드시 그래야 하는 것은 아니다. @Bean이 아니면 플로우에서 라우터를 등록한다.

두 번째 방법으로는, 다음과 같이 DSL 플로우 안에서 라우팅 함수를 정의할 수 있다:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(String.class, "stringChannel")
                    .channelMapping(Integer.class, "integerChannel"))
            .get();
}

HeaderValueRouter

HeaderValueRouter는 헤더 값마다 설정된 매핑 정보를 이용해 메시지를 전송할 채널을 결정한다. HeaderValueRouter를 생성할 땐 평가할 헤더의 이름으로 초기화를 진행한다. 이때 헤더의 값에는 아래 두 가지 중 하나를 사용할 수 있다:

헤더에 임의의 값을 저장할 땐 이런 헤더 값을 채널 이름으로 매핑해주는 설정이 별도로 필요하다. 그 외는 추가 설정은 필요하지 않다.

Spring Integration은 HeaderValueRouter를 위한 간단한 네임스페이스 기반 XML 설정을 제공한다. 다음은 헤더 값을 채널로 매핑해주는 설정을 가지고 있는 HeaderValueRouter 설정 예시다:

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
    <int:mapping value="someHeaderValue" channel="channelA" />
    <int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>

위에서 정의한 라우터는 리졸브 과정에서 채널을 결정하지 못하면 예외가 발생할 수 있다. 이때 예외를 던지기보단 리졸브되지 않은 메시지는 디폴트 출력 채널(default-output-channel 속성으로 정의)로 전송하고 싶다면 resolution-requiredfalse로 설정해라.

메시지가 가진 헤더 값에 매핑된 채널이 없으면 일반적으로 default-output-channel로 전송된다. 하지만 헤더 값에 매핑된 채널은 있지만 해당 채널을 리졸브할 수 없는 경우, resolution-required 속성을 false로 설정해야 default-output-channel로 메시지를 라우팅한다.

이 속성은 Spring Integration 2.1부터 ignore-channel-name-resolution-failures에서 resolution-required로 변경됐다. resolution-required 속성의 기본값은 true다.

다음은 자바를 이용한 동일한 라우터 설정 예시다:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

자바 DSL을 사용할 땐 두 가지 옵션이 있다. 먼저, 위에 있는 예제에서처럼 라우터 객체를 정의하는 방법이 있다:

@Bean
public IntegrationFlow routerFlow1() {
    return IntegrationFlows.from("routingChannel")
            .route(router())
            .get();
}

public HeaderValueRouter router() {
    HeaderValueRouter router = new HeaderValueRouter("testHeader");
    router.setChannelMapping("someHeaderValue", "channelA");
    router.setChannelMapping("someOtherHeaderValue", "channelB");
    return router;
}

라우터는 @Bean이어도 되지만 반드시 그래야 하는 것은 아니다. @Bean이 아니면 플로우에서 라우터를 등록한다.

두 번째 방법으로는, 다음과 같이 DSL 플로우 안에서 라우팅 함수를 정의할 수 있다:

@Bean
public IntegrationFlow routerFlow2() {
    return IntegrationFlows.from("routingChannel")
            .route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
                    m -> m
                        .channelMapping("someHeaderValue", "channelA")
                        .channelMapping("someOtherHeaderValue", "channelB"),
                e -> e.id("headerValueRouter"))
            .get();
}

헤더 값 자체가 채널 이름을 나타낸다면 헤더 값을 채널 이름에 매핑해주는 설정은 필요하지 않다. 다음은 이와 같이 헤더 값을 채널 이름에 매핑할 필요가 없는 라우터를 보여주는 예시다:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>

채널 리졸브 동작은 Spring Integration 2.1부터 더 명확해졌다. 예를 들어 default-output-channel 속성을 생략하면, 라우터가 유효한 채널을 하나도 리졸브하지 못했고 resolution-requiredfalse로 설정해 채널 리졸브 실패를 전부 무시한 경우 MessageDeliveryException이 발생한다.

라우터는 기본적으로 메시지를 적어도 하나의 채널로는 라우팅할 수 있어야 한다. 정말로 메시지가 버려지길 원한다면 default-output-channel 역시 nullChannel로 설정해줘야 한다.

RecipientListRouter

RecipientListRouter는 메시지를 받으면 정적으로 정의돼있는 메시지 채널 목록으로 전송해준다. 다음은 RecipientListRouter를 생성하는 예시다:

<bean id="recipientListRouter"
      class="org.springframework.integration.router.RecipientListRouter">
    <property name="channels">
        <list>
            <ref bean="channel1"/>
            <ref bean="channel2"/>
            <ref bean="channel3"/>
        </list>
    </property>
</bean>

Spring Integration은 아래 예제에 보이는 RecipientListRouter 전용 네임스페이스도 지원하고 있다 (네임스페이스 지원 참고):

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
        timeout="1234"
        ignore-send-failures="true"
        apply-sequence="true">
  <int:recipient channel="channel1"/>
  <int:recipient channel="channel2"/>
</int:recipient-list-router>

다음은 자바를 이용한 동일한 라우터 설정 예시다:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
    RecipientListRouter router = new RecipientListRouter();
    router.setSendTimeout(1_234L);
    router.setIgnoreSendFailures(true);
    router.setApplySequence(true);
    router.addRecipient("channel1");
    router.addRecipient("channel2");
    router.addRecipient("channel3");
    return router;
}

다음은 Java DSL을 이용한 동일한 라우터 설정 예시다:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .routeToRecipients(r -> r
                    .applySequence(true)
                    .ignoreSendFailures(true)
                    .recipient("channel1")
                    .recipient("channel2")
                    .recipient("channel3")
                    .sendTimeout(1_234L))
            .get();
}

여기서 ‘apply-sequence’ 플래그의 효과는 publish-subscribe-channel에서와 동일하며, publish-subscribe-channel과 마찬가지로 recipient-list-router에서도 기본적으로 비활성화돼있다. 자세한 내용은 PublishSubscribeChannel 설정을 참고해라.

RecipientListRouter를 설정할 땐 SpELSpring Expression Language을 이용해 메시지를 받을 각 채널마다 셀렉터를 지정할 수도 있다. ‘선택적인 컨슈머’로 동작시키기 위해 ‘체인’ 앞 부분에 필터를 두는 것과 유사하다. 하지만 이 경우엔 아래 예제에서 볼 수 있듯이 라우터 설정 하나만으로 구성할 수 있다:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
    <int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
    <int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>

위 설정에선 selector-expression 속성에 있는 SpEL 표현식을 평가해서, 입력 메시지가 주어졌을 때 해당 채널recipient을 recipient 목록에 포함시킬지를 결정한다. 이때 표현식을 평가하면 반드시 boolean이 나와야 한다. 이 속성을 정의하지 않은 채널은 항상 recipient 목록에 포함된다.

RecipientListRouterManagement

4.1 버전부터 RecipientListRouter는 런타임에 동적으로 수신자recipient를 조작할 수 있는 여러 가지 동작들을 제공한다. 이런 관리성management 연산은 RecipientListRouterManagement를 통해 제공되며, @ManagedResource 어노테이션을 이용한다. 다음 예제와 같이 컨트롤 버스를 활용할 수도 있고, JMX를 이용하는 방법도 있다:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
   <recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
messagingTemplate.convertAndSend(controlBus, "@'simpleRouter.handler'.addRecipient('channel2')");

애플리케이션이 기동하는 시점에는 simpleRouterchannel1이라는 수신자recipient가 하나만 존재한다. 하지만 addRecipient 명령어를 실행하고 나면 channel2가 추가된다. “메시지가 담고 있는 어떤 정보에 새로 관심을 표하는” 상황이라고 할 수 있다. 일정 기간 동안만 라우터가 전달하는 메시지에 관심 있다면, 이 recipient-list-router를 구독할 것이며, 어느 시점에는 구독을 취소하면 된다.

런타임에 <recipient-list-router>를 관리해주는 이 management 연산 덕분에, 맨 처음엔 <recipient>를 설정하지 않아도 된다. 이때는 메시지에 매칭되는 수신자recipient가 없을 때의 RecipientListRouter 동작과 동일하다. defaultOutputChannel이 설정돼 있으면 메시지는 이곳으로 보내진다. 그 외는 MessageDeliveryException을 던진다.

XPath Router

XPath 라우터는 XML 모듈에 들어있다. XPath를 이용해 XML 메시지 라우팅하기를 참고해라.

Routing and Error Handling

Spring Integration은 에러 메시지를 라우팅하기 위한 특별한 타입 기반 라우터 ErrorMessageExceptionTypeRouter를 제공한다. 에러 메시지는 payloadThrowable 인스턴스인 메시지로 정의할 수 있다. ErrorMessageExceptionTypeRouterPayloadTypeRouter와 유사하다. 사실 거의 똑같다고 봐도 된다. 유일한 차이점은 PayloadTypeRouter는 페이로드 인스턴스의 인스턴스 계층 구조를 탐색해서 (ex. payload.getClass().getSuperclass()) 가장 구체적인 타입과 매핑된 채널 정보를 찾는 반면, ErrorMessageExceptionTypeRouter는 ‘exception causes’의 계층 구조를 탐색해 (ex. payload.getCause()) 가장 구체적인 Throwable 타입과 매핑된 채널을 찾으며, mappingClass.isInstance(Cause)를 사용해 cause를 현재 클래스나 다른 슈퍼 클래스에 매칭해본다는 점이다.

여기서는 채널을 매핑하는 순서가 중요하다. 즉, RuntimeException이 아닌 IllegalArgumentException에 매핑된 정보를 가져와야 한다면, 라우터엔 RuntimeException 항목을 먼저 구성해야 한다.

4.3 버전부터 ErrorMessageExceptionTypeRouter는 초기화 단계에서 모든 매핑 클래스를 로드하기 때문에, 문제가 있다면 미리 ClassNotFoundException이 발생한다.

다음은 ErrorMessageExceptionTypeRouter를 설정하는 예시다:

<int:exception-type-router input-channel="inputChannel"
                           default-output-channel="defaultChannel">
    <int:mapping exception-type="java.lang.IllegalArgumentException"
                 channel="illegalChannel"/>
    <int:mapping exception-type="java.lang.NullPointerException"
                 channel="npeChannel"/>
</int:exception-type-router>

<int:channel id="illegalChannel" />
<int:channel id="npeChannel" />

8.1.4. Configuring a Generic Router

Spring Integration은 범용 라우터를 하나 제공하므로, 범용적인 목적으로 라우팅하는 데에선 이 라우터를 활용하면 된다 (Spring Integration이 제공하는, 각자의 분야에 특화돼 있는 다른 라우터들과는 상반된다).

Configuring a Content-based Router with XML

router 요소를 이용하면 라우터를 입력 채널에 연결할 수 있으며, 선택적으로 default-output-channel 속성도 지정할 수 있다. ref 속성으로는 커스텀 라우터 구현체의 빈 이름을 참조한다 (라우터 구현체는 반드시 AbstractMessageRouter를 상속하고 있어야 한다). 다음은 세 가지 범용 라우터 예시다:

<int:router ref="payloadTypeRouter" input-channel="input1"
            default-output-channel="defaultOutput1"/>

<int:router ref="recipientListRouter" input-channel="input2"
            default-output-channel="defaultOutput2"/>

<int:router ref="customRouter" input-channel="input3"
            default-output-channel="defaultOutput3"/>

<beans:bean id="customRouterBean" class="org.foo.MyCustomRouter"/>

아니면 ref로 내부에 @Router 어노테이션을 가지고 있는 POJO를 가리키는 방법도 있고 (뒤에서 보여준다), ref와 메소드 이름을 함께 명시하기도 한다. 메소드를 지정하는 방법도 뒤에 나오는 @Router 어노테이션 섹션에서 설명하는 방식과 동일하게 동작한다. 아래 예시에선 ref 속성으로 POJO를 가리키는 라우터를 정의하고 있다:

<int:router input-channel="input" ref="somePojo" method="someMethod"/>

특정 커스텀 라우터 구현체를 다른 <router> 정의에서도 참조할 수 있다면 보통 ref 속성을 사용하는 것이 좋다. 하지만 커스텀 라우터 구현체의 스코프를 하나의 <router> 정의 내로 한정하고 싶다면, 아래 예제와 같이 내부 빈 정의를 제공해도 된다:

<int:router method="someMethod" input-channel="input3"
            default-output-channel="defaultOutput3">
    <beans:bean class="org.foo.MyCustomRouter"/>
</int:router>

동일한 <router> 설정에서 ref 속성과 내부 핸들러 정의를 둘 다 사용하는 것은 허용하지 않는다. 둘 다 사용하면 조건이 모호해져 예외가 발생한다.

ref 속성으로 AbstractMessageProducingHandler를 상속한 빈을 참조하는 경우 (프레임워크에서 자체적으로 제공하는 라우터들처럼), 라우터를 직접 참조하도록 최적화된다. 이때는 각 ref 속성마다 별도 빈 인스턴스(또는 prototype 스코프 빈)를 참조하거나, 내부 <bean/> 설정을 이용해야 한다. 단, 여기서 말하는 최적화는 라우터 XML 정의에 특정 라우터의 전용 속성을 제공하지 않았을 때에만 적용된다. 무심코 여러 빈에서 동일한 메시지 핸들러를 참조한다면 설정 예외를 만나게될 거다.

다음은 자바를 이용한 동일한 라우터 설정 예시다:

@Bean
@Router(inputChannel = "routingChannel")
public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

다음은 Java DSL을 이용한 동일한 라우터 설정 예시다:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(myCustomRouter())
            .get();
}

public AbstractMessageRouter myCustomRouter() {
    return new AbstractMessageRouter() {

        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            return // determine channel(s) for message
        }

    };
}

아니면 아래 예제처럼 메시지 페이로드에 있는 데이터를 통해 라우팅할 수도 있다:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
            .route(String.class, p -> p.contains("foo") ? "fooChannel" : "barChannel")
            .get();
}

8.1.5. Routers and the Spring Expression Language (SpEL)

간혹가다 보면 라우팅 로직이 매우 단순할 때도 있는데, 이럴땐 라우팅만을 위해 별도 클래스를 만들어 빈으로 설정하는 건 조금 과할 수 있다. 이전에는 간단한 계산에도 커스텀 POJO 라우터를 만들어야 했지만, Spring Integration 2.0부터는 대신 SpEL을 활용할 수 있다.

SpELSpring Expression Language에 관한 자세한 내용은 스프링 프레임워크 레퍼런스 가이드에서 관련 챕터를 확인해봐라.

일반적으로 SpEL 표현식은 다음과 같이 평가해서 그 결과를 채널에 매핑시킨다:

<int:router input-channel="inChannel" expression="payload.paymentType">
    <int:mapping value="CASH" channel="cashPaymentChannel"/>
    <int:mapping value="CREDIT" channel="authorizePaymentChannel"/>
    <int:mapping value="DEBIT" channel="authorizePaymentChannel"/>
</int:router>

다음은 자바를 이용한 동일한 라우터 설정 예시다:

@Router(inputChannel = "routingChannel")
@Bean
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.paymentType");
    router.setChannelMapping("CASH", "cashPaymentChannel");
    router.setChannelMapping("CREDIT", "authorizePaymentChannel");
    router.setChannelMapping("DEBIT", "authorizePaymentChannel");
    return router;
}

다음은 Java DSL을 이용한 동일한 라우터 설정 예시다:

@Bean
public IntegrationFlow routerFlow() {
    return IntegrationFlows.from("routingChannel")
        .route("payload.paymentType", r -> r
            .channelMapping("CASH", "cashPaymentChannel")
            .channelMapping("CREDIT", "authorizePaymentChannel")
            .channelMapping("DEBIT", "authorizePaymentChannel"))
        .get();
}

SpEL 표현식 자체가 채널 이름으로 평가된다면 좀더 간결해진다:

<int:router input-channel="inChannel" expression="payload + 'Channel'"/>

위 설정을 보면, SpEL 표현식은 payload 값 뒤에 리터럴 String ‘Channel’을 붙이고 있으며, 이를 계산한 결과를 채널로 사용하고 있다.

라우터를 설정할 때 SpEL을 사용하면 좋은 점이 또 있는데, 바로, 표현식은 Collection을 반환할 수 있고, 사실상 모든 <router>를 recipient list로 만들 수 있다는 점이다. 표현식에서 채널 값으로 여러 개의 값을 반환하기만 하면 각각의 채널로 메시지를 전달해준다. 다음은 값을 여러 개 반환하는 표현식 예시다:

<int:router input-channel="inChannel" expression="headers.channels"/>

위 설정에선, 메시지에 ‘channels’라는 헤더가 담겨있고 그 값이 채널 이름의 List라면, 이 리스트에 들어있는 채널로 각각 메시지를 전송한다. 여러 채널을 선택해야 하는 경우엔 collection projection과 collection selection 표현식이 적합할 수도 있다. 더 자세한 내용은 아래 링크를 참고해라:

Configuring a Router with Annotations

메소드 위에 @Router를 선언했다면 이 메소드에선 MessageChannel이나 String 타입을 반환할 수 있다. 후자의 경우 채널에서와 마찬가지로 채널명으로 바로 리졸브한다. 또한 이 메소드는 단일 값을 반환할 수도 있지만, 컬렉션을 반환할 수도 있다. 컬렉션을 반환하면 응답 메시지를 여러 개의 채널로 전송한다. 정리하자면, 아래 있는 메소드 시그니처들을 모두 사용할 수 있다:

@Router
public MessageChannel route(Message message) {...}

@Router
public List<MessageChannel> route(Message message) {...}

@Router
public String route(Foo payload) {...}

@Router
public List<String> route(Foo payload) {...}

꼭 페이로드를 기반으로 라우팅해야 하는 것은 아니며, 메시지 헤더 안에 있는 프로퍼티나 attribute 등의 메타데이터를 기반으로도 라우팅할 수 있다. 헤더를 활용할 땐 @Router를 달아준 메소드에 @Header를 선언한 파라미터를 추가할 수 있다. 아래 예제에서 확인할 수 있듯이, 이 파라미터에는 헤더의 값이 매핑된다. 자세한 내용은 어노테이션 지원에서 설명하고 있다:

@Router
public List<String> route(@Header("orderStatus") OrderStatus status)

XPath 지원을 포함해서, XML 기반 메시지 라우팅에 대해 알아보려면 XML 지원 - XML 페이로드 처리하기를 참고해라.

라우터 설정을 좀더 자세히 알아보려면 Java DSL 챕터에 있는 메시지 라우터를 함께 읽어봐라.

8.1.6. Dynamic Routers

Spring Integration에선 POJO로 커스텀 라우터를 구현할 수도 있지만, 흔히 쓰는 컨텐츠 기반 라우팅을 위한 설정도 다양하게 제공하고 있다. 예를 들어 PayloadTypeRouter로는 전달받은 메시지의 페이로드 타입을 기반으로 채널을 계산하는 라우터를 간단하게 설정할 수 있으며, HeaderValueRouter는 메시지에 있는 특정 헤더 값을 평가해서 채널을 계산해준다. 표현식을 평가해서 채널을 결정하는 표현식 기반(SpEL) 라우터도 존재한다. 이런 라우터들은 모두 어느 정도 동적인 특성을 지니고 있다.

하지만 이 라우터들은 모두 정적인 설정을 필요로 한다. 표현식 기반 라우터라고 하더라도, 표현식 자체는 라우터 설정에 정의해놔야 한다. 다시 말해, 같은 표현식을 같은 값으로 실행하면 언제나 같은 채널을 계산해낸다. 이런 식의 라우팅 규칙은 명확해서 예측하기도 쉽기 때문에 대부분의 케이스에 잘 활용할 수 있다. 하지만 간혹 라우터 설정을 동적으로 변경해서 메시지 플로우를 또 다른 채널로 라우팅해야 하는 경우가 있다.

예를 들어 유지 보수를 위해 잠시 일부 시스템을 중단하고 일시적으로 메시지들을 다른 메시지 플로우로 라우팅할 수 있다. 또 다른 예시로는, 라우팅 로직을 하나 더 추가해서, (PayloadTypeRouter의 경우) java.lang.Number가 아닌 좀더 구체적인 타입을 처리하도록 메시지 플로우를 세분화할 수도 있다.

하지만 안타깝게도 정적인 라우터 설정을 사용하고 있었다면 이런 목표들을 달성할 땐, 전체 애플리케이션을 중단하고 라우터 설정을 변경한 뒤 (routes 수정) 애플리케이션을 다시 기동해야 한다. 단언컨데 이런 솔루션을 원하는 사람은 없을 거다.

다이나믹 라우터 패턴에선 시스템이나 개별 라우터 중단 없이 라우터를 동적으로 변경하거나 설정할 수 있는 메커니즘을 다루고 있다.

Spring Integration이 동적인 라우팅을 어떻게 지원하는지에 대해 자세히 알아보기 전에 먼저, 라우터의 일반적인 흐름을 생각해볼 필요가 있다:

  1. 채널 식별자를 계산한다. 채널 식별자란 라우터가 메시지를 받아 계산하는 값을 말한다. 보통은 문자열이거나, 실제 MessageChannel의 인스턴스다.
  2. 채널 식별자를 채널 이름으로 리졸브한다. 이 프로세스에 관해서는 뒤에서 자세히 논한다.
  3. 이 채널명을 실제 MessageChannel로 리졸브한다.

첫 번째 단계에서 실제 MessageChannel의 인스턴스를 가져오게 되면 동적인 라우팅에 관해서는 할 수 있는 일이 많지 않다. 어떤 라우터라도 MessageChannel이 최종 결과물이기 때문이다. 하지만 첫 번째 스텝에서 계산한 채널 식별자가 MessageChannel 인스턴스가 아니라면 다양한 방법으로 MessageChannel을 얻어오는 프로세스에 변화를 줄 수 있다. 아래 있는 페이로드 타입 라우터를 예시로 들어보자:

<int:payload-type-router input-channel="routingChannel">
    <int:mapping type="java.lang.String"  channel="channel1" />
    <int:mapping type="java.lang.Integer" channel="channel2" />
</int:payload-type-router>

페이로드 타입 라우터의 컨텍스트 내에서 살펴보면, 앞서 언급한 세 단계의 스텝은 다음과 같이 진행된다:

  1. 페이로드 타입의 풀네임fully qualified name에 해당하는 채널 식별자를 계산한다 (ex. java.lang.String).
  2. 이 채널 식별자를 채널 이름으로 리졸브한다. 이때는 이전 스텝에서 얻은 값을 이용해 페이로드 타입 매핑 정보에서 적절한 값을 선택한다. 매핑 정보는 mapping 요소에 정의돼있다.
  3. 이 채널명을 실제 MessageChannel 인스턴스로 리졸브한다. 이때는 앞 스텝에서 계산한 이름으로 식별된 애플리케이션 컨텍스트 내 빈(MessageChannel)을 참조한다.

즉, 각 스텝이 진행되는 동안에는 다음 스텝을 위한 값을 계산한다.

이번에는 header value 라우터를 예로 들어보겠다:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">
    <int:mapping value="foo" channel="fooChannel" />
    <int:mapping value="bar" channel="barChannel" />
</int:header-value-router>

이제 header value 라우터에서는 이 세 단계의 스텝이 어떻게 동작할지 생각해보자:

  1. 채널 식별자를 계산한다. 여기서 채널 식별자는 헤더의 값을 뜻하며, header-name 속성으로 식별한다.
  2. 이 채널 식별자를 채널 이름으로 리졸브한다. 이때는 이전 스텝에서 얻은 값을 이용해 일반적인 매핑 정보에서 적절한 값을 선택한다. 매핑 정보는 mapping 요소에 정의돼있다.
  3. 이 채널명을 실제 MessageChannel 인스턴스로 리졸브한다. 이때는 앞 스텝에서 계산한 이름으로 식별된 애플리케이션 컨텍스트 내 빈(MessageChannel)을 참조한다.

앞에서 살펴본 두 가지 유형의 라우터 설정은 거의 동일해 보인다. 하지만 HeaderValueRouter의 또 다른 설정을 살펴보면, 이번에는 분명히 하위에 mapping 요소가 없다는 게 보일 거다:

<int:header-value-router input-channel="inputChannel" header-name="testHeader">

하지만 이 설정 역시 아무런 문제 없는 설정이다. 자연스럽게 이어지는 질문으로 넘어가보면, 두 번째 스텝에서의 매핑은 어떻게 되는 걸까?

이제 두 번째 스텝은 생략할 수 있다. mapping이 정의돼있지 않은 경우, 첫 번째 스텝에서 계산한 채널 식별자 값을 자동으로 채널 이름으로 처리하며, 세 번째 스텝에선 이 값을 실제 MessageChannel로 리졸브한다. 즉, 두 번째 스텝이 라우터에 동적인 특성을 더해주는 핵심 단계라는 것을 의미한다. 두 번째 스텝을 통해 채널 식별자를 채널명으로 식별하는 방식에 변화를 줄 수 있으며, 궁극적으로 초기 채널 식별자로부터 최종 MessageChannel 인스턴스를 결정하는 프로세스를 수정할 수 있다.

예를 들어서 위 설정에서 채널 식별자로 활용하는 (첫 번째 스텝) testHeader 값이 ‘kermit’이라고 가정해보자. 이 라우터에는 매핑 정보가 없기 때문에 이 채널 식별자를 채널 이름으로 리졸브하는 것은 불가능하며 (두 번째 스텝), 이제 이 채널 식별자를 채널명으로 취급한다. 반대로 매핑 정보는 있는데 다른 값이 들어있었다면 어떻게 될까? 최종 결과는 동일한데, 그 이유는 채널 식별자를 채널명으로 리졸브하는 과정에서 값을 결정할 수 없으면 채널 식별자가 채널 이름이 되기 때문이다.

이제 세 번째 스텝을 통해 채널 이름(‘kermit’)을, 이 이름으로 식별하는 실제 MessageChannel 인스턴스로 리졸브하는 일이 남아있다. 이때는 기본적으로 주어진 이름에 해당하는 빈을 조회해본다. 이제 헤더에 testHeader=kermit이 담겨 있는 모든 메시지는 빈 이름(id)이 ‘kermit’인 MessageChannel로 라우팅된다.

이번엔 이 메시지들을 ‘simpson’ 채널로 라우팅하려면 어떻게 해야 할까? 정적인 설정을 수정하는 방법도 있지만, 이렇게 되면 시스템을 중단시켜야 한다. 반면 채널 식별자 맵에 액세스할 수 있다면, kermit=simpson을 새로 매핑할 수 있다. 이렇게 하면 두 번째 스텝에서 ‘kermit’을 ‘simpson’이란 채널로 리졸브되는 채널 식별자로 취급한다.

PayloadTypeRouter에서도 동일하다. 특정 페이로드 타입을 다시 매핑하거나 기존 매핑을 제거하면 된다. 사실, 표현식 기반 라우터는 물론, 다른 라우터들도 전부 동일하다. 매핑을 추가해주면 이제 라우터에서 계산하는 값들은 실제 채널 이름을 리졸브하기 위한 두 번째 스텝을 거칠 수 있기 때문이다.

channelMappingAbstractMappingMessageRouter 단에서 정의되기 때문에 , AbstractMappingMessageRouter를 상속한 라우터는 전부 다이나믹 라우터라고 할 수 있다 (프레임워크에서 정의하는 대부분의 라우터가 그렇다). 이 맵의 setter 메소드는 public으로 노출돼있으며, ‘setChannelMapping’, ‘removeChannelMapping’ 메소드도 함께 제공한다. 따라서 라우터 자체에 대한 참조만 가지고 있다면 이 메소드들을 이용해 런타임에 라우터 매핑을 변경, 추가, 제거할 수 있다. JMX(JMX 지원 참고)나 Spring Integration 컨트롤 버스(컨트롤 버스 참고) 기능을 통해서도 매핑 정보를 수정할 수 있다는 뜻이기도 하다.

채널 키를 채널 이름의 폴백으로 활용하면 다양한 상황을 쉽고 유연하게 대처할 수 있다. 하지만 메시지 생성자를 신뢰할 수 없는 상황이라면, 시스템을 잘 아는 누군가가 악의적으로 메시지를 생성해 예상치 못한 채널로 라우팅시킬 수도 있다. 예를 들어 키가 라우터의 입력 채널 이름으로 설정돼 있다면, 이런 메시지는 해당 라우터로 다시 라우팅되며, 종국엔 스택 오버플로 에러를 일으킨다. 따라서 이 기능은 비활성화하고 (channelKeyFallback 속성을 false로 설정) 필요한 경우 매핑 정보를 변경하는 게 더 좋을 수도 있다.

Manage Router Mappings using the Control Bus

라우터의 매핑 정보를 관리하는 방법 중에는 컨트롤 버스 패턴을 이용하는 방법이 있다. 이 패턴은 컨트롤 채널이라는 별도 채널로 메시지를 전송해 라우터를 포함한 Spring Integration 구성 요소들을 관리하고 모니터링하는 패턴이다.

컨트롤 버스에 관한 자세한 내용은 컨트롤 버스를 확인해봐라.

일반적으로 컨트롤 메시지를 전송할 땐, 메시지를 통해 관리 중인 특정 컴포넌트(라우터 등)에서 특정한 작업을 실행하도록 요청한다. 다음은 라우터의 리졸브 프로세스를 변경하는 관리성managed 연산(메소드)들이다:

이 메소드들은 단순한 변경 작업에 활용할 수 있다 (라우팅 정보 하나를 업데이트하거나 삭제하는 등). 하지만 라우팅 정보 하나를 지우고서 다른 라우팅 정보를 추가하려는 경우, 이 두 번의 업데이트는 원자적atomic이지 않다는 점에 주의하자. 즉, 두 번의 업데이트를 진행하는 찰나에는 라우팅 테이블이 이도저도 아닌 애매한 상태에 놓일 수 있다는 걸 의미한다. 4.0 버전부터는 컨트롤 버스를 이용해 전체 라우팅 테이블을 원자적으로 업데이트할 수 있다. 아래 메소드들을 이용하면 된다:

"@'router.handler'.replaceChannelMappings('foo=qux \n baz=bar')"

각각의 매핑 정보는 개행 문자(\n)로 구분한다. 코드를 통해 매핑 정보를 수정해야 한다면 type-safety 문제도 있기 때문에 setChannelMappings 메소드를 사용하는 것을 권장한다. replaceChannelMappings에선 키나 값이 String 객체가 아닌 정보는 무시한다.

Manage Router Mappings by Using JMX

스프링의 JMX 지원을 이용해 라우터 인스턴스를 하나 노출한 다음 자주 사용하는 JMX 클라이언트(ex. JConsole)를 사용해도 라우터 설정을 변경하는 연산(메소드)들을 관리할 수 있다.

스프링이 제공하는 JMX 관련 기능은 JMX 지원을 읽어봐라.

Routing Slip

Spring Integration은 4.1 버전부터 엔터프라이즈 통합 패턴 라우팅 슬립의 구현체를 제공한다. 이 구현체에선 routingSlip이라는 헤더를 활용한다. 엔드포인트에 outputChannel이 지정돼있지 않은 경우, AbstractMessageProducingHandler 인스턴스에서 이 routingSlip 헤더를 이용해 다음 채널을 결정한다. 이 패턴은 메시지 플로우를 결정하려면 여러 가지 라우터가 필요한, 복잡하고 동적인 환경에서 활용하기 좋다. 메시지가 output-channel을 가지고 있지 않은 엔드포인트에 도착하면 routingSlip을 참고해 메시지를 전송할 채널을 결정한다. 라우팅 슬립에서도 다음 채널을 찾지 못하면 일반적인 replyChannel 처리를 재개한다.

라우팅 슬립과 관련한 설정은 아래 보이는 HeaderEnricher 옵션으로 표현한다. 각 라우팅 슬립 path는 세미콜론으로 구분하고 있다:

<util:properties id="properties">
    <beans:prop key="myRoutePath1">channel1</beans:prop>
    <beans:prop key="myRoutePath2">request.headers[myRoutingSlipChannel]</beans:prop>
</util:properties>

<context:property-placeholder properties-ref="properties"/>

<header-enricher input-channel="input" output-channel="process">
    <routing-slip
        value="${myRoutePath1}; @routingSlipRoutingPojo.get(request, reply);
               routingSlipRoutingStrategy; ${myRoutePath2}; finishChannel"/>
</header-enricher>

위 예시에는 다음과 같은 설정이 있다:

라우팅 슬립 path 항목에는 MessageChannel 빈의 이름이나 RoutingSlipRouteStrategy 빈의 이름, 스프링 표현식(SpEL)을 담을 수 있다. RoutingSlipHeaderValueMessageProcessorprocessMessage 메소드가 최초로 실행됐을 때 각 라우팅 슬립 path 항목을 BeanFactory에서 조회해본다. 애플리케이션 컨텍스트 내에 있는 빈의 이름이 아닌 항목들은 ExpressionEvaluatingRoutingSlipRouteStrategy 인스턴스로 변환한다. RoutingSlipRouteStrategy 항목들은 null이나 비어있는 String을 반환하는 동안에는 반복해서 호출한다.

라우팅 슬립은 getOutputChannel 프로세스 중에 사용하는 패턴이기 때문에, request-reply 조합마다 컨텍스트를 가진다. RoutingSlipRouteStrategy는 현재 requestMessagereply 객체로 다음 outputChannel을 결정하는 용도로 도입했다. 이 strategy 구현체는 애플리케이션 컨텍스트에 빈으로 등록되어야 하며, 이 빈의 이름은 라우팅 슬립 path에서 사용할 수 있다. 현재는 ExpressionEvaluatingRoutingSlipRouteStrategy라는 구현체를 제공하고 있다. 이 구현체는 SpEL 표현식을 받으며 내부 ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReply 객체를 평가 컨텍스트의 루트 객체로 사용한다. 덕분에 ExpressionEvaluatingRoutingSlipRouteStrategy.getNextPath()를 호출할 때마다 EvaluationContext를 생성하는 오버헤드를 피할 수 있다. ExpressionEvaluatingRoutingSlipRouteStrategy.RequestAndReplyMessage<?> requestObject reply라는 두 가지 속성을 가지고 있는 간단한 자바 빈이다. 이 구현체를 사용하면 SpEL을 통해 라우팅 슬립 path 항목을 지정할 수 있으며 (ex. @routingSlipRoutingPojo.get(request, reply), request.headers[myRoutingSlipChannel]), RoutingSlipRouteStrategy 빈을 따로 정의하지 않아도 된다.

requestMessage 인자는 언제나 Message<?>다. 반면 reply 객체는 컨텍스트에 따라 Message<?>일 수도, AbstractIntegrationMessageBuilder일 수도 있으며, 애플리케이션에서 사용하는 임의의 도메인 객체일 수도 있다 (ex. 서비스 activator로 실행한 POJO 메소드가 반환한 객체). 앞의 두 케이스라면, SpEL(또는 자바 구현체)에서 평소대로 Message 프로퍼티(payloadheaders)를 사용할 수 있다. 하지만 임의의 도메인 객체라면 이 프로퍼티를 사용할 수 없다. 따라서 POJO 메소드의 실행 결과로 다음 path를 결정하는 경우 라우팅 슬립을 주의해서 사용해야 한다.

라우팅 슬립이 분산 환경과 연계된다면 라우팅 슬립 path에 인라인 표현식은 사용하지 않는 게 좋다. 이는 메시지 브로커(AMQP 지원 또는 JMS 지원)를 통해 request-reply를 주고받거나, 통합 플로우에서 영구persistent MessageStore(메시지 스토어)를 사용하는 교차 JVM 애플리케이션과 같은 분산 환경에서도 마찬가지다. 프레임워크는 RoutingSlipHeaderValueMessageProcessor를 사용해 path 값을 ExpressionEvaluatingRoutingSlipRouteStrategy 객체로 변환하며, 이 객체는 메시지 헤더 routingSlip에 채워진다. 이 클래스는 Serializable이 아니기 때문에 (BeanFactory에 의존하기 때문에 불가능하다), 전체 Message 역시 직렬화가 불가능하며, 모든 분산 작업에서 NotSerializableException을 만나게 된다. 따라서, 적합한 SpEL을 이용해 ExpressionEvaluatingRoutingSlipRouteStrategy 빈을 등록하고, 라우팅 슬립 path 설정에선 이 빈 이름을 사용해라.

자바 설정에선 다음과 같이 HeaderEnricher 빈 정의에 RoutingSlipHeaderValueMessageProcessor 인스턴스를 추가해주면 된다:

@Bean
@Transformer(inputChannel = "routingSlipHeaderChannel")
public HeaderEnricher headerEnricher() {
    return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
            new RoutingSlipHeaderValueMessageProcessor("myRoutePath1",
                                                       "@routingSlipRoutingPojo.get(request, reply)",
                                                       "routingSlipRoutingStrategy",
                                                       "request.headers[myRoutingSlipChannel]",
                                                       "finishChannel")));
}

엔드포인트가 응답을 생성했을 때 outputChannel이 정의돼있지 않은 경우, 라우팅 슬립 알고리즘은 다음과 같이 동작한다:

8.1.7. Process Manager Enterprise Integration Pattern

엔터프라이즈 통합 패턴 중에는 프로세스 매니저라는 패턴이 있다. 이제 커스텀 프로세스 매니저 로직을 작성하고 라우팅 슬립 안에서 RoutingSlipRouteStrategy로 캡슐화만 해주면 쉽게 이 패턴을 쉽게 구현할 수 있다. RoutingSlipRouteStrategy는 빈 이름 외에도 MessageChannel 객체라면 전부 반환할 수 있으며, 이 MessageChannel 인스턴스가 꼭 애플리케이션 컨텍스트 내 빈이어야 한다는 법은 없다. 어떤 채널을 사용해야 하는지를 미리 예측하기가 어려울 땐 이 패턴을 활용해 동적으로 메시지를 라우팅할 수 있다. MessageChannelRoutingSlipRouteStrategy 내에서 생성하고 반환해도 된다. 이런 케이스엔 MessageHandler 구현체가 고정으로 연결돼있는 FixedSubscriberChannel을 사용하는 것도 괜찮다. 예를 들어 다음 예제와 같이 리액티브 스트림즈로 라우팅할 수 있다:

@Bean
public PollableChannel resultsChannel() {
    return new QueueChannel();
}

@Bean
public RoutingSlipRouteStrategy routeStrategy() {
    return (requestMessage, reply) -> requestMessage.getPayload() instanceof String
            ? new FixedSubscriberChannel(m ->
            Mono.just((String) m.getPayload())
                    .map(String::toUpperCase)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)))
            : new FixedSubscriberChannel(m ->
            Mono.just((Integer) m.getPayload())
                    .map(v -> v * 2)
                    .subscribe(v -> messagingTemplate().convertAndSend(resultsChannel(), v)));
}

8.2. Filter

메시지 필터는 메시지 헤더나 컨텐츠같은 어떠한 기준에 따라 Message를 다음으로 전달할지, 아니면 버려야 할지를 결정한다. 즉, 메시지 필터는 라우터와 매우 유사하며, 필터의 입력 채널로 받은 메시지는 필터의 출력 채널로 전송할 수도, 전송하지 않을 수도 있다는 점만 다르다. 라우터와는 달리 메시지를 전송할 채널은 결정하지 않으며, 메시지를 전달할지 여부를 결정하는 게 전부다.

뒤에서도 설명하지만, 필터는 discard 채널도 지원한다. 사용하는 방법에 따라 boolean 조건을 기반으로 매우 간단한 라우터(또는 “스위치”)로 동작시킬 수 있다.

Spring Integration에서 메시지 필터를 설정할 땐 MessageSelector 인터페이스 구현체에 위임하는 메시지 엔드포인트를 만들면 된다. 이 인터페이스 자체는 아래에 보이는 것처럼 매우 단순하다:

public interface MessageSelector {

    boolean accept(Message<?> message);

}

MessageFilter 생성자는 아래 보이는 것처럼 selector 인스턴스를 하나 받는다:

MessageFilter filter = new MessageFilter(someSelector);

네임스페이스와 SpEL을 조합하면 자바 코드는 거의 없이도 쓸만한 필터들을 구성할 수 있다.

8.2.1. Configuring a Filter with XML

메시지를 선별해주는 엔드포인트를 만들려면 <filter> 요소를 사용하면 된다. input-channeloutput-channel 속성 외에도, ref라는 속성이 필요하다. ref에선 아래 예제와 같이 MessageSelector 구현체를 가리켜주면 된다:

<int:filter input-channel="input" ref="selector" output-channel="output"/>

<bean id="selector" class="example.MessageSelectorImpl"/>

아니면 method 속성을 추가하는 방법도 있다. 이때는 ref 속성에서 모든 객체를 참조할 수 있다. 참조하는 메소드에선 Message 타입이나 인바운드 메시지의 페이로드 타입을 받아 처리하면 되며, 반환 타입은 boolean이어야 한다. ‘true’를 반환하면 메시지를 출력 채널로 전송한다. 다음은 method 속성을 사용해 필터를 설정하는 예시다:

<int:filter input-channel="input" output-channel="output"
    ref="exampleObject" method="someBooleanReturningMethod"/>

<bean id="exampleObject" class="example.SomeObject"/>

selector 혹은 적당한 POJO 메소드에서 false를 반환해 거절한 메시지는 몇 가지 설정을 통해 제어한다. 기본적으로는 (앞의 예시처럼 설정한 경우) 거부된 메시지는 별다른 오류 없이 버려진다. 메시지를 거절할 때 에러를 발생시켜야 한다면 아래 예제처럼 throw-exception-on-rejection 속성을 true로 설정해라:

<int:filter input-channel="input" ref="selector"
    output-channel="output" throw-exception-on-rejection="true"/>

거절한 메시지를 특정 채널로 라우팅하고 싶다면, 다음 예제와 같이 discard-channel로 전송할 채널을 지정해라:

<int:filter input-channel="input" ref="selector"
    output-channel="output" discard-channel="rejectedMessages"/>

필터에 어드바이스 체인 적용하기도 함께 참고해라.

일반적으로 메시지 필터는 publish-subscribe 채널과 함께 사용한다. 같은 채널을 다양한 필터 엔드포인트가 구독할 수 있으며, 필터를 통해 다음 엔드포인트로 메시지를 전달할지를 결정하게 된다. 다음 엔드포인트는 지원하는 타입이라면 무엇이든 될 수 있다 (ex. 서비스 activator). point-to-point 입력 채널 하나와 여러 가지 출력 채널을 사용하는 메시지 라우터에선 메시지를 전송할 채널을 사전에 세팅해 놓는다면, 메시지 필터는 그대신 다양한 구독자별로 대응을 추가해나가는 엔드포인트라고 할 수 있다.

커스텀 필터 구현체를 다른 <filter> 정의에서도 참조할 수 있다면 ref 속성을 사용하길 권장한다. 반대로 커스텀 필터 구현체의 스코프가 단일 <filter> 요소로 한정된다면, 다음 예제와 같이 내부 빈 정의를 이용하는 게 좋다:

<int:filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
  <beans:bean class="org.foo.MyCustomFilter"/>
</filter>

동일한 <filter> 설정에서 ref 속성과 내부 핸들러 정의를 둘 다 사용하는 것은 허용하지 않는다. 둘 다 사용하면 조건이 모호해져 예외가 발생한다.

ref 속성으로 MessageFilter를 상속한 빈을 참조하는 경우 (프레임워크에서 자체적으로 제공하는 필터들처럼), 출력 채널을 필터 빈에 직접 주입하는 식으로 최적화된다. 이때는 각 ref 속성마다 별도 빈 인스턴스(또는 prototype 스코프 빈)를 참조하거나, 내부 <bean/> 설정을 이용해야 한다. 단, 여기서 말하는 최적화는 필터 XML 정의에 특정 필터의 전용 속성을 제공하지 않았을 때에만 적용된다. 무심코 여러 빈에서 동일한 메시지 핸들러를 참조하면 설정 예외를 만나게될 거다.

SpEL 지원을 시작하면서 Spring Integration은 필터 요소에 expression 속성을 추가했다. 이제 다음 예제와 같이 간단한 필터는 자바 코드 없이도 설정할 수 있다:

<int:filter input-channel="input" expression="payload.equals('nonsense')"/>

expression 속성 값으로 전달한 문자열은 평가 컨텍스트 내 메시지와 함께 SpEL 표현식으로 평가된다. 애플리케이션 컨텍스트 스코프 내 표현식 결과를 재사용해야 하는 경우, SpEL 레퍼런스 문서에서 정의하는 대로 #{} 표기법을 사용하면 된다:

<int:filter input-channel="input"
            expression="payload.matches(#{filterPatterns.nonsensePattern})"/>

표현식 자체를 동적으로 구성하고 싶다면 ‘expression’을 하위 요소로 추가하면 된다. 이렇게 하면 ExpressionSource에 있는 키를 통해 간접적으로 표현식을 리졸브한다. ExpressionSource는 전략 인터페이스로, 직접 구현해도 좋고, Spring Integration이 제공하는 구현체를 사용해도 된다. Spring Integration이 제공하는 구현체는 “리소스 번들”에서 표현식을 로드하고 지정 시간이 지나면 (초 단위) 변경 사항을 확인할 수 있다. 아래와 같이 설정해주면 내부 파일이 수정되는 경우 1분 이내로 표현식을 다시 로드할 수 있다:

<int:filter input-channel="input" output-channel="output">
    <int:expression key="filterPatterns.example" source="myExpressions"/>
</int:filter>

<beans:bean id="myExpressions"
    class="o.s.i.expression.ReloadableResourceBundleExpressionSource">
    <beans:property name="basename" value="config/integration/expressions"/>
    <beans:property name="cacheSeconds" value="60"/>
</beans:bean>

ExpressionSource 빈의 이름이 expressionSource라면 <expression> 요소에 source 속성을 지정하지 않아도 된다. 위 예제에선 참고용으로 표기해놓았다.

‘config/integration/expressions.properties’ 파일은 (리소스 번들을 로드하는 일반적인 방식으로 리졸브하는, locale 별 전용 확장파일도 가능) 다음과 같은 키/값 쌍을 가지고 있을 수 있다:

filterPatterns.example=payload > 100

속성이나 하위 요소로 expression을 사용하는 이 예제들은 모두 transformer, router, splitter, service-activator, header-enricher 요소에도 적용할 수 있다. 컴포넌트 유형마다 가지고 있는 시맨틱스와 역할에 따라, 메소드의 반환 값을 다르게 해석하고 그에 따라 평가 결과도 달라진다. 예를 들어, 라우터 컴포넌트라면 표현식에서 메시지 채널 이름으로 취급할 문자열을 반환할 수 있다. 하지만 메시지를 루트 객체로 두고 표현식을 평가한다는 점과, 앞에 ‘@’가 있으면 빈 이름을 리졸브하는 기본적인 기능은 Spring Integration 안에 있는 모든 핵심 EIP 컴포넌트에서 동일하다.

8.2.2. Configuring a Filter with Annotations

다음은 어노테이션을 이용해 필터를 설정하는 예시다:

public class PetFilter {
    ...
    @Filter  // (1)
    public boolean dogsOnly(String input) {
        ...
    }
}

(1) 이 메소드를 필터로 사용한다는 것을 나타내는 어노테이션. 이 클래스를 필터로 사용하려면 반드시 지정해야 한다.

XML 요소에서 제공하는 설정 옵션은 전부 @Filter 어노테이션에서도 사용할 수 있다.

이 필터는 XML 안에서 직접 참조해도 되고, 클래스 위에 @MessageEndpoint 어노테이션을 선언했다면 클래스패스 스캔을 통해 자동으로 감지할 수 있다.

어노테이션으로 설정한 엔드포인트에 어드바이스 체인 적용하기도 함께 참고해라.


8.3. Splitter

splitter는 메시지를 나눠서 독립적으로 처리할 수 있도록, 여러 메시지로 분할해 전송해주는 일을 담당한다. 보통은 파이프라인 뒷쪽에 aggregator를 가지고 있는 업스트림 프로듀서일 때가 많다.

8.3.1. Programming Model

분할 동작을 위한 API는 AbstractMessageSplitter라는 클래스에서 시작한다. 이 클래스는 새로 만드는 메시지에 적당한 메시지 헤더를 채우는 등 (CORRELATION_ID, SEQUENCE_SIZE, SEQUENCE_NUMBER), splitter에서 공통적으로 필요한 기능들을 캡슐화해놓은 MessageHandler 구현체다. 이런 헤더를 채움으로써 메시지와 그 처리 결과를 추적할 수 있게된다 (일반적인 상황에선 이런 헤더들은 다양한 엔드포인트에서 변환되어 만들어지는 메시지에 그대로 복사된다). 이런 값들은 복합composed 메시지 프로세서 등에서 활용할 수 있다.

다음은 AbstractMessageSplitter에서 가져온 코드다:

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

애플리케이션에 맞는 splitter를 구현하려면 AbstractMessageSplitter를 상속하고, splitMessage 메소드를 구현해 메시지 분할 로직을 넣어주면 된다. 이 메소드에선 다음 중 하나를 반환하면 된다:

Spring Integration에선 단일 인자를 받아 값을 반환하는 메소드를 하나 정의하기만 하면, 모든 POJO로 분할 알고리즘을 구현할 수 있다. POJO 메소드가 반환하는 값은 위에서 설명한 방법대로 해석된다. 입력 인자는 Message나 간단한 POJO로 정의할 수 있다. 후자의 경우 splitter엔 전달받은 메시지의 페이로드를 넘겨준다. 이 방법을 사용하면 애플리케이션 코드와 Spring Integration API와의 결합도를 낮출 수 있으며, 보통 테스트도 더 쉽기 때문에 더 권장하곤 한다.

Iterators

4.1 버전부터 AbstractMessageSplitter는 분할 결과를 Iterator 타입으로 생성할 수 있다. 주의할 점은, Iterator(혹은 Iterable)를 사용하면 내부 아이템 갯수에 액세스할 수 없으며, 그렇기 때문에 SEQUENCE_SIZE 헤더는 0으로 설정된다는 점이다. 즉, <aggregator>의 디폴트 SequenceSizeReleaseStrategy가 제대로 작동하지 않으며, 이 splitter에서 채운 CORRELATION_ID에 해당하는 그룹은 해제release되지 않고 incomplete 상태로 남아있게 된다. 이 경우 적절한 커스텀 ReleaseStrategy를 구현하거나, send-partial-result-on-expirygroup-timeout이나 MessageGroupStoreReaper와 함께 사용해야 한다.

5.0 버전부터 AbstractMessageSplitter는 가능한 경우 IterableIterator 객체의 사이즈를 결정할 수 있는 protected gatherSizeIfPossible() 메소드를 제공한다. 예를 들어 XPathMessageSplitter는 내부 NodeList 객체의 사이즈를 알아낼 수 있다. 게다가 5.0.9 버전부터는 com.fasterxml.jackson.core.TreeNode의 사이즈도 적절히 반환해준다.

Iterator 객체는 메시지를 분할하기 전 메모리에 전체 컬렉션을 만들어둘 필요가 없어 유용하다. 예를 들면 어떤 외부 시스템(ex. DataBase, FTP MGET)에서 이터레이션이나 스트림을 사용해 내부 아이템들을 채우는 경우가 그렇다.

Stream and Flux

5.0 버전부터 AbstractMessageSplitter는 분할 결과를 자바 Stream과 리액티브 스트림즈 Publisher 타입으로 생성할 수 있다. 이 경우 각 타입이 제공하는 iteration 기능에 맞게 타겟 Iterator를 생성한다.

덧붙이면, splitter의 출력 채널이 ReactiveStreamsSubscribableChannel의 인스턴스인 경우, AbstractMessageSplitterIterator 대신에 Flux로 된 결과를 생성하며, 이 출력 채널은 Flux를 구독해 다운스트림 플로우의 수요에 따라 back-pressure 기반으로 메시지를 분할해간다.

5.2 버전부터 splitter는 discardChannel 옵션을 지원한다. split 함수가 비어있는 컨테이너(컬렉션, 배열, 스트림, Flux 등)를 반환한 메시지는 이 discardChannel로 전송된다. 빈 컨테이너를 리턴했을 땐 순회할 아이템이 없어 outputChannel로 전송되지 않는다. split 결과가 null일 땐 그대로 반환해 플로우의 종료를 알린다.

8.3.2. Configuring a Splitter with XML

splitter는 다음과 같이 XML을 통해 설정할 수 있다:

<int:channel id="inputChannel"/>

<int:splitter id="splitter"            <!-- (1) -->
  ref="splitterBean"                   <!-- (2) -->
  method="split"                       <!-- (3) -->
  input-channel="inputChannel"         <!-- (4) -->
  output-channel="outputChannel"       <!-- (5) -->
  discard-channel="discardChannel" />  <!-- (6) -->

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>

(1) splitter의 ID는 선택사항이다.
(2) 애플리케이션 컨텍스트에 정의한 빈에 대한 참조. 이 빈은 반드시 앞 섹션에서 설명한 대로 분할 로직을 구현해야 하며, 생략할 수 있다. 빈 참조를 제공하지 않은 경우 input-channel에 도착한 메시지의 페이로드는 여러 가지 요소를 담고있는 java.util.Collection 구현체로 가정하고, 이 컬렉션에 디폴트 분할 로직을 적용한 뒤 output-channel로 전송한다.
(3) 분할 로직을 구현한 메소드 (빈에 정의돼 있는 메소드). 생략할 수 있다.
(4) splitter의 입력 채널. 생략할 수 없다.
(5) splitter가 받은 메시지를 분할해서 전송할 채널. 생략할 수 있다 (전달받은 메시지 자체에 응답 채널이 지정돼 있을 수 있기 때문).
(6) 분할 결과가 비어 있는 경우 해당 요청 메시지를 전송할 채널. 생략할 수 있다 (결과가 null이면 중단한다).

커스텀 splitter 구현체를 다른 <splitter> 정의에서도 참조할 수 있다면 보통 ref 속성을 사용하는 것이 좋다. 하지만 커스텀 splitter 구현체의 스코프를 단일 <splitter> 정의 내로 한정하고 싶다면, 다음 예제와 같이 내부 빈 정의를 제공해도 된다:

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>

동일한 <int:splitter> 설정에서 ref 속성과 내부 핸들러 정의를 둘 다 사용하는 것은 허용하지 않는다. 둘 다 사용하면 조건이 모호해져 예외가 발생한다.

ref 속성으로 AbstractMessageProducingHandler를 상속한 빈을 참조하는 경우 (프레임워크에서 자체적으로 제공하는 splitter들처럼), 출력 채널을 핸들러에 직접 주입하는 식으로 최적화된다. 이때는 각 ref 속성마다 별도 빈 인스턴스(또는 prototype 스코프 빈)를 참조하거나, 내부 <bean/> 설정을 이용해야 한다. 단, 여기서 말하는 최적화는 splitter XML 정의에 특정 splitter의 전용 속성을 제공하지 않았을 때에만 적용된다. 무심코 여러 빈에서 동일한 메시지 핸들러를 참조한다면 설정 예외를 만나게될 거다.

8.3.3. Configuring a Splitter with Annotations

@Splitter 어노테이션은 Message 타입이나 메시지 페이로드 타입을 받는 메소드에 선언할 수 있다. 이때 메소드는 어떠한 타입의 Collection을 반환해야 한다. 반환한 값들이 실제 Message 객체가 아니라면 각 항목을 페이로드로 갖는 Message로 감싼다. 결과로 만들어진 Message@Splitter를 정의한 엔드포인트에 지정된 출력 채널로 전송된다.

다음은 @Splitter 어노테이션을 사용해 splitter를 설정하는 방법을 보여주는 예시다:

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}

어노테이션으로 설정한 엔드포인트에 어드바이스 체인 적용하기, splitter, 파일 splitter도 함께 참고해라.


8.4. Aggregator

aggregator는 여러 메시지를 받아 하나의 메시지로 결합해주는 메시지 핸들러로, splitter와 정반대 개념이다. 사실 aggregator는 파이프라인 앞쪽에 splitter를 가지고있는 다운스트림 컨슈머인 경우가 많다.

기술적으로 접근하면 aggregator는 상태를 가지기 때문에stateful splitter보다 복잡하다. 집계할 메시지들을 따로 보관해야 하며, 메시지 그룹을 집계할 준비가 됐는지도 판단해야 한다. 이와 같은 이유로 aggregator에선 MessageStore가 필요하다.

8.4.1. Functionality

Aggregator는 그룹이 완전히 준비됐다고 판단될 때까지 관련 메시지들을 연계해서 저장하고 그룹으로 결합한다. 그룹이 준비되면 전체 메시지를 가지고 하나의 메시지를 만들어 출력 채널로 전송한다.

aggregator를 구현하려면 집계 로직을 제공해야 한다 (여러 메시지로 하나의 메시지를 만드는 로직). 여기서는 correlation과 release라는 두 가지 개념이 나온다.

Correlation은 집계할 메시지들을 어떻게 묶을지를 판단하는 거다. Spring Integration에선 기본적으로 메시지 헤더 IntegrationMessageHeaderAccessor.CORRELATION_ID를 기반으로 correlation을 진행한다. 같은 IntegrationMessageHeaderAccessor.CORRELATION_ID를 가지고 있는 메시지들은 함께 묶인다. 물론, correlation 전략을 커스텀해서 메시지를 묶는 방법을 직접 결정할 수도 있다. 이때는 CorrelationStrategy를 구현하면 된다 (뒤에서 다룬다).

메시지 그룹을 처리할 준비가 됐는지는 ReleaseStrategy를 통해 결정한다. aggregator의 디폴트 release 전략에선 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 헤더를 기준으로 시퀀스 내 모든 메시지를 가지고 있다고 판단되면 그룹이 준비됐다고 보고 처리를 시작한다release. 디폴트 전략을 재정의하려면 커스텀 ReleaseStrategy 구현체의 참조를 제공하면 된다.

8.4.2. Programming Model

Aggregation API엔 다양한 클래스가 들어있다:

AggregatingMessageHandler

AggregatingMessageHandler(AbstractCorrelatingMessageHandler의 하위 클래스)는 MessageHandler 구현체로, 다음과 같은 aggregator의 공통 기능을 캡슐화하고 있다 (다른 유스 케이스들도 함께):

메시지들을 어떻게 함께 묶어줄지 결정하는 일은 CorrelationStrategy 인스턴스에, 메시지 그룹을 release해도 되는지 결정하는 일은 ReleaseStrategy 인스턴스에 위임한다.

다음은 AbstractAggregatingMessageGroupProcessor에서 중요한 부분만 간략하게 나타낸 코드다 (aggregatePayloads 메소드는 개발자가 직접 구현한다):

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

기본으로 제공하는 AbstractAggregatingMessageGroupProcessor 구현체는 DefaultAggregatingMessageGroupProcessor, ExpressionEvaluatingMessageGroupProcessor, MethodInvokingMessageGroupProcessor를 확인해봐라.

5.2 버전부터 AbstractAggregatingMessageGroupProcessorFunction<MessageGroup, Map<String, Object>> 전략을 이용해 출력 메시지에 사용할 헤더를 통합하고 계산(집계)할 수 있다. 디폴트 구현체 DefaultAggregateHeadersFunction은 그룹 내에서 충돌이 없는 헤더들을 전부 반환한다; 그룹 내 모든 메시지에 담겨있지 않으면 충돌로 간주하지 않는다. 충돌하는 헤더는 제외시킨다. 그외 임의의 MessageGroupProcessor 구현체에선 (AbstractAggregatingMessageGroupProcessor를 구현하지 않은 구현체) 이 함수와 새로 도입된 DelegatingMessageGroupProcessor를 함께 사용한다. 스프링은 기본적으로, 설정해준 함수를 AbstractAggregatingMessageGroupProcessor 인스턴스에 주입하며, 그 외 다른 구현체는 모두 DelegatingMessageGroupProcessor로 감싼다. AbstractAggregatingMessageGroupProcessorDelegatingMessageGroupProcessor의 로직상 차이점은, 후자는 delegate 전략을 호출하기 전에 미리 헤더를 계산하지 않고, delegate가 MessageAbstractIntegrationMessageBuilder를 반환하면 이 함수를 호출하지 않는다는 점이다. 이 경우 프레임워크는 타겟 구현체가 필요한 헤더들을 넣어서 반환했다고 가정한다. Function<MessageGroup, Map<String, Object>> 전략은 XML 설정에선 headers-function 참조 속성으로, Java DSL에선 AggregatorSpec.headersFunction() 옵션으로, 일반 자바 설정에선 AggregatorFactoryBean.setHeadersFunction으로 설정할 수 있다.

CorrelationStrategyAbstractCorrelatingMessageHandler가 가지고 있으며, 아래 보이는 것처럼 디폴트로는 메시지 헤더 IntegrationMessageHeaderAccessor.CORRELATION_ID를 기반으로 동작하는 구현체를 사용한다:

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

메시지 그룹을 실제로 처리하는 기본 구현체는 DefaultAggregatingMessageGroupProcessor다. 이 클래스는 그룹으로 건내받은 페이로드의 List를 페이로드로 가지는 Message를 하나 생성한다. 이 구현체는 업스트림에 splitter나 publish-subscribe 채널, recipient list 라우터가 있을 때 간단한 scatter-gather 패턴을 구현하기 좋다.

나중에 집계할 목적으로 publish-subscribe 채널이나 recipient list 라우터를 사용한다면, apply-sequence 플래그를 활성화해줘야 한다. 그래야만 필요한 헤더들이 추가된다 (CORRELATION_ID, SEQUENCE_NUMBER, SEQUENCE_SIZE). Spring Integration에 있는 splitter에선 기본적으로 활성화돼 있지만, publish-subscribe 채널이나 recipient list 라우터는 이런 헤더가 필요 없는 매우 다양한 컨텍스트에서 사용할 수 있기 때문에 기본으로 활성화되지 않는다.

애플리케이션에 필요한 전용 aggregator 전략을 구현하려면 AbstractAggregatingMessageGroupProcessor를 상속받아 aggregatePayloads 메소드를 구현하면 된다. 하지만 다른 방법을 이용하면 Aggregation API와의 결합도는 낮게 유지한 채 집계 로직을 구현할 수 있는데, 이때는 XML이나 어노테이션으로 설정을 추가해주면 된다.

대개는 단일 인자로 java.util.List를 받는 메소드만 있으면 모든 POJO로 집계 알고리즘을 구현할 수 있다 (리스트에 객체도 담을 수 있다). 메시지들을 집계할 땐 다음과 같은 방법으로 메소드를 실행한다:

코드 복잡도나, 결합도, 테스트 난이도 등을 고려했을 땐, POJO를 통해 집계 로직을 구현하고 XML이나 어노테이션 설정을 추가하는 방식을 권장한다.

5.3 버전부터 AbstractCorrelatingMessageHandler는 메시지 그룹을 처리한 뒤 MessageBuilder.popSequenceDetails()를 호출해 메시지 헤더를 수정한다. 따라서 splitter-aggregator가 중첩돼있는 경우를 대응할 수 있다. 단, 메시지 그룹을 release한 결과가 메시지 컬렉션이 아닐 때만 호출한다. 이때는 타겟 MessageGroupProcessor가 메시지를 빌드하면서 MessageBuilder.popSequenceDetails()를 호출해야 한다.

MessageGroupProcessorMessage 하나를 반환하면, 그룹 내 첫 번째 메시지와 sequenceDetails가 일치할 때만 출력 메시지에서 MessageBuilder.popSequenceDetails()를 실행할 거다. (이전에는 MessageGroupProcessor에서 순수 페이로드나 AbstractIntegrationMessageBuilder를 반환했을 때만 실행했었다.)

이 기능은 새롭게 지원하는 boolean 프로퍼티 popSequence로 제어할 수 있다. 따라서 표준 splitter에서 세부 correlation 정보를 채워넣지 않았다면 MessageBuilder.popSequenceDetails()를 비활성해도 된다. 이 프로퍼티는 사실상 업스트림에서 applySequence = true로 설정돼있는 가장 가까운 AbstractMessageSplitter가 수행한 작업을 원복한다. 자세한 내용은 Splitter를 참고해라.

SimpleMessageGroup.getMessages() 메소드는 unmodifiableCollection을 반환한다. 따라서 집계 로직이 담긴 POJO 메소드에 Collection<Message> 파라미터가 있을 때 전달하는 인자는 완전히 같은 Collection 인스턴스이며, aggregator에 SimpleMessageStore를 사용할 땐 그룹을 release하고 나면 본래 Collection<Message>는 비워진다. 결과적으로 POJO의 Collection<Message> 변수도 aggregator 바깥으로 전달되면 비워진다. 별도 처리를 위해 해당 컬렉션은 유지하면서 release하고 싶다면 반드시 Collection을 새로 생성해야 한다 (ex. new ArrayList<Message>(messages)). 4.3 버전부터 스프링은 불필요한 객체를 생성하지 않기 위해 더 이상 메시지들을 새 컬렉션으로 복사하지 않는다.

MessageGroupProcessorprocessMessageGroup 메소드가 컬렉션을 반환한다면, 이 컬렉션은 반드시 Message<?> 객체의 컬렉션이어야 한다. 이 경우 메시지들은 각각 따로 release된다. 4.2 버전 이전에는 XML 설정을 통해 MessageGroupProcessor를 제공할 수 없었고, 집계에는 오직 POJO 메소드만 사용할 수 있었다. 이제는 스프링이 참조하는 빈(또는 내부 빈)이 MessageProcessor를 구현한 것을 감지하면 이 빈을 aggregator의 출력 프로세서로 사용한다.

커스텀 MessageGroupProcessor가 반환한 객체 컬렉션을 메시지 페이로드로 삼아 release하려면 직접 AbstractAggregatingMessageGroupProcessor를 상속받아 aggregatePayloads()를 구현해야 한다.

추가로, 4.2 버전부터 SimpleMessageGroupProcessor를 제공한다. 이 구현체는 앞에서 지정한 그룹의 메시지 컬렉션을 그대로 반환하므로, release된 메시지들은 개별적으로 전송된다.

이 클래스를 활용하면 aggregator를 메시지 barrier로 동작시킬 수 있다. 도착한 메시지들은 release 전략이 시행돼 해당 그룹을 개별 메시지들의 시퀀스로 release할 때까지 전송하지 않고 보류된다.

ReleaseStrategy

ReleaseStrategy 인터페이스는 다음과 같이 정의돼있다:

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

일반적으로는 java.util.List를 단일 인자로 받아 (리스트에 파라미터 타입을 지정해도 된다) boolean 값을 반환하는 메소드를 제공한다면 POJO로도 그룹의 준비 여부를 판단하는 로직을 구현할 수 있다. POJO의 메소드는 메시지가 새로 도착할 때마다 다음과 같은 방법으로 호출돼 그룹이 온전히 준비되었는지를 판단한다:

다음은 Message 타입 List에서 @ReleaseStrategy 어노테이션을 사용하는 예시다:

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

다음은 String 타입 List에서 @ReleaseStrategy 어노테이션을 사용하는 예시다:

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

POJO 기반 release 전략에선 위 두 예제에 보이는 시그니처에 따라, 아직 release되지 않은 메시지들의 Collection(Message 전체가 필요할 때)이나 페이로드 객체들의 Collection(타입 파라미터가 Message가 아닐 때)을 전달받는다. 대부분의 유스 케이스에선 이 두 가지로도 충분할 거다. 하지만 어떠한 이유로 전체 MessageGroup에 접근해야 한다면, ReleaseStrategy 인터페이스의 구현체를 제공해야 한다.

그룹이 release되기 전까지는 release 전략을 여러 번 반복해서 호출할 수 있으므로 규모가 큰 그룹을 처리할 때는 이런 메소드들이 호출되는 방식을 이해하고 있어야 한다. 가장 효율적인 방법은 ReleaseStrategy의 구현체를 사용하는 거다. ReleaseStrategy의 구현체는 aggregator가 직접 호출할 수 있기 때문이다. 그 다음으로 효율적인 방법은 파라미터 타입으로 Collection<Message<?>>를 사용하는 POJO 메소드다. Collection<Something> 타입을 이용하는 POJO 메소드가 가장 효율적이지 못하다. 프레임워크는 release 전략을 호출할 때마다 그룹에 있는 메시지들의 페이로드를 새 컬렉션으로 복사해야 한다 (게다가 페이로드를 Something으로 변환해야 할 수도 있다). Collection<?>을 사용하면 변환을 피할 수 있지만 Collection을 새로 만들어야 한다는 사실은 변하지 않는다.

이러한 이유로 규모가 큰 그룹을 사용할 때는 ReleaseStrategy를 구현하는 것이 좋다.

그룹을 release하고 집계할 때는, release되지 않았던 메시지들을 모두 처리해 그룹에서 제거한다. 그룹이 준비도 마쳤다면 (즉, 한 시퀀스의 모든 메시지가 도착했거나 정의한 시퀀스가 없는 경우) 그 그룹은 complete로 마킹된다. 이 그룹에 새 메시지가 도착하면 전부 discard 채널(정의했다면)로 전송된다. expire-groups-upon-completiontrue로 설정하면 (기본값은 false다) 그룹을 통으로 제거하며, 새 메시지가 도착하면 (제거된 그룹과 동일한 correlation ID를 가지고 있는 메시지) 새로운 그룹을 형성한다. send-partial-result-on-expirytrue로 설정한 상태에서 MessageGroupStoreReaper를 이용하면 시퀀스가 일부만 모였을 때에도 release할 수 있다.

뒤늦게 도착한 메시지들을 폐기할 수 있으려면 aggregator는 반드시 그룹이 release된 이후에도 그룹에 대한 상태를 유지하고 있어야 한다. 이로 인해 종국엔 OOMout-of-memory이 발생하기도 한다. 이런 상황이 발생하지 않도록 하려면 MessageGroupStoreReaper를 설정해 그룹 메타데이터를 제거하는 방법을 검토해보는 것이 좋다. expiry 파라미터는 더 이상 뒤늦게 도착하는 메시지가 없을 것으로 예상되는 시점에 그룹을 만료시킬 수 있도록 설정해야 한다. reaper 설정에 대한 자세한 내용은 Aggregator에서 상태 관리하기: MessageGroupStore를 참고해라.

Spring Integration은 ReleaseStrategy의 구현체 SimpleSequenceSizeReleaseStrategy를 제공한다. 이 구현체는 도착하는 각 메시지에 있는 SEQUENCE_NUMBER, SEQUENCE_SIZE 헤더를 통해 메시지 그룹이 완성돼 집계할 준비가 됐는지를 판단한다. 앞에서도 언급했지만 이 구현체가 디폴트 전략이다.

5.0 버전 이전에 사용하던 디폴트 release 전략은 SequenceSizeReleaseStrategy로, 규모가 큰 그룹에선 활용하기가 어려웠다. 이 전략을 사용하면 중복된 시퀀스 넘버를 감지해 거절하는데, 이 작업은 비용이 커지기 십상이다.

집계하는 그룹의 규모가 크고, 그룹을 일부만 release할 필요가 없으며, 중복 시퀀스를 감지/거절할 필요가 없다면 이대신 SimpleSequenceSizeReleaseStrategy를 사용하는 것을 검토해봐라. 이런 유스 케이스라면 SimpleSequenceSizeReleaseStrategy가 훨씬 더 효율적이며, 5.0 버전 이후부턴 그룹을 부분적으로 release하지 않을 때 디폴트로 사용한다.

Aggregating Large Groups

4.3 릴리즈에선 SimpleMessageGroup에 메시지들을 담는 디폴트 CollectionHashSet으로 변경했다. 전에는 BlockingQueue를 사용했는데, 규모가 큰 그룹에선 개별 메시지들을 제거하는 비용이 상당했다 (O(n)에 해당하는 선형 탐색이 필요했다). 해시 셋은 일반적으로 제거 연산이 훨씬 빠르긴 하지만, 삽입과 제거 연산 모두 해시 값을 계산해야 하기 때문에 대용량 메시지라면 역시 비용이 커질 수 있다. 메시지에서 해시 값을 계산해내는 비용이 크다면 다른 컬렉션 타입을 고려해봐야 한다. MessageGroupFactory 사용하기에서도 설명하지만, SimpleMessageGroupFactory라는 구현체를 제공하므로 요구사항에 가장 잘맞는 Collection을 선택해주면 된다. 아니면 자체 팩토리 구현체를 제공해서 다른 Collection<Message<?>>를 생성하는 것도 가능하다.

다음은 이전에 사용했던 구현체와 SimpleSequenceSizeReleaseStrategy로 aggregator를 설정하는 예제다:

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />

aggregator의 업스트림에 필터 엔드포인트가 있는 경우, 필터에서 시퀀스에 속하는 일부 메시지를 제거할 수 있기 때문에 시퀀스 사이즈 release 전략(고정 사이즈를 사용하거나 sequenceSize 헤더를 이용하는 전략)은 원래 목적대로 사용할 수 없다. 이런 경우엔 다른 ReleaseStrategy를 선택하는 것이 좋다. 아니면 하위 discard 플로우에서 건너뛸 컨텐츠에 대한 정보를 담은 보상 메시지를 전송하고, complete 그룹 함수를 커스텀해 이 메시지를 활용하는 방법도 있다. 자세한 내용은 필터를 참고해라.

Correlation Strategy

CorrelationStrategy 인터페이스는 다음과 같이 정의돼있다:

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

이 메소드가 반환하는 Object는 메시지를 메시지 그룹으로 연결하는 데 사용하는 correlation 키를 나타낸다. 이 객체의 equals()hashCode() 메소드를 구현할 땐, 반드시 Map에서 키로 사용할 수 있도록 기준에 맞게 구현해야 한다.

일반적으로 POJO로도 correlation 로직을 구현할 수 있으며, 메시지가 메소드의 인자로 매핑되는 규칙은 (인자는 복수개를 선언할 수 있다) ServiceActivator에서와 동일하다 (@Header 어노테이션 등). 이 메소드는 반드시 값을 하나 반환해야 하며, null이어선 안 된다.

Spring Integration은 CorrelationStrategy의 구현체 HeaderAttributeCorrelationStrategy를 제공한다. 이 구현체는 메시지 헤더 중 하나의 값을 correlation 키로 반환한다 (생성자 인자를 통해 사용할 헤더의 이름을 지정한다). 디폴트로 사용하는 correlation 전략은 CORRELATION_ID 헤더 값을 반환하는 HeaderAttributeCorrelationStrategy다. correlation에 이용하고 싶은 커스텀 헤더가 있다면 HeaderAttributeCorrelationStrategy 인스턴스를 따로 하나 설정해서 aggregator에서 사용할 correlation 전략으로 참조해주면 된다.

Lock Registry

그룹을 변경하는 일은 스레드로부터 안전하다thread-safe. 따라서 동시에 같은 correlation ID로 여러 메시지를 전송하더라도, aggregator에선 그 중 하나의 메시지만 처리하고 있을 거고, 사실상 메시지 그룹당 하나의 스레드로 작업하게 된다. correlation ID를 리졸브한 뒤 락lock을 얻어올 땐 LockRegistry를 사용한다. 기본적으론 DefaultLockRegistry를 사용한다 (인메모리 구현체). 같은 MessageGroupStore를 공유하는 서버들 간에 업데이트 내역을 동기화하려면 반드시 공유shared 락 레지스트리를 설정해줘야 한다.

Avoiding Deadlocks

위에서 설명한 바와 같이, 메시지 그룹이 변경될 때는 (메시지를 추가하거나 release할 땐) 락lock을 획득해 들고있는다.

이제 다음과 같은 플로우를 생각해보자:

...->aggregator1-> ... ->aggregator2-> ...

멀티 스레드를 이용하고 있고, 여러 aggregator가 하나의 공통 락 레지스트리를 공유하는 경우 교착 상태deadlock에 빠지게 될 수 있다. 교착 상태에 빠지면 스레드가 멈추게 되고hang, jstack <pid>를 실행하면 다음과 같은 결과를 확인할 수 있다:

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

이 문제를 방지할 수 있는 방법은 여러 가지다:

어떤 이유로 특정 aggregator의 출력이 결국 동일한 aggregator로 다시 라우팅되는 경우에도 이 문제가 발생할 수 있다. 이 경우엔 당연히 위에 있는 첫 번째 방법으론 해결할 수 없다.

8.4.3. Configuring an Aggregator in Java DSL

Java DSL을 이용해 aggregator를 설정하는 방법은 Aggregator와 Resequencer를 참고해라.

Configuring an Aggregator with XML

Spring Integration에서 XML로 aggregator를 설정하려면 <aggregator/> 요소를 이용하면 된다. 다음은 aggregator를 하나 설정하는 예시다:

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          <!-- (1) -->
        auto-startup="true"                                <!-- (2) -->
        input-channel="inputChannel"                       <!-- (3) -->
        output-channel="outputChannel"                     <!-- (4) -->
        discard-channel="throwAwayChannel"                 <!-- (5) -->
        message-store="persistentMessageStore"             <!-- (6) -->
        order="1"                                          <!-- (7) -->
        send-partial-result-on-expiry="false"              <!-- (8) -->
        send-timeout="1000"                                <!-- (9) -->

        correlation-strategy="correlationStrategyBean"     <!-- (10) -->
        correlation-strategy-method="correlate"            <!-- (11) -->
        correlation-strategy-expression="headers['foo']"   <!-- (12) -->

        ref="aggregatorBean"                               <!-- (13) -->
        method="aggregate"                                 <!-- (14) -->

        release-strategy="releaseStrategyBean"             <!-- (15) -->
        release-strategy-method="release"                  <!-- (16) -->
        release-strategy-expression="size() == 5"          <!-- (17) -->

        expire-groups-upon-completion="false"              <!-- (18) -->
        empty-group-min-timeout="60000"                    <!-- (19) -->

        lock-registry="lockRegistry"                       <!-- (20) -->

        group-timeout="60000"                              <!-- (21) -->
        group-timeout-expression="size() ge 2 ? 100 : -1"  <!-- (22) -->
        expire-groups-upon-timeout="true"                  <!-- (23) -->

        scheduler="taskScheduler" >                        <!-- (24) -->
            <expire-transactional/>                        <!-- (25) -->
            <expire-advice-chain/>                         <!-- (26) -->
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>

(1) aggregator의 id는 선택사항이다.
(2) 애플리케이션 컨텍스트를 기동하면서 aggregator를 시작해야 하는지 여부를 나타내는 라이프사이클 관련 속성이다.
생략할 수 있다 (디폴트는 ‘true’).

(3) aggregator가 메시지를 받아올 채널.
필수 값이다.

(4) aggregator가 집계 결과를 전송할 채널.
생략할 수 있다 (수신한 메시지 자체의 헤더 ‘replyChannel’에 응답 채널이 지정돼 있을 수도 있기 때문).

(5) aggregator가 타임아웃된 메시지들을 전송할 채널 (send-partial-result-on-expiryfalse인 경우에).
생략할 수 있다.

(6) 메시지 그룹이 완성될 때까지 correlation 키 아래 메시지들을 저장하는 MessageGroupStore에 대한 참조.
생략할 수 있다.
기본적으로 휘발성의 인메모리 저장소를 사용한다.
자세한 내용은 메시지 스토어를 참고해라.

(7) 둘 이상의 핸들러가 동일한 DirectChannel을 구독하는 경우 참고하는 이 aggregator의 순서 (로드 밸런싱 목적으로 사용).
생략할 수 있다.

(8) 메시지들을 담고있는 MessageGroup이 만료되면 해당 메시지들을 집계해 ‘output-channel’이나 ‘replyChannel’로 보내야 하는지를 나타낸다 (MessageGroupStore.expireMessageGroups(long) 참고).
MessageGroup을 만료시키는 방법으로는 MessageGroupStoreReaper를 설정하는 방법이 있다.
하지만 이 방법 대신 MessageGroupStore.expireMessageGroups(timeout)을 호출해도 MessageGroup을 만료시킬 수 있다.
컨트롤 버스를 통해도 되고, MessageGroupStore 인스턴스에 대한 참조를 가지고 있다면 expireMessageGroups(timeout)을 호출하면 된다.
MessageGroup이 만료되지 않는다면 이 속성만으로는 아무런 일도 일어나지 않는다.
곧 만료되는 MessageGroup에 아직 남아 있는 메시지들을 전부 버릴지 출력/응답 채널로 보낼지를 나타내는 단순한 지표라고 볼 수 있다.
이 속성은 생략할 수 있다 (디폴트는 false).
참고로, expire-groups-upon-timeoutfalse로 설정한 경우 그룹이 실제로 만료되지 않을 수 있으므로 send-partial-result-on-timeout이라고 부르는 게 더 적합하다고 볼 수도 있다.

(9) 응답 Messageoutput-channel 또는 discard-channel로 전송할 때 대기하는 타임아웃 간격.
기본값은 -1로 무한으로 블로킹된다.
고정 ‘capacity’를 사용하는 QueueChannel같이, ‘전송’에 제한이 있는 출력 채널을 사용할 때만 적용된다.
타임아웃이 발생하면 MessageDeliveryException을 던진다.
AbstractSubscribableChannel의 구현체들은 send-timeout을 무시한다.
group-timeout(-expression)의 경우 예약된 expire 태스크에서 MessageDeliveryException이 발생하면 해당 태스크를 다시 스케줄링한다.
생략할 수 있다.

(10) 메시지 correlation (grouping) 알고리즘을 구현한 빈에 대한 참조.
이 빈은 CorrelationStrategy 인터페이스의 구현체일 수도, POJO일 수도 있다.
후자라면 correlation-strategy-method 속성도 반드시 함께 정의해야 한다.
생략할 수 있다 (aggregator는 기본적으로 IntegrationMessageHeaderAccessor.CORRELATION_ID 헤더를 사용한다).

(11) correlation-strategy가 참조하는 빈에 정의돼있는 메소드.
이 메소드에서 correlation 결정 알고리즘을 구현한다.
제약이 있긴 하지만 (correlation-strategy를 반드시 정의해야 한다) 생략할 수 있다.

(12) correlation 전략을 나타내는 SpEL 표현식.
ex: "headers['something']".
correlation-strategycorrelation-strategy-expression은 둘 중 하나만 사용할 수 있다.

(13) 애플리케이션 컨텍스트에 정의돼있는 빈에 대한 참조.
이 빈은 앞에서 설명했듯이 반드시 집계 로직을 구현해야 한다.
생략할 수 있다 (기본적으론 집계한 메시지들의 리스트를 출력 메시지의 페이로드로 활용한다).

(14) ref 속성으로 참조하는 빈에 정의돼있는 메소드.
이 메소드에서 메시지 집계 알고리즘을 구현한다.
생략할 수 있다 (ref 속성을 정의했는지에 따라 다르다).

(15) release 전략을 구현한 빈에 대한 참조.
이 빈은 ReleaseStrategy 인터페이스의 구현체일 수도, POJO일 수도 있다.
후자라면 release-strategy-method 속성도 반드시 함께 정의해야 한다.
생략할 수 있다 (aggregator는 기본적으로 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 헤더를 사용한다).

(16) release-strategy 속성이 참조하는 빈에 정의돼있는 메소드.
이 메소드에서 completion 결정 알고리즘을 구현한다.
생략할 수 있으며, 제약이 존재한다 (release-strategy를 반드시 정의해야 한다).

(17) release 전략을 나타내는 SpEL 표현식.
표현식의 루트 객체는 MessageGroup이다.
ex: "size() == 5".
release-strategyrelease-strategy-expression은 둘 중 하나만 사용할 수 있다.

(18) true로 설정하면 (디폴트는 false다) 완료된 그룹은 메시지 스토어에서 제거되며, 이후 correlation이 같은 메시지가 도착하면 새 그룹을 만들게된다.
기본 동작에선 완료된 그룹과 동일한 correlation을 가진 메시지들은 discard-channel로 전송된다.

(19) <aggregator>MessageStoreMessageGroupStoreReaper를 설정했을 때만 적용된다.
MessageGroupStoreReaper가 그룹을 부분적으로 만료하도록 설정돼있다면 기본적으로 비어있는 그룹 역시 제거한다.
빈 그룹은 그룹이 정상적으로 release된 후에 존재하게 된다.
이 빈 그룹 덕분에 늦게 도착하는 메시지들을 감지하고 폐기할 수 있다.
그룹을 부분적으로 만료시키는 것보다 더 긴 주기로 빈 그룹을 만료시키고 싶다면 이 속성을 설정해라.
비어있는 그룹들은 최소한 이 밀리세컨드 동안 수정되지 않는다면 MessageStore에서 제거되지 않을 거다.
빈 그룹이 실제로 만료되는 시간은 reaper의 timeout 속성에도 영향을 받으며, 이 값에 타임아웃을 더한 시간만큼 걸릴 수도 있다.

(20) org.springframework.integration.util.LockRegistry 빈에 대한 참조.
groupId를 기반으로 Lock을 획득하는데 사용한다. 덕분에 같은 MessageGroup에 동시에 접근하는 상황을 대응할 수 있다.
기본적으론 내부 DefaultLockRegistry를 사용한다.
ZookeeperLockRegistry같은 분산 LockRegistry를 사용하면 하나의 그룹에선 동시에 하나의 aggregator 인스턴스만이 작업할 수 있다.
자세한 내용은 Redis Lock Registry, Gemfire Lock Registry, Zookeeper Lock Registry를 참고해라.

(21) 현재 메시지가 도착했을 때 ReleaseStrategy가 그룹을 release하지 않으면 MessageGroup을 강제로 완료 상태로 만드는 타임아웃 (밀리세컨드 단위).
이 속성 덕분에 aggregator에 시간 기반 릴리즈 전략이 하나 내장된다고 볼 수 있다. MessageGroup에 마지막으로 메시지가 도착한 이후 타임아웃 기간 동안 새 메시지가 도착하지 않는 경우, 부분적인 결과를 내보내야 할 때 (혹은 그룹을 폐기해야 할 때) 활용할 수 있다.
MessageGroup이 생성된 시점부터 타임아웃을 계산하고 싶다면 group-timeout-expression 속성을 검토해봐라.
aggregator에 메시지가 새로 도착하면 해당 MessageGroup에 예약돼있는 기존 ScheduledFuture<?>는 모두 취소된다. ReleaseStrategyfalse를 반환하고 (release하지 않음을 의미) groupTimeout > 0이라면, 그룹을 만료시키는 태스크를 새로 예약한다.
이 속성을 0이나 음수 값으로 설정하는 것은 권하지 않는다.
이렇게 되면 모든 메시지 그룹이 즉시 완료되기 때문에 사실상 aggregator를 비활성화하는 거나 마찬가지다.
하지만 표현식을 사용하면 조건부로 0이나 음수로 설정할 수 있다.
자세한 내용은 group-timeout-expression을 참고해라.
그룹을 완료 상태로 만들면서 하는 일들은 ReleaseStrategysend-partial-group-on-expiry 속성에 따라 달라진다.
자세한 내용은 Aggregator와 그룹 타임아웃을 참고해라.
이 속성은 group-timeout-expression과 함께 사용할 수 없다.

(22) groupTimeout으로 평가되는 SpEL 표현식. #root 평가 컨텍스트 객체로 MessageGroup을 사용한다.
이 속성을 이용하면 MessageGroup을 강제로 완료 상태로 만드는 태스크를 예약할 수 있다.
표현식이 null로 평가되면 태스크를 예약하지 않는다.
0으로 평가되면 해당 그룹은 현재 스레드에서 즉시 완료된다.
사실상 이 속성은 group-timeout을 동적으로 만들어준다고 볼 수 있다.
예를 들어 그룹이 만들어지고 나서 10초가 지나면 MessageGroup을 강제로 완료시키고 싶다면, 이 SpEL 표현식을 검토해볼 수 있다: timestamp + 10000 - T(System).currentTimeMillis(). MessageGroup#root 평가 컨텍스트 객체이므로, 여기서 timestampMessageGroup.getTimestamp()로 제공된다.
하지만 그룹의 생성 시각은 다른 그룹 만료 속성들을 어떻게 설정했는지에 따라, 메시지가 처음 도착한 시간과는 다를 수 있다는 사실을 명심해라.
자세한 내용은 group-timeout을 참고해라.
group-timeout 속성과는 함께 사용할 수 없다.

(23) 타임아웃으로 인해 (또는 MessageGroupStoreReaper로 인해) 그룹이 완료되면 기본적으로 해당 그룹은 만료된다 (완전히 제거된다).
이후 도착하는 메시지들은 새 그룹을 만들게 된다.
이 속성을 false로 설정하면 그룹을 완료하되 메타데이터는 남겨둘 수 있어 늦게 도착한 메시지들을 폐기할 수 있다.
빈 그룹은 empty-group-min-timeout 속성과 MessageGroupStoreReaper를 함께 사용하면 이후 만료시킬 수 있다.
기본값은 true다.

(24) TaskScheduler 빈에 대한 참조. MessageGroupgroupTimeout 내로 새 메시지가 도착하지 않으면 MessageGroup을 강제로 완료시키는 태스크를 예약할 때 사용한다.
따로 지정하지 않으면 ApplicationContext에 등록돼있는 기본 스케줄러 taskScheduler(ThreadPoolTaskScheduler)를 사용한다.
이 속성은 group-timeout이나 group-timeout-expression을 지정하지 않았다면 적용되지 않는다.

(25) 4.1 버전부터 지원.
forceComplete 작업을 위한 트랜잭션을 시작할 수 있다.
forceComplete 작업은 group-timeout(-expression)이나 MessageGroupStoreReaper에 의해 시작되며, 일반적인 add, release, discard 작업에는 적용되지 않는다.
하위 요소는 이 요소와 <expire-advice-chain/>만 허용한다.

(26) 4.1 버전부터 지원.
forceComplete 작업에 원하는 Advice를 설정할 수 있다.
forceComplete 작업은 group-timeout(-expression)이나 MessageGroupStoreReaper에 의해 시작되며, 일반적인 add, release, discard 작업에는 적용되지 않는다.
하위 요소는 이 요소나 <expire-transactional/>만 허용한다.
스프링 tx 네임스페이스를 사용하면 이곳에 트랜잭션 Advice를 구성할 수도 있다.

Expiring Groups

그룹 만료(완전히 제거)와 관련해서는 두 가지 속성이 있다. 그룹이 만료되고 나면 관련 기록이 사라지며, 같은 correlation을 가진 메시지가 새로 도착하면 새로운 그룹을 시작한다. 그룹이 완료되면 (아직 만료는 되지 않고 완료만) 빈 그룹이 남아 있게되며, 늦게 도착한 메시지들은 버려진다. 이 비어있는 그룹은 empty-group-min-timeout 속성과 MessageGroupStoreReaper를 조합해서 사용하면 이후에 제거할 수 있다.

expire-groups-upon-completionReleaseStrategy가 그룹을 release하는 "정상적인" 완료와 관련된 속성이다. 기본값은 false다.

그룹이 정상적으로 완료되진 않았지만 타임아웃으로 인해 release됐거나 폐기되었다면 해당 그룹은 통상적으로 만료된다. 이 동작은 4.1 버전부터 expire-groups-upon-timeout을 이용해 제어할 수 있다. 이전 버전과의 호환성을 위해 true가 디폴트다.

그룹에 주어진 시간이 다 경과해 타임아웃되면 ReleaseStrategy가 그룹을 release할 수 있는 기회가 한 번 더 주어진다. 이때 그룹이 release되고 expire-groups-upon-timeout이 false인 경우, expire-groups-upon-completion에 따라 만료 여부가 결정된다. 타임아웃이 발생했는데도 릴리즈 전략으로 그룹이 release되지 않은 경우, expire-groups-upon-timeout에 따라 만료 여부가 결정된다. 타임아웃된 그룹들은 폐기되거나 부분적으로 release된다 (send-partial-result-on-expiry에 따라서).

5.0 버전부터는 empty-group-min-timeout 만큼 시간이 경과했을 때도 빈 그룹을 제거하는 태스크가 예약된다. 일반적인 release나 부분적인 시퀀스 release가 발생했을 때 expireGroupsUponCompletion == false이면서 minimumTimeoutForEmptyGroups > 0이라면 그룹을 삭제하는 태스크가 예약된다.

5.4 버전부터 aggregator(및 resequencer)는 설정을 통해 버려진orphaned 그룹을 만료시키도록 만들어줄 수 있다 (영구persistent 메시지 스토어에 있으며, 이 설정이 없으면 release되지 않을 수도 있는 그룹들을 뜻한다). expireTimeout(0보다 클 때)은 스토어에서 이 값보다 오래된 그룹은 제거purge해야 한다는 걸 나타낸다. purgeOrphanedGroups() 메소드는 기동 시에 한번 호출하며, 지정한 expireDuration 간격으로 스케줄링되는 태스크 내에서 주기적으로 호출한다. 이 메소드는 또한 언제든지 외부에서도 호출할 수 있다. 만료 로직은 위에서 언급한 만료 옵션들에 따라 forceComplete(MessageGroup)에 완전히 위임한다. 이런 주기적인 퍼지purge 기능은 일반적인 메시지 도착 로직으로는 더 이상 release되지 않는 오래된 그룹에서 메시지 스토어를 정리해줘야 할 때 유용하다. 이런 케이스는 대부분 영구persistent 메시지 그룹 스토어를 사용 중일 때 애플리케이션이 재시작되고나서 발생한다. 이 기능은 예약된 태스크에서 MessageGroupStoreReaper를 사용하는 것과 유사하지만, reaper 대신 그룹 타임아웃을 사용할 때 특정 컴포넌트 내에서 오래된 그룹들을 쉽게 처리할 수 있게 해준다. MessageGroupStore는 현재 correlation 엔드포인트에 대해서만 배타적으로 제공돼야 한다. 그렇지 않으면 특정 aggregator에서 다른 aggregator의 그룹을 제거해버릴 수도 있다. 이렇게 aggregator를 사용하면, 이 테크닉으로 만료된 그룹은 expireGroupsUponCompletion 속성에 따라 버려지거나 부분적으로 release된다.

커스텀 aggregator 핸들러 구현체를 다른 <aggregator> 정의에서도 참조할 수 있다면 보통 ref 속성 사용을 권장한다. 하지만 커스텀 aggregator 구현체를 하나의 <aggregator> 정의에서만 사용한다면, 다음과 같이 내부 빈 정의를 사용해 (1.0.3 버전부터) <aggregator> 요소 내에 POJO를 설정해도 된다:

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>

동일한 <aggregator> 설정에서 ref 속성과 내부 빈 정의를 둘 다 사용하는 것은 허용하지 않는다. 둘 다 사용하면 조건이 모호해져 예외가 발생한다.

다음은 aggregator 빈의 구현 예시다:

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

위 예시에서 사용할 completion strategy 빈은 다음과 같이 구현할 수 있다:

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}

상황에 따라 필요하다면 release strategy 메소드와 aggregator 메소드를 하나의 빈으로 결합할 수도 있다.

위 예시에서 사용할 correlation strategy 빈은 다음과 같이 구현할 수 있다:

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

위 예제에서 aggregator는 어떠한 기준(이 경우 10으로 나눈 나머지 값)에 따라 숫자들을 그룹으로 묶으며, 페이로드에 해당하는 숫자들의 합이 특정 값을 넘어가기 전까지 그룹을 유지한다.

상황에 따라 필요하다면 release strategy 메소드와 correlation strategy 메소드, aggregator 메소드를 하나의 빈으로 결합할 수도 있다. (사실 전부 다 합쳐도 좋고, 두 개만 결합해도 좋다.)

Aggregators and Spring Expression Language (SpEL)

Spring Integration 2.0부터 SpEL을 사용해 다양한 전략들(correlation, release, aggregation)을 처리할 수 있으며, 이런 release 전략 등이 비교적 단순한 로직이라면 사용을 권장하고 있다. 객체의 배열을 받도록 설계된 레거시 컴포넌트가 있다고 가정해보자. 우리는 디폴트 release 전략이 List에 집계된 모든 메시지를 조립한다는 것을 알고 있다. 여기서는 두 가지 요구사항이 있다. 먼저, 리스트에서 메시지들을 개별적으로 추출해야 한다. 둘째, 각 메시지의 페이로드를 추출해서 객체들의 배열로 조합해야 한다. 다음은 두 요구사항을 모두 충족시키는 코드다:

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

하지만 SpEL을 사용한다면, 실제로 이런 요구사항은 한 줄짜리 표현식으로 비교적 간단하게 처리할 수 있다. 그렇기 때문에 커스텀 클래스를 작성하고 빈으로 설정해줄 필요가 없어진다. 다음은 SpEL을 사용한 예제다:

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

위 설정에선 collection projection 표현식을 사용해 리스트에 있는 모든 메시지들의 페이로드를 모아 새 컬렉션을 만든 다음, 이를 배열로 변환한다. 즉, 앞서 보여준 자바 코드와 동일한 결과를 만들어낸다.

커스텀 release, correlation 전략을 처리할 때도 마찬가지로 표현식을 이용할 수 있다.

correlation-strategy 속성으로 커스텀 CorrelationStrategy 빈을 정의하는 대신, 다음 예제와 같이 SpEL 표현식으로 간단한 correlation 로직을 구현하고 correlation-strategy-expression 속성에 설정해주면 된다:

correlation-strategy-expression="payload.person.id"

위 예제에선 페이로드에 id가 있는 person이란 속성이 있다고 가정하고 있다. 메시지를 연계할 땐 바로 이 속성을 사용할 거다.

ReleaseStrategy에서도 마찬가지로 release 로직을 SpEL 표현식으로 구현하고 release-strategy-expression 속성에 설정해주면 된다. 평가 컨텍스트의 루트 객체는 MessageGroup 자체다. 표현식 내에서 메시지들의 List를 참조하려면 이 그룹의 message 프로퍼티를 사용하면 된다.

5.0 버전 이전 릴리즈에서 루트 객체는 이전 예제에서도 알 수 있듯이 Message<?>의 컬렉션이었다:

release-strategy-expression="!messages.?[payload==5].empty"

위 예제에선 SpEL 평가 컨텍스트의 루트 객체는 MessageGroup 자체이며, 이 그룹에 5라는 페이로드를 가진 메시지가 생기는 즉시 그룹을 release해야 한다고 명시하고 있다.

Aggregator and Group Timeout

4.0 버전부터 새로운 두 가지 속성 group-timeout, group-timeout-expression을 도입했다. 이 둘은 상호 배타적이다 (함께 사용할 수 없다). XML로 Aggregator 설정하기를 함께 참고해라. 경우에 따라서는 현재 메시지가 도착했을 때 ReleaseStrategy가 그룹을 release시키지 않는다면, 타임아웃 이후 집계 결과를 내보내거나 그룹을 폐기해야 할 수도 있다. 이때는 다음과 같이 groupTimeout 옵션을 이용해 MessageGroup을 강제로 완료시키는 태스크를 예약할 수 있다:

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

이 예제에선 release-strategy-expression에 정의된 대로, aggregator가 시퀀스 내 마지막 메시지를 수신하면 정상적인 release가 가능하다. release를 유발해줄 메시지가 도착하지 않을 때는, 그룹에 메시지가 최소 두 개 들어있기만 한다면 groupTimeout이 10초 뒤 그룹을 강제로 완료 상태로 바꿔준다.

그룹을 강제로 완료한 뒤의 결과는 ReleaseStrategysend-partial-result-on-expiry에 따라 달라진다. 먼저 release 전략을 다시 호출해 정상적인 release가 가능한지를 확인해본다. 그룹이 변경되진 않았더라도, ReleaseStrategy로 이번엔 그룹을 release할지를 결정할 수 있다. release 전략에서 이번에도 그룹을 release하지 않는다면 그 그룹은 만료된다. 이때 send-partial-result-on-expirytrue이면 (일부만 담겨있는) MessageGroup에 있는 기존 메시지들은 output-channel로 보내는 일반적인 aggregator 응답 메시지로 release되며, true가 아닐 땐 폐기된다.

groupTimeout 관련 동작과 MessageGroupStoreReaper에는 한 가지 차이점이 있다 (XML로 Aggregator 설정하기 참고). Reaper는 주기적으로 MessageGroupStore의 모든 MessageGroup에 대한 강제 완료 처리를 시작한다. groupTimeoutgroupTimeout이 지나도 새 메시지가 도착하지 않으면 각 MessageGroup에 대해 같은 처리를 개별적으로 수행한다. 또한 reaper는 빈 그룹을 제거할 때에도 사용할 수 있다 (빈 그룹들을 유지하는 이유는 expire-groups-upon-completion이 false일 때 뒤늦게 도착하는 메시지들을 폐기하기 위함이다).

5.5 버전부터 groupTimeoutExpressionjava.util.Date 인스턴스로 평가될 수 있다. groupTimeoutExpressionlong으로 평가해서 메시지가 도착한 시간을 기반으로 타임아웃을 계산하는 대신, 그룹 생성 시간(MessageGroup.getTimestamp())을 기반으로 태스크를 예약할 수 있어 유용하다:

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"

Configuring an Aggregator with Annotations

다음은 어노테이션을 이용해 aggregator를 설정하는 예시다:

public class Waiter {
  ...

  @Aggregator  // (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  // (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  // (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}

(1) 이 어노테이션은 이 메소드를 aggregator로 사용해야 한다는 걸 나타낸다. 이 클래스를 aggregator로 사용한다면 반드시 명시해야 한다.
(2) 이 어노테이션은 이 메소드를 aggregator의 release 전략으로 사용해야 한다는 걸 나타낸다. 어떤 메소드에도 선언하지 않으면 aggregator는 SimpleSequenceSizeReleaseStrategy를 사용한다.
(3) 이 어노테이션은 이 메소드를 aggregator의 correlation 전략으로 사용해야 한다는 걸 나타낸다. correlation 전략을 지정하지 않으면 aggregator는 CORRELATION_ID 기반 HeaderAttributeCorrelationStrategy를 사용한다.

@Aggregator 어노테이션을 이용할 때도 XML 요소에서 제공하는 모든 설정 옵션을 사용할 수 있다.

aggregator는 XML에 명시할 수도 있고, 클래스에 @MessageEndpoint를 정의했다면 클래스패스 스캔을 통해 자동으로 감지할 수도 있다.

Aggregator 컴포넌트를 어노테이션으로 설정한다면 (@Aggregator 등), 대부분이 디폴트 옵션만으로 해결되는 간단한 유스 케이스만 구성할 수 있다. 어노테이션 설정을 사용하는데 관련 옵션들을 좀더 커스텀해야 한다면, 다음과 같이 AggregatingMessageHandler@Bean으로 정의하고 이 @Bean 메소드를 @ServiceActivator로 마킹하는 것을 검토해봐라:

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

자세한 정보는 프로그래밍 모델@Bean 메소드 위에 선언할 수 있는 어노테이션들을 읽어봐라.

4.2 버전부터는 AggregatorFactoryBean을 이용해 좀더 간단한 자바 설정으로 AggregatingMessageHandler를 구성할 수 있다.

8.4.4. Managing State in an Aggregator: MessageGroupStore

Aggregator는 일정 시간 동안 같은 correlation 키를 사용해 도착한 메시지들로 메시지 그룹을 결정해야 하는 stateful 패턴이다 (Spring Integration에는 aggregator 말고도 stateful에 해당하는 몇 가지 다른 패턴들도 존재한다). Stateful 패턴에 쓰이는 인터페이스들은 (ReleaseStrategy 등) 해당 컴포넌트가 (정의한 주체가 프레임워크이든 사용자든) stateless 상태를 유지할 수 있어야 한다는 원칙 하에 설계한다. 모든 상태 정보는 MessageGroup에 담아 전달되며, 상태 관리는 MessageGroupStore에 위임한다. MessageGroupStore 인터페이스는 다음과 같이 정의돼있다:

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

자세한 정보는 Javadoc을 참고해라.

MessageGroupStore는 release 전략이 트리거되기를 기다리면서 MessageGroups에 상태 정보를 누적하는데, release 전략은 끝내 트리거되지 않을 수도 있다. 따라서 MessageGroupStore를 사용할 땐 MessageGroup이 만료될 때 적용할 콜백을 등록할 수 있다. 콜백을 활용하면 오래된stale 메시지를 정리해줄 수 있으며, 휘발성volatile 저장소를 이용할 땐 애플리케이션을 종료하면서 리소스를 정리할 수 있는 훅을 등록할 수 있다. 콜백 인터페이스는 아래 보이는 것처럼 매우 직관적이다:

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

콜백에선 메시지 스토어와 메시지 그룹에 직접 접근할 수 있으므로, 현재 보관 중인persistent 상태를 관리할 수 있다 (예를 들면 메시지 스토어에서 그룹을 통째로 제거할 수도 있다).

MessageGroupStore는 이 콜백 목록을 가지고 있으며, 타임스탬프가 파라미터로 전달된 시간보다 앞선 모든 메시지를 대상으로 콜백을 실행한다 (앞에서 설명한 registerMessageGroupExpiryCallback(..), expireMessageGroups(..) 메소드 참고).

expireMessageGroups 기능을 이용한다면, 같은 MessageGroupStore 인스턴스 하나를 여러 aggregator 컴포넌트들에서 사용하지 않도록 주의해야 한다. AbstractCorrelatingMessageHandler는 전부 forceComplete() 콜백 기반의 자체 MessageGroupCallback을 등록한다. 그렇기 때문에 하나의 스토어를 공유해서 쓰면 만료되는 그룹들이 엉뚱한 aggregator에 의해 완료되거나 폐기될 수 있다. 5.0.10 버전부턴 AbstractCorrelatingMessageHandler에서 MessageGroupStore에 콜백을 등록할 땐 UniqueExpiryCallback을 사용한다. MessageGroupStore에선 해당 클래스의 인스턴스가 이미 있는지를 확인하고, 콜백 셋에 같은 인스턴스가 존재하는 경우 적당한 에러 로그를 남긴다. 프레임워크는 이와 같이 다른 aggregator/resequencer에서 MessageGroupStore 인스턴스를 사용하지 못하도록 해서, 금방 언급한 특정 correlation 핸들러가 직접 만들지 않는 그룹을 만료시키는 부작용을 방지해준다.

expireMessageGroups 메소드를 호출할 땐 타임아웃 값을 넘길 수 있다. 현재 시간에서 이 값을 뺀 시간보다 오래된 메시지들은 전부 만료되고 콜백을 실행한다. 따라서 메시지 그룹의 “만료”가 의미하는 바는 메시지 스토어의 사용자가 결정할 수 있다.

Spring Integration은 편의를 위해 메시지 만료를 위한 래퍼wrapper MessageGroupStoreReaper를 제공한다:

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

이 reaper는 Runnable이다. 위 예제에선 메시지 그룹 스토어의 expire 메소드를 10초마다 호출한다. 자체 타임아웃은 30초다.

MessageGroupStoreReaper의 ‘timeout’ 프로퍼티는 대략적인 근사치라는 걸 이해해야 한다. 이 프로퍼티는 다음에 예약된 MessageGroupStoreReaper 태스크를 실행할 때에만 확인하기 때문에, 실제 타임아웃은 태스크 스케줄러의 속도에 따라 달라질 수 있다. 예를 들어 타임아웃은 10분으로 설정돼 있지만, MessageGroupStoreReaper 태스크가 매시간 실행되도록 예약돼 있고 MessageGroupStoreReaper 태스크를 마지막으로 실행한 게 타임아웃 1분 전이라면, 앞으로 59분 동안은 MessageGroup이 만료되지 않는다. 따라서 태스크 실행 간격은 최소한 타임아웃과 같거나 더 짧은 간격으로 설정하는 것이 좋다.

만료 콜백은 리퍼 외에도 AbstractCorrelatingMessageHandler의 라이프사이클 콜백을 통해 애플리케이션이 종료될 때도 실행된다.

AbstractCorrelatingMessageHandler는 자체 만료 콜백을 등록하며, 이 콜백은 aggregator의 XML 설정에 있는 boolean 플래그 send-partial-result-on-expiry와 연결된다. 이 플래그를 true로 설정하면 만료 콜백이 실행될 때 아직 release 전인 그룹에 있는, 마킹되지 않는 메시지들을 출력 채널로 전송할 수 있다.

MessageGroupStoreReaper는 예약된 태스크에서 호출해서 다운스트림 플로우로 메시지를 보낼 수도 있다 (sendPartialResultOnExpiry 옵션에 따라 다르다). 그렇기 때문에 errorChannel을 통해 예외를 처리할 수 있도록 MessagePublishingErrorHandler와 커스텀 TaskScheduler를 제공하는 것을 권장한다. 일반적인 aggregator release 기능에서도 기대하는 방식이기도 하다. 마찬가지로 TaskScheduler를 사용하는 그룹 타임아웃 기능에서도 동일하다. 자세한 내용은 에러 핸들링을 참고해라.

서로 다른 correlation 엔드포인트에서 하나의 MessageStore를 공유하고 있다면 반드시 적합한 CorrelationStrategy를 설정해서 그룹 ID의 고유성을 보장해줘야 한다. 그렇지 않으면 특정 correlation 엔드포인트가 다른 엔드포인트의 메시지를 release하거나 만료해서 의도와는 다르게 동작할 수도 있다. correlation 키가 동일한 메시지는 동일한 메시지 그룹에 저장된다.

MessageStore 구현체 중에는 데이터를 파티셔닝해서 물리적으로 동일한 리소스를 사용할 수 있게 해주는 구현체도 있다. 예를 들어 JdbcMessageStore에는 region이란 속성이 있으며, MongoDbMessageStore에는 collectionName이란 속성이 있다.

MessageStore 인터페이스와 구현체에 대한 좀더 자세한 내용은 메시지 스토어를 읽봐라.

8.4.5. Flux Aggregator

5.2 버전에선 FluxAggregatorMessageHandler 컴포넌트를 도입했다. 이 핸들러는 프로젝트 리액터의 Flux.groupBy(), Flux.window() 연산자를 사용한다. 들어오는 메시지들은 생성자 안에서 Flux.create()로 시작했던 FluxSink로 방출한다. outputChannel을 제공하지 않거나 ReactiveStreamsSubscribableChannel의 인스턴스가 아닌 경우, 메인 Flux에 대한 구독은 Lifecycle.start() 구현부에서 수행한다. 그 외는 ReactiveStreamsSubscribableChannel 구현체에서 구독을 수행하는 것으로 연기한다. 메시지들은 Flux.groupBy()를 통해 묶이며, 그룹 키엔 CorrelationStrategy를 사용한다. 기본적으론 메시지의 IntegrationMessageHeaderAccessor.CORRELATION_ID 헤더를 참조한다.

기본적으로 윈도우가 닫히면 페이로드에 Flux를 담은 메시지를 만들어 윈도우를 release한다. 이 메시지는 윈도우의 첫 번째 메시지에 있는 모든 헤더를 가지고 있다. 출력 메시지 페이로드 안에 담겨있는 이 Flux는 다운스트림에서 반드시 구독하고 처리해야 한다. 관련 로직은 FluxAggregatorMessageHandler의 설정 옵션 setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)로 커스텀(또는 대체)할 수 있다. 예를 들어, 최종적으로 만드는 메시지에 페이로드의 List를 담고싶다면 다음과 같이 Flux.collectList()를 설정해주면 된다:

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

FluxAggregatorMessageHandler에는 여러 가지 옵션이 있어 적절한 윈도우 전략을 선택할 수 있다:

이 컴포넌트는 MessageHandler의 구현체이므로 간단히 메시징 어노테이션 @ServiceActivator@Bean 정의로 사용할 수 있다. Java DSL에선 EIP 메소드 .handle()에서 사용하면 된다. 아래 보이는 샘플은 런타임에 IntegrationFlow를 등록하고, FluxAggregatorMessageHandler를 업스트림 splitter와 연계하는 방법을 보여준다:

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

@SuppressWarnings("unchecked")
Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

8.4.6. Condition on the Message Group

5.5 버전부터 AbstractCorrelatingMessageHandler(자바 & XML DSL 포함)는 BiFunction<Message<?>, String, String> 타입 옵션 groupConditionSupplier를 제공한다. 이 함수는 그룹에 메시지가 추가될 때마다 호출하며, 리턴한 condition 문자열은 그룹에 저장돼 이후에 참작할 수 있다. ReleaseStrategy에선 그룹에 있는 모든 메시지를 순회하는 대신에 이 condition을 참조할 수 있다. 자세한 내용은 GroupConditionProvider JavaDocs와 메시지 그룹 Condition을 참고해라.

File Aggregator도 함께 읽어보면 좋다.


8.5. Resequencer

resequencer는 aggregator와 관련이 있지만 사용하는 목적은 다르다. Aggregator는 메시지들을 결합해주는 반면, resequencer를 거치더라도 메시지는 변경되지 않는다.

8.5.1. Functionality

resequencer는 CORRELATION_ID를 사용해 메시지들을 그룹에 저장한다는 점에선 aggregator와 유사하다. 차이점은 Resequencer는 어떤 방식으로든 메시지를 가공하지 않는다는 것이다. 대신 SEQUENCE_NUMBER 헤더 값을 기준으로 메시지들을 정렬해서 release한다.

이와 관련해서는, 모든 메시지를 한 번에 release할 수도 있고 (SEQUENCE_SIZE 등에 따라 전체 시퀀스가 모이고 나면), 유효한 시퀀스가 하나라도 있으면 즉시 release할 수 있다. (“유효한 시퀀스”가 의미하는 바는 이 챕터 뒤 에서 설명한다.)

resequencer는 짧은 간격으로 들어오는, 상대적으로 짧은 메시지 시퀀스를 재배열하는 용도다. 긴 시간 동안 따로 따로 들어오는 시퀀스가 많다면 성능 문제를 겪을 수도 있다.

8.5.2. Configuring a Resequencer

Java DSL을 이용해 resequencer를 설정한다면 Aggregator와 Resequencer를 참고해라.

resequencer를 설정할 땐 XML에 적당한 요소만 추가해주면 된다.

다음은 resequencer 설정 예시다:

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  <!-- (1) -->
  input-channel="inputChannel"  <!-- (2) -->
  output-channel="outputChannel"  <!-- (3) -->
  discard-channel="discardChannel"  <!-- (4) -->
  release-partial-sequences="true"  <!-- (5) -->
  message-store="messageStore"  <!-- (6) -->
  send-partial-result-on-expiry="true"  <!-- (7) -->
  send-timeout="86420000"  <!-- (8) -->
  correlation-strategy="correlationStrategyBean"  <!-- (9) -->
  correlation-strategy-method="correlate"  <!-- (10) -->
  correlation-strategy-expression="headers['something']"  <!-- (11) -->
  release-strategy="releaseStrategyBean"  <!-- (12) -->
  release-strategy-method="release"  <!-- (13) -->
  release-strategy-expression="size() == 10"  <!-- (14) -->
  empty-group-min-timeout="60000"  <!-- (15) -->

  lock-registry="lockRegistry"  <!-- (16) -->

  group-timeout="60000"  <!-- (17) -->
  group-timeout-expression="size() ge 2 ? 100 : -1"  <!-- (18) --> 
  scheduler="taskScheduler" />  <!-- (19) -->
  expire-group-upon-timeout="false" />  <!-- (20) -->

(1) resequencer의 id는 생략할 수 있다.
(2) resequencer의 입력 채널.
생략할 수 없다.

(3) resequencer가 재정렬한 메시지들을 전송할 채널.
생략할 수 있다.

(4) resequencer가 타임아웃된 메시지들을 전송할 채널 (send-partial-result-on-timeoutfalse로 설정한 경우).
생략할 수 있다.

(5) 정렬한 시퀀스들을 가능할 때 바로 전송할지 아니면 전체 메시지 그룹이 모이고 나면 보낼지 여부.
생략할 수 있다.
(기본값은 false다.)

(6) 메시지 그룹이 완성될 때까지 correlation 키 아래 메시지 그룹을 저장하는 데 사용하는 MessageGroupStore에 대한 참조.
생략할 수 있다.
(기본적으로 휘발성의 인메모리 저장소를 사용한다.)

(7) 그룹이 만료되면 정렬한 그룹을 전송해야 하는지 여부 (일부 메시지가 누락됐더라도).
생략할 수 있다.
(디폴트는 false다.)
Aggregator에서 상태 관리하기: MessageGroupStore를 참고해라.

(8) 응답 Messageoutput-channel 또는 discard-channel에 전송할 때 대기하는 타임아웃 간격. 기본값은 -1로 무한으로 블로킹된다.
고정 ‘capacity’를 사용하는 QueueChannel같이, ‘전송’에 제한이 있는 출력 채널을 사용할 때만 적용된다. 타임아웃이 발생하면 MessageDeliveryException을 던진다.
AbstractSubscribableChannel의 구현체들은 send-timeout을 무시한다.
group-timeout(-expression)의 경우 예약된 expire 태스크에서 MessageDeliveryException이 발생하면 해당 태스크를 다시 스케줄링한다.
생략할 수 있다.

(9) 메시지 correlation (grouping) 알고리즘을 구현한 빈에 대한 참조.
이 빈은 CorrelationStrategy 인터페이스의 구현체일 수도, POJO일 수도 있다.
후자라면 correlation-strategy-method 속성도 반드시 함께 정의해야 한다.
생략할 수 있다 (aggregator는 기본적으로 IntegrationMessageHeaderAccessor.CORRELATION_ID 헤더를 사용한다.)

(10) correlation-strategy가 참조하는 빈에 정의돼있는 메소드. 이 메소드에서 correlation 결정 알고리즘을 구현한다.
제약이 있긴 하지만 (correlation-strategy를 반드시 정의해야 한다) 생략할 수 있다.

(11) correlation 전략을 나타내는 SpEL 표현식.
ex: "headers['something']").
correlation-strategycorrelation-strategy-expression은 둘 중 하나만 사용할 수 있다.

(12) release 전략을 구현한 빈에 대한 참조.
이 빈은 ReleaseStrategy 인터페이스의 구현체일 수도, POJO일 수도 있다.
후자라면 release-strategy-method 속성도 반드시 함께 정의해야 한다.
생략할 수 있다 (aggregator는 기본적으로 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 헤더를 사용한다).

(13) release-strategy가 참조하는 빈에 정의돼있는 메소드. 이 메소드에서 completion 결정 알고리즘을 구현한다.
생략할 수 있으며, 제약이 존재한다 (release-strategy를 반드시 정의해야 한다).

(14) release 전략을 나타내는 SpEL 표현식.
표현식의 루트 객체는 MessageGroup이다.
ex: "size() == 5".
release-strategyrelease-strategy-expression은 둘 중 하나만 사용할 수 있다.

(15) <resequencer>MessageStoreMessageGroupStoreReaper를 설정했을 때만 적용된다.
MessageGroupStoreReaper가 그룹을 부분적으로 만료하도록 설정돼있다면 기본적으로 비어있는 그룹 역시 제거한다.
빈 그룹은 그룹이 정상적으로 release된 후에 존재하는데, 덕분에 늦게 도착하는 메시지들을 감지하고 폐기할 수 있다.
그룹을 부분적으로 만료시키는 것보다 더 긴 주기로 빈 그룹을 만료시키고 싶다면 이 속성을 설정해라.
비어있는 그룹들은 최소한 이 밀리세컨드 동안 수정되지 않는다면 MessageStore에서 제거되지 않을 거다.
빈 그룹이 실제로 만료되는 시간은 reaper의 타임아웃 속성에도 영향을 받으며, 이 값에 타임아웃을 더한 시간만큼 걸릴 수도 있다.

(16) XML로 Aggregator 설정하기 참고.
(17) XML로 Aggregator 설정하기 참고.
(18) XML로 Aggregator 설정하기 참고.
(19) XML로 Aggregator 설정하기 참고.
(20) 타임아웃으로 인해 (또는 MessageGroupStoreReaper로 인해) 그룹이 완료되면 기본적으로 빈 그룹의 메타데이터를 보관한다.
따라서 늦게 도착하는 메시지들을 즉시 폐기할 수 있다.
그룹을 완전히 제거하려면 이 속성을 true로 설정해라.
그러면 늦게 도착한 메시지들은 새 그룹을 시작하고 그룹이 다시 타임아웃되기 전까진 폐기하지 않는다.
이때 타임아웃이 발생한 이유는 시퀀스 범위 안에 “구멍”이 존재하기 때문이므로, 새로 만들어진 그룹은 절대 정상적으로 release되지 않는다.
빈 그룹들은 empty-group-min-timeout 속성과 MessageGroupStoreReaper를 함께 사용하면 이후 만료시킬 수 있다 (완전히 제거된다).
5.0 버전부터 empty-group-min-timeout이 경과하면 빈 그룹도 함께 제거하도록 스케줄링된다.
기본값은 ‘false’다.

자세한 내용은 Aggregator 그룹 만료시키기를 함께 참고해라.

resequencer와 관련해서는, 자바 클래스에서 구현할만한 커스텀 동작이 없기 때문에 따로 지원하는 어노테이션은 없다.


8.6. Message Handler Chain

MessageHandlerChain은 실제 동작은 필터, transformer, splitter 등과 같은 다른 핸들러들로 구성된 체인에 위임하면서, 단일 메시지 엔드포인트로 설정할 수 있는 MessageHandler의 구현체다. 여러 가지 핸들러를 고정된 순서로 연결해서 선형으로 실행해야 한다면 MessageHandlerChain을 이용하는 게 훨씬 더 간단하다. 예를 들면 다른 구성 요소 앞에 transformer를 두는 경우가 꽤 많다. 비슷하게 체인 앞부분에 필터를 두면 사실상 선택적인selective 컨슈머를 만들게 된다. 두 체인 모두 하나의 input-channel과 하나의 output-channel만 있으면 되기 때문에, 구성 요소마다 개별적으로 채널을 정의해주지 않아도 된다.

MessageHandlerChain은 대부분 XML 설정 위주로 설계됐다. Java DSL의 IntegrationFlow 정의는 하나의 체인 컴포넌트로 취급할 순 있지만, 아래에서 설명하는 개념과 원칙들과는 관련이 없다. 자세한 내용은 Java DSL을 참고해라.

Spring Integration의 FilterthrowExceptionOnRejection이라는 boolean 프로퍼티를 제공한다. 하나의 point-to-point 채널에서 메시지를 수락하는 기준이 다른 여러 가지 선택적selective 컨슈머가 있다면, 이 값을 ‘true’로 설정해주는 것이 좋다 (기본값은 false다). 그러면 dispatcher에서 메시지가 거부되었음을 알 수 있으며, 결과적으로 메시지를 다른 구독자에게 전달해볼 수 있다. 예외를 던지지 않으면, 필터에서 메시지를 버려 추가적인 처리가 일어나지 않는 경우에도 dispatcher는 메시지 전달에 성공한 것으로 밖에 알 수가 없다. 정말로 메시지를 “삭제drop“하고 싶을 때는 필터의 ‘discard-channel’도 유용할 수 있다. ‘discard-channel’을 이용하면 삭제된 메시지로 원하는 작업을 수행할 수 있다 (JMS 큐에 메시지를 전송하거나 로그로 남기는 등).

핸들러 체인 설정은 매우 단순한데, 내부 구성 요소 간의 결합도는 똑같이 느슨하게 유지해준다. 어느 시점엔 비선형적인 조합이 필요해지더라도, 쉽게 설정을 변경할 수 있다.

체인의 내부에선 가지고 있는 엔드포인트들을 선형적으로 확장한다. 이 엔드포인트들은 익명 채널로 구분된다. 체인 내에선 reply 채널 헤더를 고려하지 않는다. 마지막 핸들러를 호출했을 때만 결과 메시지를 reply 채널이나 체인의 출력 채널로 전달한다. 이와 같은 구조로 인해 마지막 핸들러를 제외한 모든 핸들러는 MessageProducer 인터페이스를 구현해야 한다 (‘setOutputChannel()’ 메소드). MessageHandlerChainoutputChannel이 설정돼 있으면 마지막 핸들러는 출력 채널 하나만 있으면 된다.

다른 엔드포인트와 마찬가지로 output-channel은 선택 사항이다. 체인이 끝났을 때 응답 메시지가 있는 경우 output-channel을 우선시한다. 하지만 응답 메시지가 없다면 체인 핸들러는 폴백으로 인바운드 메시지에서 reply 채널 헤더를 확인해본다.

대부분의 경우 MessageHandler를 직접 구현할 필요가 없다. 다음 섹션에선 네임스페이스 지원을 이용해 체인 요소를 설정하는 방법에 집중한다. 서비스 activator와 transformer같은 대부분의 Spring Integration 엔드포인트들은 MessageHandlerChain 내에서 사용하기 알맞게 설계돼있다.

8.6.1. Configuring a Chain

<chain> 요소는 input-channel 속성을 제공한다. 체인의 마지막 요소가 응답 메시지를 생성하는 경우를 위해 (선택 사항이다), output-channel 속성도 지원한다. 하위 요소로는 filter, transformer, splitter, service-activator가 있다. 마지막 요소는 라우터나 아웃바운드 채널 어댑터일 수도 있다. 다음은 체인을 정의하는 예시다:

<int:chain input-channel="input" output-channel="output">
    <int:filter ref="someSelector" throw-exception-on-rejection="true"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:service-activator ref="someService" method="someMethod"/>
</int:chain>

위 예제에서 사용한 <header-enricher> 요소는 메시지에 thing1이라는 헤더를 thing2라는 값으로 설정해준다. 헤더 enricher는 헤더 값에만 손을 대는 특화된 Transformer다. 헤더를 수정하는 MessageHandler를 구현하고 빈으로 연결해줘도 같은 결과를 얻을 수 있지만, header-enricher가 훨씬 더 간단하다.

<chain>은 메시지 플로우를 “마감하는closed-box” 마지막 컨슈머로 설정해도 된다. 이럴 땐 아래 예제와 같이 <chain>의 끝에 원하는 <outbound-channel-adapter>를 넣어주면 된다:

<int:chain input-channel="input">
    <int-xml:marshalling-transformer marshaller="marshaller" result-type="StringResult" />
    <int:service-activator ref="someService" method="someMethod"/>
    <int:header-enricher>
        <int:header name="thing1" value="thing2"/>
    </int:header-enricher>
    <int:logging-channel-adapter level="INFO" log-full-message="true"/>
</int:chain>

Disallowed Attributes and Elements

체인 내에 있는 구성 요소에선 orderinput-channel같은 몇 가지 속성들을 지정할 수 없다. 하위요소 poller에서도 마찬가지다.

Spring Integration 핵심 컴포넌트들의 제약 조건 일부는 XML 스키마 자체만으로 강제된다. 하지만 비핵심 컴포넌트들이나 사용자의 커스텀 컴포넌트의 경우, 이런 제약들은 XML 스키마가 아닌 XML 네임스페이스 파서를 통해 적용된다.

XML 네임스페이스 파서의 제약 조건은 Spring Integration 2.2에서 추가됐다. 허용하지 않는 속성이나 요소를 사용하려고 하면 XML 네임스페이스 파서에서 BeanDefinitionParsingException을 던진다.

8.6.2. Using the ‘id’ Attribute

Spring Integration 3.0부터 체인 요소에 id 속성이 주어지면, 체인의 id와 요소 자체의 id를 조합한 값을 요소의 빈 이름으로 사용한다. id 속성이 없는 요소는 빈으로 등록되진 않지만, 각각은 체인 id를 포함하는 componentName이 부여된다. 아래 예시를 살펴보자:

<int:chain id="somethingChain" input-channel="input">
    <int:service-activator id="somethingService" ref="someService" method="someMethod"/>
    <int:object-to-json-transformer/>
</int:chain>

위 예제에선:

<chain> 요소들에 id 속성을 사용하면 JMX 익스포터에도 적합하며, 메시지 히스토리에서 추적하기도 좋다. 앞에서 언급한 것처럼 적당한 빈 이름을 사용하면 BeanFactory로 액세스할 수도 있다.

<chain> 요소에 id 속성을 명시하면, 로그에서 하위 구성 요소를 간단하게 식별하고, BeanFactory에서 액세스할 수 있어 유용하다.

8.6.3. Calling a Chain from within a Chain

간혹 체인 내에서 다른 체인을 중첩해서 실행한 뒤, 다시 돌아와 원래의 체인을 이어서 실행해야 할 때가 있다. 이럴 땐 다음 예제와 같이 <gateway> 요소를 추가해서 메시징 게이트웨이를 사용하면 된다:

<int:chain id="main-chain" input-channel="in" output-channel="out">
    <int:header-enricher>
      <int:header name="name" value="Many" />
    </int:header-enricher>
    <int:service-activator>
      <bean class="org.foo.SampleService" />
    </int:service-activator>
    <int:gateway request-channel="inputA"/>
</int:chain>

<int:chain id="nested-chain-a" input-channel="inputA">
    <int:header-enricher>
        <int:header name="name" value="Moe" />
    </int:header-enricher>
    <int:gateway request-channel="inputB"/>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

<int:chain id="nested-chain-b" input-channel="inputB">
    <int:header-enricher>
        <int:header name="name" value="Jack" />
    </int:header-enricher>
    <int:service-activator>
        <bean class="org.foo.SampleService" />
    </int:service-activator>
</int:chain>

위 예제에선 main-chain 처리가 끝나갈 때 체인 끝에 설정돼있는 ‘gateway’ 요소가 nested-chain-a를 호출한다. nested-chain-a를 처리하는 동안은 헤더를 추가enrichment한 후에 nested-chain-b를 호출한다. 이후 원래 흐름으로 돌아와 nested-chain-b에서의 실행을 완료한다. 마지막엔 main-chain으로 돌아간다. 체인 안에 <gateway> 요소를 중첩해서 정의할 땐 service-interface 속성이 필요하지 않다. 대신 메시지를 현재 상태로 가져와 request-channel 속성에 정의돼있는 채널에 배치한다. 이 게이트웨이가 시작한 다운스트림 플로우가 완료되면 게이트웨이로 Message가 반환되고 현재 체인에서 여정을 이어간다.


8.7. Scatter-Gather

Spring Integration은 4.1 버전부터 엔터프라이즈 통합 패턴 scatter-gather의 구현체를 제공한다. 이 구현체는 복합 엔드포인트로, 여러 수신자recipient에게 메시지를 보내고 그 결과를 집계하는 것이 목적이다. Enterprise Integration Patterns에서 언급하는 것 처럼, 이 컴포넌트는 “최상의 견적best quote“을 찾는 시나리오에 적용할 수 있다. 여기서 말하는 최상의 견적이란, 여러 공급 업체supplier에 정보를 요청하고 요청 항목에 가장 적합한 조건을 제공할 수 있는 업체를 결정하는 것을 말한다.

예전엔 이 패턴을 구현하려면 필요한 컴포넌트들을 개별적으로 구성했었다. 이제는 전보다 간편하게 설정할 수 있다.

ScatterGatherHandlerPublishSubscribeChannel(또는 RecipientListRouter)과 AggregatingMessageHandler를 결합한 request-reply 엔드포인트다. 요청 메시지는 scatter 채널로 전송되며, ScatterGatherHandler는 응답을 기다렸다가 aggregator를 이용해 응답을 outputChannel로 전송한다.

8.7.1. Functionality

Scatter-Gather 패턴에는 “경매auction” 방식과 “배급distribution” 방식이 있다. 두 시나리오 모두 aggregation 기능은 동일하며, AggregatingMessageHandler에서 사용할 수 있는 모든 옵션을 제공한다. (사실 ScatterGatherHandler의 생성자엔 AggregatingMessageHandler 인자만 넘겨주면 된다.) 자세한 내용은 Aggregator를 참고해라.

Auction

Scatter-Gather의 auction 버전은 요청 메시지에 “publish-subscribe” 로직을 적용한다. 이때 “scatter” 채널은 apply-sequence="true"PublishSubscribeChannel이다. 물론 이 채널엔 어떤 MessageChannel 구현체라도 사용할 수 있다 (ContentEnricherrequest-channel에서처럼 — Content Enricher 참고). 하지만 그러려면 aggregation에 사용할 자체 커스텀 correlationStrategy를 만들어야 한다.

Distribution

Scatter-Gather의 distribution 버전은 RecipientListRouter 기반이어서, RecipientListRouter에서 제공하는 모든 옵션을 사용할 수 있다 (RecipientListRouter 참고). recipient-list-routeraggregator에서 디폴트 correlationStrategy만 사용하고 싶다면 apply-sequence="true"를 지정해야 한다. 그렇지 않으면 aggregator에서 사용할 커스텀 correlationStrategy를 제공해줘야 한다. PublishSubscribeChannel 방식(auction 버전)과는 달리, recipient-list-router selector 옵션을 사용하면 메시지를 기반으로 타겟 supplier를 필터링할 수 있다. apply-sequence="true"를 사용하면 디폴트 sequenceSize가 제공되서 aggregator에서 그룹을 적당히 release할 수 있다. distribution 옵션과 auction 옵션을 함께 사용할 순 없다.

auction과 distribution 방식 모두, aggregator의 응답 메시지를 기다릴 수 있도록 요청 (scatter) 메시지에 gatherResultChannel 헤더를 추가한다.

기본적으로 모든 supplier는 replyChannel 헤더로 결과를 전송해야 한다 (보통은 최종 엔드포인트에서 output-channel을 생략하는 식으로). 하지만 gatherChannel 옵션도 제공하므로, supplier는 이 채널에 응답을 보내 집계할 수도 있다.

8.7.2. Configuring a Scatter-Gather Endpoint

다음은 Scatter-Gather 빈을 정의하는 자바 설정 예시다:

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

위 예시에선 applySequence="true"와 recipient 채널 리스트를 사용해 RecipientListRouter distributor 빈을 설정하고 있다. 그 다음 보이는 빈은 AggregatingMessageHandler다. 마지막으로 이 두 가지 빈을 ScatterGatherHandler 빈 정의에 주입하고 @ServiceActivator로 마킹해서, 이 scatter-gather 컴포넌트를 인티그레이션 플로우에 연결한다.

다음은 XML 네임스페이스를 이용해 <scatter-gather> 엔드포인트를 설정하는 예시다:

<scatter-gather
		id=""  <!-- (1) -->
		auto-startup=""  <!-- (2) -->
		input-channel=""  <!-- (3) -->
		output-channel=""  <!-- (4) -->
		scatter-channel=""  <!-- (5) -->
		gather-channel=""  <!-- (6) -->
		order=""  <!-- (7) -->
		phase=""  <!-- (8) -->
		send-timeout=""  <!-- (9) -->
		gather-timeout=""  <!-- (10) -->
		requires-reply="" > <!-- (11) -->
			<scatterer/>  <!-- (12) -->
			<gatherer/>  <!-- (13) -->
</scatter-gather>

(1) 이 엔드포인트의 ID.
ScatterGatherHandler 빈은 id + '.handler'라는 alias와 함께 등록된다.
RecipientListRouter 빈은 id + '.scatterer'라는 alias로 등록된다.
AggregatingMessageHandler 빈은 id + '.gatherer'라는 alias로 등록된다.
생략할 수 있다.
(BeanFactory가 디폴트 id 값을 생성해준다.)

(2) 애플리케이션 컨텍스트를 초기화할 때 이 엔드포인트를 시작해야 하는지를 나타내는 라이프사이클 속성.
참고로, ScatterGatherHandlerLifecycle도 구현하고 있으며, gather-channel을 제공하면 내부적으로 생성하는 gatherEndpoint를 시작하고 중지한다.
생략할 수 있다.
(디폴트는 true다.)

(3) ScatterGatherHandler로 처리할 요청 메시지를 수신하는 채널.
필수 값이다.

(4) ScatterGatherHandler가 집계 결과를 전송할 채널.
생략할 수 있다.
(전달받는 메시지 자체에서 replyChannel 헤더를 이용해 응답 채널을 지정할 수 있다.)

(5) auction 시나리오에서 scatter 메시지를 전송할 채널.
생략할 수 있다.
하위 요소 <scatterer>와 함께 사용할 수 없다.

(6) 집계를 위해 각 supplier로부터 응답을 수신할 채널.
scatter 메시지의 replyChannel 헤더로 사용한다.
생략할 수 있다.
기본적으로 FixedSubscriberChannel이 만들어진다.

(7) 둘 이상의 핸들러가 같은 DirectChannel을 구독하는 경우를 위한 이 컴포넌트의 순서 (로드 밸런싱 용도).
생략할 수 있다.

(8) 이 엔드포인트를 시작하고 중지해야 하는 phase를 지정한다.
시작은 제일 낮은 값에서부터 제일 높은 값 순서로 진행하며, 종료는 높은 값에서 낮은 값 순이다.
기본적으로 이 값은 Integer.MAX_VALUE로, 이 컨테이너는 가능한 한 늦게 시작하며, 중지는 가능한 한 빨리 진행한다.
생략할 수 있다.

(9) 응답 Messageoutput-channel로 전송할 때 대기하는 타임아웃 간격.
기본적으론 전송하는 1초 동안 블로킹한다.
고정 ‘capacity’를 사용하는 QueueChannel같이, ‘전송’에 제한이 있는 출력 채널을 사용할 때만 적용된다.
타이아웃이 발생하면 MessageDeliveryException을 던진다.
AbstractSubscribableChannel의 구현체들은 send-timeout을 무시한다.
group-timeout(-expression)의 경우 예약된 expire 태스크에서 MessageDeliveryException이 발생하면 해당 태스크를 다시 스케줄링한다.
생략할 수 있다.

(10) scatter-gather가 응답 메시지를 반환하기 전 대기할 시간을 지정할 수 있다.
기본적으론 무한정 대기한다.
응답 시간을 초과하면 ‘null’을 반환한다.
생략할 수 있다.
기본값은 무기한 대기를 뜻하는 -1이다.

(11) scatter-gather가 반드시 null이 아닌 값을 반환해야 하는지 여부를 지정한다.
기본값은 true다.
따라서 내부 aggregator가 gather-timeout이 지나고 나서 null 값을 반환하면 ReplyRequiredException이 발생한다.
null을 반환할 수 있다면, gather-timeout을 지정해야 무한정 대기하지 않는다는 점에 주의하자.

(12) <recipient-list-router> 옵션들.
생략할 수 있다.
scatter-channel 속성과는 함께 사용할 수 없다.

(13) <aggregator> 옵션들.
필수 값이다.

8.7.3. Error Handling

Scatter-Gather는 여러 개의 request와 reply를 다루는 컴포넌트기 때문에 에러를 핸들링하기가 조금 더 복잡한 편이다. ReleaseStrategy에서 요청보다 응답이 적어도 처리를 종료하도록 만든다면, 경우에 따라서는 다운스트림에서 발생한 예외를 잡아 그저 무시하는 게 더 좋을 수도 있다. 그렇지 않을 땐 에러가 발생하면 하위 플로우에서 “보상 메시지compensation message“같은 걸 반환하는 것을 고려해봐야 한다.

하위 플로우 중 비동기async 플로우가 있다면 전부 errorChannel 헤더를 설정해야 MessagePublishingErrorHandler가 적절한 에러 메시지를 전송할 수 있다. 그렇지 않으면 공통 에러 핸들링 로직을 통해 글로벌 errorChannel로 에러를 전송한다. 비동기 에러 처리에 대한 자세한 내용은 에러 핸들링을 참고해라.

동기Synchronous 플로우는 ExpressionEvaluatingRequestHandlerAdvice를 사용해 예외를 무시하거나 보상 메시지를 반환할 수 있다. 하위 플로우 중 하나가 ScatterGatherHandler로 예외를 던지면 ScatterGatherHandler는 업스트림으로 예외를 다시 던진다. 그렇기 때문에 다른 하위 플로우에서 무언가를 처리했더라도 ScatterGatherHandler에선 그 응답을 무시하게 된다. 원하는 동작일 수도 있지만, 대부분의 경우 다른 플로우와 gatherer 동작에 영향을 주지 않도록 하위 플로우에서 발생한 에러를 적당히 처리해주는 것이 좋다.

5.1.3 버전부터 ScatterGatherHandlererrorChannelName 옵션과 함께 제공된다. 이 값은 scatter 메시지의 errorChannel 헤더에 채워지며, 비동기 에러가 발생할 때 사용하거나, 일반적인 동기식 하위 플로우에서 에러 메시지를 직접 전송할 때 사용할 수 있다.

다음은 보상 메시지를 반환해 에러를 비동기로 처리하는 예시다:

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .applySequence(true)
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

적절한 응답을 생성하려면, MessagePublishingErrorHandlerscatterGatherErrorChannel로 전송한 MessagingException에서 failedMessage의 헤더를 복사해야 한다 (replyChannel, errorChannel 포함). 이렇게 하면 응답 메시지 그룹을 완료할 수 있도록 ScatterGatherHandler gatherer로 타겟 exception이 반환된다. 이런 exception payload는 gatherer의 MessageGroupProcessor에서 필터링하거나, scatter-gather 엔드포인트 이후 다운스트림에서 다른 식으로 처리할 수 있다.

ScatterGatherHandler는 scatter 결과를 gatherer에게 보내기 전에 reply, error 채널을 포함한 (있다면) 요청 메시지 헤더를 복원한다. 이렇게 하면 하위 플로우(scatter 수신자recipient)와 비동기로 메시지를 주고 받더라도 호출자에게 AggregatingMessageHandler에서 발생한 에러를 전파할 수 있다. 제대로 동작하려면 반드시 하위 플로우(scatter 수신자recipient)의 응답으로 gatherResultChannel, originalReplyChannel, originalErrorChannel 헤더가 전달돼야 한다. 이땐 반드시 ScatterGatherHandler에 적당한 gatherTimeout을 설정해줘야 한다 (유한한 값으로). 그렇지 않으면 디폴트 동작에 따라 gatherer의 응답을 한없이 기다리며 블로킹된다.


8.8. Thread Barrier

간혹 다른 비동기 이벤트가 발생할 때까지 메시지 플로우 스레드를 잠시 중단해야 할 때가 있다. 예를 들어서 HTTP 요청을 통해 RabbitMQ에 메시지를 발행한다고 생각해보자. RabbitMQ 브로커가 메시지를 수신했다고 승인acknowledgment하기 전까진 사용자에게 응답을 전송하지 않길 바랄 수 있다.

Spring Integration은 4.2 버전에서 이런 용도로 사용할 수 있는 <barrier/> 컴포넌트를 도입했다. 내부에서 사용하는 MessageHandlerBarrierMessageHandler다. 이 클래스는 MessageTriggerAction도 구현하고 있는데, 여기 있는 trigger() 메소드에 메시지가 전달되면 handleRequestMessage() 메소드에서 그에 맞는 스레드를 (있으면) 놓아준다release.

일시 중단 중인 스레드suspended thread와 트리거 스레드trigger thread는 메시지로 CorrelationStrategy를 호출해서 연계한다. input-channel로 메시지가 전달되면 이 스레드는 그에 상응하는 트리거 메시지를 기다리며 최대 requestTimeout 밀리세컨드 동안 일시 중단된다. 디폴트 correlation 전략은 IntegrationMessageHeaderAccessor.CORRELATION_ID 헤더를 사용한다. 동일한 correlation을 가진 트리거 메시지가 도착하면 이 스레드를 놓아준다release. 스레드를 해제하고 나서 output-channel로 전송하는 메시지는 MessageGroupProcessor를 사용해 구성한다. 이 메시지는 기본적으로 두 개의 페이로드를 가진 Collection<?>이며, 헤더는 DefaultAggregatingMessageGroupProcessor를 사용해 병합한다.

스레드를 일시 중단하기도 전에 trigger() 메소드가 먼저 호출되면 (또는 메인 스레드에서 타임아웃이 발생한 이후 호출되면), 최대 triggerTimeout 동안 일시 중단을 유발했어야 할 메시지가 도착하기를 기다린다. 트리거 스레드를 일시 중단하고 싶지 않다면, 대신 TaskExecutor를 사용해서 여기 있는 스레드가 일시 중단되도록 하는 것도 괜찮다.

5.4 버전 이전에는 요청 메시지와 트리거 메시지에 타임아웃을 지정할 땐 timeout 옵션 하나를 공유했었지만, 사용하기에 따라 각각에 타임아웃 값을 다르게 설정하는 게 더 좋은 경우가 있었다. 따라서 requestTimeouttriggerTimeout 옵션을 새로 도입했다.

requires-reply 프로퍼티는 일시 중단 중인 스레드가 트리거 메시지 도착 전에 타임아웃된 경우 수행할 일을 결정한다. 기본값은 false로, 엔드포인트는 null을 리턴해 플로우를 종료하고, 스레드는 호출자로 반환된다. true일 땐 ReplyRequiredException이 발생한다.

trigger() 메소드는 코드에서 직접 호출할 수 있다 (barrier.handler라는 이름으로 빈 참조를 얻어서 — 이때 barrier는 barrier 엔드포인트 빈의 이름이다). 또는 <outbound-channel-adapter/>를 설정해서 release를 트리거해도 된다.

하나의 correlation은 하나의 스레드만 중단할 수 있다. 같은 correlation을 여러 번 사용할 수는 있지만 동시에 사용하는 것은 안 된다. 같은 correlation으로 스레드가 두 번 도착하면 예외가 발생한다.

다음은 correlation에 커스텀 헤더를 사용하는 예시다:

Java XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator(inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger(message);
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

어느 쪽에 먼저 메시지가 도착했는지에 따라 in에 메시지를 전송하는 스레드 혹은 release에 메시지를 전송하는 스레드가 상대 메시지가 도착할 때까지 최대 10초 동안 대기하게 된다. 메시지가 release되면 myOutputProcessor라는 커스텀 MessageGroupProcessor 빈을 실행해 합친 메시지를 out 채널로 전송한다. 메인 스레드가 타임아웃되고 나서 트리거가 도착하면, 뒤늦게 도착한 트리거를 전송할 discard 채널을 설정할 수 있다.

이 컴포넌트를 사용하는 예제가 궁금하다면, barrier 샘플 애플리케이션을 확인해봐라.


Next :
Message Transformation
트랜스포머 인터페이스와 구현체들

전체 목차는 여기에 있습니다.

<< >>

TOP