토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 인티그레이션 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.


Spring Integration의 Message는 데이터를 담기 위한 범용 컨테이너다. 페이로드로 원하는 객체를 제공할 수 있으며, Message 인스턴스마다 헤더에 사용자가 다양하게 확장할 수 있는 프로퍼티(키-값 쌍)를 담을 수 있다.

목차


7.1. The Message Interface

다음은 Message 인터페이스의 정의다:

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 인터페이스는 API의 핵심 인터페이스다. 데이터를 범용 래퍼wrapper로 캡슐화하기 때문에 메시징 시스템에선 데이터 타입을 알지 못해도 데이터를 전달하고 전파할 수 있다. 애플리케이션에 새로운 타입이 추가되거나 기존 타입 자체가 수정, 확장되더라도 메시징 시스템은 영향 받지 않는다. 한편 메시징 시스템의 구성 요소에서 Message에 관한 정보에 접근해야 할 때는 보통, 메시지 헤더에 관련 메타데이터를 저장해놓고 여기서 조회해가면 된다.


7.2. Message Headers

Spring Integration에선 원하는 모든 ObjectMessage의 페이로드로 사용할 수 있듯이, 헤더 값 역시 모든 Object 타입을 지원한다. 실제로 MessageHeaders 클래스는 아래 클래스 정의에서 확인할 수 있듯이 java.util.Map 인터페이스를 구현하고 있다:

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}

MessageHeaders 클래스는 Map을 구현하고 있지만 사실상 읽기 전용 구현체다. Map에 값을 put하려고 하면 UnsupportedOperationException이 발생한다. removeclear에서도 마찬가지다. 메시지는 여러 컨슈머에 전달될 수 있기 때문에 Map의 구조를 수정할 순 없다. 마찬가지로 메시지의 페이로드 Object도 초기 생성 이후 set은 불가능하다. 하지만 헤더 값 (또는 페이로드 객체) 자체에 대한 수정mutability은 프레임워크 사용자가 결정할 수 있도록 남겨뒀다.

MessageHeadersMap을 구현하고 있기 때문에 헤더의 이름으로 get(..)을 호출하면 헤더를 조회할 수 있다. 아니면 예상하는 Class를 추가 파라미터로 제공해도 된다. 더 간단하게는 미리 정의돼있는 전용 getter들을 이용해 조회해갈 수도 있다. 다음은 이 세 가지 옵션을 각각 보여주는 예시다:

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

다음은 미리 정의돼있는 메시지 헤더들을 담고있는 테이블이다:

Table 1. Pre-defined Message Headers

Header Name Header Type Usage
MessageHeaders.ID java.util.UUID 이 메시지 인스턴스의 식별자. 메시지가 변경될 때마다 바뀐다.
MessageHeaders.TIMESTAMP java.lang.Long 메시지를 생성한 시간. 메시지가 변경될 때마다 바뀐다.
MessageHeaders.REPLY_CHANNEL java.lang.Object (String or MessageChannel) 출력 채널을 명시하지 않은 상태에서, ROUTING_SLIP이 없거나 ROUTING_SLIP을 다 소진한 경우 응답(있으면)을 전송할 채널. String을 사용한다면, 반드시 빈의 이름이나 ChannelRegistry로 생성한 값을 사용해야 한다.
MessageHeaders.ERROR_CHANNEL java.lang.Object (String or MessageChannel) 에러를 전송할 채널. String을 사용한다면, 반드시 빈의 이름이나 ChannelRegistry로 생성한 값을 사용해야 한다.

다양한 인바운드/아웃바운드 어댑터 구현체에서 헤더를 활용하고 있으며, 사용자 정의 헤더를 추가로 설정할 수도 있다. 헤더를 사용하는 모듈에선 관련 헤더를 위한 상수를 확인할 수 있다 (ex. AmqpHeaders, JmsHeaders 등).

7.2.1. MessageHeaderAccessor API

스프링 프레임워크 4.0, Spring Integration 4.0부터 메시지 처리를 추상화하는 핵심 로직은 spring-messaging 모듈로 이동했으며, MessageHeaderAccessor API를 도입해 메시지 처리를 한 층 더 추상화시켰다. 이제 모든 (코어) Spring Integration 전용 메시지 헤더 상수들은 IntegrationMessageHeaderAccessor 클래스에 선언돼있다. 다음은 미리 정의돼있는 메시지 헤더들을 담아놓은 테이블이다:

Table 2. Pre-defined Message Headers

Header Name Header Type Usage
IntegrationMessageHeaderAccessor.
CORRELATION_ID
java.lang.Object 둘 이상의 메시지를 연관시킬 때 사용한다.
IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
java.lang.Integer 일반적으로 SEQUENCE_SIZE 만큼의 메시지를 가지고 있는 메시지 그룹 내에서의 시퀀스 번호를 나타내지만, <resequencer/>에서 제한이 없는unbounded 메시지 그룹의 순서를 재지정하는 데 사용하기도 한다.
IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
java.lang.Integer 연관 메시지 그룹 안에 들어있는 메시지 수.
IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
java.lang.Long 메시지가 만료되는 시기를 나타낸다. 프레임워크에서 직접 사용하진 않지만, header enricher로 세팅할 수 있으며, UnexpiredMessageSelector를 구성한 <filter/>에서 활용한다.
IntegrationMessageHeaderAccessor.
PRIORITY
java.lang.Integer 메시지 우선순위 — 예를 들면 PriorityChannel 안에서 활용한다.
IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
java.lang.Boolean Idempotent receiver 인터셉터에서 중복 메시지를 감지하면 true로 세팅된다. Idempotent Receiver 엔터프라이즈 통합 패턴을 참고해라.
IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
java.io.Closeable 이 헤더는 메시지가 메시지 처리를 완료할 때 close해야 하는 Closeable과 관련있을 때 추가하는 헤더다. 예시로 FTP, SFTP 등을 이용한 파일 스트리밍 전송 관련 Session을 들 수 있다.
IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
java.lang. AtomicInteger message-driven 채널 어댑터에서 RetryTemplate 설정을 지원하는 경우, 이 헤더에 현재 전송 시도 횟수를 저장한다.
IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
o.s.i.support. Acknowledgment Callback 인바운드 엔드포인트에서 콜백을 지원하는 경우, 메시지를 수락accept, 거부reject, 대기열에 재추가requeue할 수 있는 콜백. Deferred Acknowledgement Pollable Message SourceMQTT Manual Acks를 참고해라.

이 헤더들 중에는 다음과 같이 IntegrationMessageHeaderAccessor 클래스에서 간편한 전용 getter를 제공하는 헤더도 있다:

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

다음은 IntegrationMessageHeaderAccessor에 정의는 돼있지만 일반적으로 사용자 코드에선 사용할 일이 없는 헤더를 나타낸 테이블이다 (즉, 이 헤더들은 보통 Spring Integration 내부에서 사용하며, 참고용으로만 함께 정리했다):

Table 3. Pre-defined Message Headers

Header Name Header Type Usage
IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
java.util. List<List<Object>> 상관 관계 중첩이 필요할 때 사용하는 correlation 데이터 스택 (ex. splitter→…→splitter→…→aggregator→…→aggregator).
IntegrationMessageHeaderAccessor.
ROUTING_SLIP
java.util. Map<List<Object>, Integer> Routing Slip 참고.

7.2.2. Message ID Generation

애플리케이션 내에서 메시지가 이동하면서 변형될 때마다 (ex. transformer) 메시지 ID가 새로 할당된다. 이 메시지 ID는 UUID 값이다. 이전에는 java.util.UUID.randomUUID()로 ID를 생성했지만, Spring Integration 3.0부터는 디폴트로 좀더 효율적인 전략을 사용한다. 이 전략에선 매번 secure random number를 생성하는 대신, secure random seed를 기반으로 만든 단순 난수를 사용한다.

다른 전략을 통해 UUID를 생성하려면 애플리케이션 컨텍스트에 org.springframework.util.IdGenerator를 구현한 빈을 선언해주면 된다.

UUID 생성 전략은 클래스로더 당 하나만 사용할 수 있다. 즉, 둘 이상의 애플리케이션 컨텍스트를 같은 클래스로더에서 실행하게 되면 동일한 전략을 공유한다는 뜻이다. 컨텍스트 중 하나가 전략을 변경하면 모든 컨텍스트에서 변경된 전략을 사용하게 된다. 같은 클래스로더에 있는 둘 이상의 컨텍스트가 org.springframework.util.IdGenerator 타입 빈을 선언한다면 모두 동일한 클래스의 인스턴스여야 한다. 그렇지 않으면 커스텀 전략을 교체하려는 컨텍스트는 초기화에 실패한다. 같은 전략이지만 파라미터를 이용하는 경우 첫 번째로 초기화되는 컨텍스트의 전략을 사용한다.

디폴트 전략 외에도 두 가지 IdGenerator를 추가로 제공한다. 먼저 org.springframework.util.JdkIdGenerator는 이전 UUID.randomUUID() 메커니즘을 사용한다. 실제로 UUID까지는 필요하지 않고 단순히 증가하는 값으로 충분할 땐 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator를 활용해도 된다.

7.2.3. Read-only Headers

MessageHeaders.IDMessageHeaders.TIMESTAMP는 읽기 전용 헤더로 재정의가 불가능하다.

4.3.2 버전부터 MessageBuilder는 업스트림 Message에서 복사해오면 안 되는 헤더 목록을 커스텀할 수 있도록 readOnlyHeaders(String… readOnlyHeaders) API를 제공한다. 기본적으론 MessageHeaders.IDMessageHeaders.TIMESTAMP만 읽기 전용으로 취급한다. 글로벌 spring.integration.readOnly.headers 프로퍼티로는 프레임워크 컴포넌트들을 위한 DefaultMessageBuilderFactory를 커스텀할 수 있다 (글로벌 프로퍼티 참고). 이 프로퍼티는 ObjectToJsonTransformercontentType 헤더를 채우듯이, 기본적으로 채워지는 일부 헤더를 비활성화하고 싶을 때 유용할 거다 (JSON Transformers 참고).

MessageBuilder를 이용해 새 메시지를 만들 땐 이런 읽기 전용 헤더는 무시하며, 로그에 INFO 메시지를 기록한다.

5.0 버전부터 Messaging Gateway, Header Enricher, Content Enricher, Header Filter에선 DefaultMessageBuilderFactory를 사용할 때 MessageHeaders.IDMessageHeaders.TIMESTAMP 헤더는 설정할 수 없으며, BeanInitializationException이 발생한다.

7.2.4. Header Propagation

메시지를 생성하는 엔드포인트에서 메시지를 처리(및 수정)할 때는 (ex. service activator), 일반적으로 인바운드 헤더를 아웃바운드 메시지로 전파시킨다. 한 가지 예외가 있다면, 프레임워크에 완전한 메시지를 반환하는 transformer다. transformer가 완전한 메시지를 반환할 땐 전체 아웃바운드 메시지를 처리하는 일은 사용자 코드에서 담당해야 한다. transformer가 페이로드만 반환한다면 인바운드 헤더를 아웃바운드 메시지로 전파한다. 참고로, 헤더는 아웃바운드 메시지에 이미 존재하지 않을 때에만 전파되므로, 필요에 따라 헤더 값을 변경할 수도 있다.

4.3.10 버전부터는 메시지 핸들러(메시지를 수정하고 출력을 생성하는)를 구성해 특정 헤더의 전파를 억제할 수 있다. 복사를 막고싶은 헤더를 설정하려면 추상 클래스 MessageProducingMessageHandler에 있는 setNotPropagatedHeaders() 메소드나 addNotPropagatedHeaders() 메소드를 호출해라.

아니면 META-INF/spring.integration.propertiesreadOnlyHeaders 프로퍼티에 헤더 목록을 콤마로 구분해서 설정하면, 특정 메시지 헤더들의 전파를 전역적으로 억제할 수도 있다.

5.0 버전부터는 AbstractMessageProducingHandlersetNotPropagatedHeaders() 구현부에선 간단한 패턴(ex. xxx*, xxx, *xxx, xxx*yyy)을 적용해 공통 suffix나 prefix를 가지고 있는 헤더를 필터링할 수 있다. 자세한 내용은 PatternMatchUtils Javadoc을 참고해라. 패턴 중 하나가 *(asterisk)이면 아무런 헤더도 전파되지 않으며, 다른 패턴은 전부 무시한다. 이 경우 서비스 activator는 transformer와 동일하게 동작하며, 필요한 헤더들은 반드시 서비스 메소드에서 Message에 담아 반환해야 한다. Java DSL에선 ConsumerEndpointSpec에서 notPropagatedHeaders() 옵션을 이용할 수 있다. XML 설정 역시 <service-activator> 구성 요소의 not-propagated-headers 속성으로 이용할 수 있다.

헤더 전파 억제는 브리지bridge라우터router같이 메시지를 수정하지 않는 엔드포인트엔 적용되지 않는다.


7.3. Message Implementations

Message 인터페이스의 기본 구현체는 GenericMessage<T>로, 다음과 같은 두 가지 생성자를 제공한다:

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

Message 생설될 땐 랜덤한 고유 ID가 만들어진다. 헤더 Map을 받는 생성자는 건내받은 헤더들을 새로 생성하는 Message로 복사한다.

Message 구현체 중에는 에러 상태를 주고받을 수 있도록 설계한 구현체도 있다. 이 구현체는 다음과 같이 Throwable 객체를 페이로드로 받는다:

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

이 구현체에선 상속받은 GenericMessage가 타입을 파라미터화한다는 점을 활용하고 있다. 그렇기 때문에 위 예제에서 알 수 있듯이 타입 캐스팅 없이 Message 페이로드 Object를 조회해갈 수 있다.


7.4. The MessageBuilder Helper Class

Message 인터페이스는 페이로드와 헤더를 조회해갈 수 있는 메소드를 정의하고 있지만, setter는 제공하지 않는다는 것을 눈치챈 사람도 있을 거다. setter가 없는 이유는 Message는 최초에 생성된 이후에는 수정할 수 없기 때문이다. 따라서 Message 인스턴스를 여러 컨슈머에 전송할 때는 (ex. publish-subscribe 채널을 통해), 컨슈머 중 다른 페이로드 타입으로 응답을 보내야 하는 컨슈머가 있는 경우 반드시 Message를 새로 생성해야 한다. Message를 새로 만들어 보내면 다른 컨슈머에선 페이로드 타입이 바뀌더라도 영향 받지 않는다. 여기서 중요한 것은 여러 컨슈머가 동일한 페이로드 인스턴스나 헤더 값에 액세스할 수 있으며, 이런 인스턴스 자체를 변경 불가능하게immutable 만들지에 대한 결정은 사용자에게 남겨져있다는 점이다. 즉, Message 인스턴스의 역할contract은 수정이 불가능한 Collection 인터페이스와 유사하며, MessageHeaders 맵을 보면 조금 더 이해가 될 거다. MessageHeaders 클래스가 java.util.Map을 구현하고 있더라도, MessageHeaders 인스턴스에서 put 연산(또는 ‘remove’나 ‘clear’)을 호출하려고 하면 UnsupportedOperationException이 발생한다.

Map을 생성하고 값을 채워서 GenericMessage의 생성자에 넘기는 방법도 있지만, Spring Integration은 훨씬 더 간편하게 메시지를 구성할 수 있는 MessageBuilder를 제공한다. MessageBuilder는 기존 Message나 페이로드 Object를 이용해 Message 인스턴스를 생성할 수 있는 두 가지 팩토리 메소드를 제공한다. 기존에 가지고 있는 Message로부터 빌드할 때는 (fromMessage) 아래 예제에서 알 수 있듯이 Message에 있는 헤더와 페이로드를 새 Message로 복사한다:

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

새로운 페이로드로 Message를 생성해야 하지만 헤더는 기존 Message에서 복사해오고 싶다면, 아래 보이는 ‘copy’ 메소드 중 하나를 이용하면 된다:

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

copyHeadersIfAbsent 메소드는 기존에 있는 값들은 덮어쓰지 않는다는 점에 주의하자. 위 예제에선 setHeader로 사용자 정의 헤더를 설정하는 방법도 확인할 수 있다. 마지막으로, 원하는 헤더를 직접 설정하는 메소드 외에도, 몇몇 헤더들은 전용으로 사용할 수 있는 set 메소드가 미리 정의돼 있다 (이 헤더들의 이름 역시 MessageHeaders에 상수로 선언돼있다).

MessageBuilder를 이용하면 다음과 같이 메시지들의 우선 순위도 설정할 수 있다:

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority 헤더는 PriorityChannel을 사용할 때만 고려한다 (다음 챕터에서 설명한다). 헤더의 값은 java.lang.Integer로 정의한다.


Next :
Message Routing
메시지 router, filter, splitter, aggregator 소개

전체 목차는 여기에 있습니다.

<< >>

TOP