스프링 인티그레이션 공식 레퍼런스를 한글로 번역한 문서입니다.
전체 목차는 여기에 있습니다.
Spring Integration의 Message
는 데이터를 담기 위한 범용 컨테이너다. 페이로드로 원하는 객체를 제공할 수 있으며, Message
인스턴스마다 헤더에 사용자가 다양하게 확장할 수 있는 프로퍼티(키-값 쌍)를 담을 수 있다.
목차
- 7.1. The Message Interface
- 7.2. Message Headers
- 7.3. Message Implementations
- 7.4. The MessageBuilder Helper Class
7.1. The Message
Interface
다음은 Message
인터페이스의 정의다:
public interface Message<T> {
T getPayload();
MessageHeaders getHeaders();
}
Message
인터페이스는 API의 핵심 인터페이스다. 데이터를 범용 래퍼wrapper로 캡슐화하기 때문에 메시징 시스템에선 데이터 타입을 알지 못해도 데이터를 전달하고 전파할 수 있다. 애플리케이션에 새로운 타입이 추가되거나 기존 타입 자체가 수정, 확장되더라도 메시징 시스템은 영향 받지 않는다. 한편 메시징 시스템의 구성 요소에서 Message
에 관한 정보에 접근해야 할 때는 보통, 메시지 헤더에 관련 메타데이터를 저장해놓고 여기서 조회해가면 된다.
7.2. Message Headers
Spring Integration에선 원하는 모든 Object
를 Message
의 페이로드로 사용할 수 있듯이, 헤더 값 역시 모든 Object
타입을 지원한다. 실제로 MessageHeaders
클래스는 아래 클래스 정의에서 확인할 수 있듯이 java.util.Map
인터페이스를 구현하고 있다:
public final class MessageHeaders implements Map<String, Object>, Serializable {
...
}
MessageHeaders
클래스는Map
을 구현하고 있지만 사실상 읽기 전용 구현체다. Map에 값을put
하려고 하면UnsupportedOperationException
이 발생한다.remove
와clear
에서도 마찬가지다. 메시지는 여러 컨슈머에 전달될 수 있기 때문에Map
의 구조를 수정할 순 없다. 마찬가지로 메시지의 페이로드Object
도 초기 생성 이후set
은 불가능하다. 하지만 헤더 값 (또는 페이로드 객체) 자체에 대한 수정mutability은 프레임워크 사용자가 결정할 수 있도록 남겨뒀다.
MessageHeaders
는 Map
을 구현하고 있기 때문에 헤더의 이름으로 get(..)
을 호출하면 헤더를 조회할 수 있다. 아니면 예상하는 Class
를 추가 파라미터로 제공해도 된다. 더 간단하게는 미리 정의돼있는 전용 getter들을 이용해 조회해갈 수도 있다. 다음은 이 세 가지 옵션을 각각 보여주는 예시다:
Object someValue = message.getHeaders().get("someKey");
CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);
Long timestamp = message.getHeaders().getTimestamp();
다음은 미리 정의돼있는 메시지 헤더들을 담고있는 테이블이다:
Table 1. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
MessageHeaders.ID |
java.util.UUID |
이 메시지 인스턴스의 식별자. 메시지가 변경될 때마다 바뀐다. |
MessageHeaders.TIMESTAMP |
java.lang.Long |
메시지를 생성한 시간. 메시지가 변경될 때마다 바뀐다. |
MessageHeaders.REPLY_CHANNEL |
java.lang.Object (String or MessageChannel) |
출력 채널을 명시하지 않은 상태에서, ROUTING_SLIP 이 없거나 ROUTING_SLIP 을 다 소진한 경우 응답(있으면)을 전송할 채널. String 을 사용한다면, 반드시 빈의 이름이나 ChannelRegistry 로 생성한 값을 사용해야 한다. |
MessageHeaders.ERROR_CHANNEL |
java.lang.Object (String or MessageChannel) |
에러를 전송할 채널. String 을 사용한다면, 반드시 빈의 이름이나 ChannelRegistry 로 생성한 값을 사용해야 한다. |
다양한 인바운드/아웃바운드 어댑터 구현체에서 헤더를 활용하고 있으며, 사용자 정의 헤더를 추가로 설정할 수도 있다. 헤더를 사용하는 모듈에선 관련 헤더를 위한 상수를 확인할 수 있다 (ex. AmqpHeaders
, JmsHeaders
등).
7.2.1. MessageHeaderAccessor
API
스프링 프레임워크 4.0, Spring Integration 4.0부터 메시지 처리를 추상화하는 핵심 로직은 spring-messaging
모듈로 이동했으며, MessageHeaderAccessor
API를 도입해 메시지 처리를 한 층 더 추상화시켰다. 이제 모든 (코어) Spring Integration 전용 메시지 헤더 상수들은 IntegrationMessageHeaderAccessor
클래스에 선언돼있다. 다음은 미리 정의돼있는 메시지 헤더들을 담아놓은 테이블이다:
Table 2. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor. |
java.lang.Object |
둘 이상의 메시지를 연관시킬 때 사용한다. |
IntegrationMessageHeaderAccessor. |
java.lang.Integer |
일반적으로 SEQUENCE_SIZE 만큼의 메시지를 가지고 있는 메시지 그룹 내에서의 시퀀스 번호를 나타내지만, <resequencer/> 에서 제한이 없는unbounded 메시지 그룹의 순서를 재지정하는 데 사용하기도 한다. |
IntegrationMessageHeaderAccessor. |
java.lang.Integer |
연관 메시지 그룹 안에 들어있는 메시지 수. |
IntegrationMessageHeaderAccessor. |
java.lang.Long |
메시지가 만료되는 시기를 나타낸다. 프레임워크에서 직접 사용하진 않지만, header enricher로 세팅할 수 있으며, UnexpiredMessageSelector 를 구성한 <filter/> 에서 활용한다. |
IntegrationMessageHeaderAccessor. |
java.lang.Integer |
메시지 우선순위 — 예를 들면 PriorityChannel 안에서 활용한다. |
IntegrationMessageHeaderAccessor. |
java.lang.Boolean |
Idempotent receiver 인터셉터에서 중복 메시지를 감지하면 true로 세팅된다. Idempotent Receiver 엔터프라이즈 통합 패턴을 참고해라. |
IntegrationMessageHeaderAccessor. |
java.io.Closeable |
이 헤더는 메시지가 메시지 처리를 완료할 때 close해야 하는 Closeable 과 관련있을 때 추가하는 헤더다. 예시로 FTP, SFTP 등을 이용한 파일 스트리밍 전송 관련 Session 을 들 수 있다. |
IntegrationMessageHeaderAccessor. |
java.lang. AtomicInteger |
message-driven 채널 어댑터에서 RetryTemplate 설정을 지원하는 경우, 이 헤더에 현재 전송 시도 횟수를 저장한다. |
IntegrationMessageHeaderAccessor. |
o.s.i.support. Acknowledgment Callback |
인바운드 엔드포인트에서 콜백을 지원하는 경우, 메시지를 수락accept, 거부reject, 대기열에 재추가requeue할 수 있는 콜백. Deferred Acknowledgement Pollable Message Source와 MQTT Manual Acks를 참고해라. |
이 헤더들 중에는 다음과 같이 IntegrationMessageHeaderAccessor
클래스에서 간편한 전용 getter를 제공하는 헤더도 있다:
IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...
다음은 IntegrationMessageHeaderAccessor
에 정의는 돼있지만 일반적으로 사용자 코드에선 사용할 일이 없는 헤더를 나타낸 테이블이다 (즉, 이 헤더들은 보통 Spring Integration 내부에서 사용하며, 참고용으로만 함께 정리했다):
Table 3. Pre-defined Message Headers
Header Name | Header Type | Usage |
---|---|---|
IntegrationMessageHeaderAccessor. |
java.util. List<List<Object>> |
상관 관계 중첩이 필요할 때 사용하는 correlation 데이터 스택 (ex. splitter→…→splitter→…→aggregator→…→aggregator ). |
IntegrationMessageHeaderAccessor. |
java.util. Map<List<Object>, Integer> |
Routing Slip 참고. |
7.2.2. Message ID Generation
애플리케이션 내에서 메시지가 이동하면서 변형될 때마다 (ex. transformer) 메시지 ID가 새로 할당된다. 이 메시지 ID는 UUID
값이다. 이전에는 java.util.UUID.randomUUID()
로 ID를 생성했지만, Spring Integration 3.0부터는 디폴트로 좀더 효율적인 전략을 사용한다. 이 전략에선 매번 secure random number를 생성하는 대신, secure random seed를 기반으로 만든 단순 난수를 사용한다.
다른 전략을 통해 UUID를 생성하려면 애플리케이션 컨텍스트에 org.springframework.util.IdGenerator
를 구현한 빈을 선언해주면 된다.
UUID 생성 전략은 클래스로더 당 하나만 사용할 수 있다. 즉, 둘 이상의 애플리케이션 컨텍스트를 같은 클래스로더에서 실행하게 되면 동일한 전략을 공유한다는 뜻이다. 컨텍스트 중 하나가 전략을 변경하면 모든 컨텍스트에서 변경된 전략을 사용하게 된다. 같은 클래스로더에 있는 둘 이상의 컨텍스트가
org.springframework.util.IdGenerator
타입 빈을 선언한다면 모두 동일한 클래스의 인스턴스여야 한다. 그렇지 않으면 커스텀 전략을 교체하려는 컨텍스트는 초기화에 실패한다. 같은 전략이지만 파라미터를 이용하는 경우 첫 번째로 초기화되는 컨텍스트의 전략을 사용한다.
디폴트 전략 외에도 두 가지 IdGenerator
를 추가로 제공한다. 먼저 org.springframework.util.JdkIdGenerator
는 이전 UUID.randomUUID()
메커니즘을 사용한다. 실제로 UUID까지는 필요하지 않고 단순히 증가하는 값으로 충분할 땐 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator
를 활용해도 된다.
7.2.3. Read-only Headers
MessageHeaders.ID
와 MessageHeaders.TIMESTAMP
는 읽기 전용 헤더로 재정의가 불가능하다.
4.3.2 버전부터 MessageBuilder
는 업스트림 Message
에서 복사해오면 안 되는 헤더 목록을 커스텀할 수 있도록 readOnlyHeaders(String… readOnlyHeaders)
API를 제공한다. 기본적으론 MessageHeaders.ID
와 MessageHeaders.TIMESTAMP
만 읽기 전용으로 취급한다. 글로벌 spring.integration.readOnly.headers
프로퍼티로는 프레임워크 컴포넌트들을 위한 DefaultMessageBuilderFactory
를 커스텀할 수 있다 (글로벌 프로퍼티 참고). 이 프로퍼티는 ObjectToJsonTransformer
가 contentType
헤더를 채우듯이, 기본적으로 채워지는 일부 헤더를 비활성화하고 싶을 때 유용할 거다 (JSON Transformers 참고).
MessageBuilder
를 이용해 새 메시지를 만들 땐 이런 읽기 전용 헤더는 무시하며, 로그에 INFO
메시지를 기록한다.
5.0 버전부터 Messaging Gateway, Header Enricher, Content Enricher, Header Filter에선 DefaultMessageBuilderFactory
를 사용할 때 MessageHeaders.ID
와 MessageHeaders.TIMESTAMP
헤더는 설정할 수 없으며, BeanInitializationException
이 발생한다.
7.2.4. Header Propagation
메시지를 생성하는 엔드포인트에서 메시지를 처리(및 수정)할 때는 (ex. service activator), 일반적으로 인바운드 헤더를 아웃바운드 메시지로 전파시킨다. 한 가지 예외가 있다면, 프레임워크에 완전한 메시지를 반환하는 transformer다. transformer가 완전한 메시지를 반환할 땐 전체 아웃바운드 메시지를 처리하는 일은 사용자 코드에서 담당해야 한다. transformer가 페이로드만 반환한다면 인바운드 헤더를 아웃바운드 메시지로 전파한다. 참고로, 헤더는 아웃바운드 메시지에 이미 존재하지 않을 때에만 전파되므로, 필요에 따라 헤더 값을 변경할 수도 있다.
4.3.10 버전부터는 메시지 핸들러(메시지를 수정하고 출력을 생성하는)를 구성해 특정 헤더의 전파를 억제할 수 있다. 복사를 막고싶은 헤더를 설정하려면 추상 클래스 MessageProducingMessageHandler
에 있는 setNotPropagatedHeaders()
메소드나 addNotPropagatedHeaders()
메소드를 호출해라.
아니면 META-INF/spring.integration.properties
의 readOnlyHeaders
프로퍼티에 헤더 목록을 콤마로 구분해서 설정하면, 특정 메시지 헤더들의 전파를 전역적으로 억제할 수도 있다.
5.0 버전부터는 AbstractMessageProducingHandler
의 setNotPropagatedHeaders()
구현부에선 간단한 패턴(ex. xxx*
, xxx
, *xxx
, xxx*yyy
)을 적용해 공통 suffix나 prefix를 가지고 있는 헤더를 필터링할 수 있다. 자세한 내용은 PatternMatchUtils
Javadoc을 참고해라. 패턴 중 하나가 *
(asterisk)이면 아무런 헤더도 전파되지 않으며, 다른 패턴은 전부 무시한다. 이 경우 서비스 activator는 transformer와 동일하게 동작하며, 필요한 헤더들은 반드시 서비스 메소드에서 Message
에 담아 반환해야 한다. Java DSL에선 ConsumerEndpointSpec
에서 notPropagatedHeaders()
옵션을 이용할 수 있다. XML 설정 역시 <service-activator>
구성 요소의 not-propagated-headers
속성으로 이용할 수 있다.
헤더 전파 억제는 브리지bridge나 라우터router같이 메시지를 수정하지 않는 엔드포인트엔 적용되지 않는다.
7.3. Message Implementations
Message
인터페이스의 기본 구현체는 GenericMessage<T>
로, 다음과 같은 두 가지 생성자를 제공한다:
new GenericMessage<T>(T payload);
new GenericMessage<T>(T payload, Map<String, Object> headers)
Message
가 생성될 땐 랜덤한 고유 ID가 만들어진다. 헤더 Map
을 받는 생성자는 건내받은 헤더들을 새로 생성하는 Message
로 복사한다.
Message
구현체 중에는 에러 상태를 주고받을 수 있도록 설계한 구현체도 있다. 이 구현체는 다음과 같이 Throwable
객체를 페이로드로 받는다:
ErrorMessage message = new ErrorMessage(someThrowable);
Throwable t = message.getPayload();
이 구현체에선 상속받은 GenericMessage
가 타입을 파라미터화한다는 점을 활용하고 있다. 그렇기 때문에 위 예제에서 알 수 있듯이 타입 캐스팅 없이 Message
페이로드 Object
를 조회해갈 수 있다.
7.4. The MessageBuilder
Helper Class
Message
인터페이스는 페이로드와 헤더를 조회해갈 수 있는 메소드를 정의하고 있지만, setter는 제공하지 않는다는 것을 눈치챈 사람도 있을 거다. setter가 없는 이유는 Message
는 최초에 생성된 이후에는 수정할 수 없기 때문이다. 따라서 Message
인스턴스를 여러 컨슈머에 전송할 때는 (ex. publish-subscribe 채널을 통해), 컨슈머 중 다른 페이로드 타입으로 응답을 보내야 하는 컨슈머가 있는 경우 반드시 Message
를 새로 생성해야 한다. Message
를 새로 만들어 보내면 다른 컨슈머에선 페이로드 타입이 바뀌더라도 영향 받지 않는다. 여기서 중요한 것은 여러 컨슈머가 동일한 페이로드 인스턴스나 헤더 값에 액세스할 수 있으며, 이런 인스턴스 자체를 변경 불가능하게immutable 만들지에 대한 결정은 사용자에게 남겨져있다는 점이다. 즉, Message
인스턴스의 역할contract은 수정이 불가능한 Collection
인터페이스와 유사하며, MessageHeaders
맵을 보면 조금 더 이해가 될 거다. MessageHeaders
클래스가 java.util.Map
을 구현하고 있더라도, MessageHeaders
인스턴스에서 put
연산(또는 ‘remove’나 ‘clear’)을 호출하려고 하면 UnsupportedOperationException
이 발생한다.
Map을 생성하고 값을 채워서 GenericMessage의 생성자에 넘기는 방법도 있지만, Spring Integration은 훨씬 더 간편하게 메시지를 구성할 수 있는 MessageBuilder
를 제공한다. MessageBuilder
는 기존 Message
나 페이로드 Object
를 이용해 Message
인스턴스를 생성할 수 있는 두 가지 팩토리 메소드를 제공한다. 기존에 가지고 있는 Message
로부터 빌드할 때는 (fromMessage
) 아래 예제에서 알 수 있듯이 Message
에 있는 헤더와 페이로드를 새 Message
로 복사한다:
Message<String> message1 = MessageBuilder.withPayload("test")
.setHeader("foo", "bar")
.build();
Message<String> message2 = MessageBuilder.fromMessage(message1).build();
assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));
새로운 페이로드로 Message
를 생성해야 하지만 헤더는 기존 Message
에서 복사해오고 싶다면, 아래 보이는 ‘copy’ 메소드 중 하나를 이용하면 된다:
Message<String> message3 = MessageBuilder.withPayload("test3")
.copyHeaders(message1.getHeaders())
.build();
Message<String> message4 = MessageBuilder.withPayload("test4")
.setHeader("foo", 123)
.copyHeadersIfAbsent(message1.getHeaders())
.build();
assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));
copyHeadersIfAbsent
메소드는 기존에 있는 값들은 덮어쓰지 않는다는 점에 주의하자. 위 예제에선 setHeader
로 사용자 정의 헤더를 설정하는 방법도 확인할 수 있다. 마지막으로, 원하는 헤더를 직접 설정하는 메소드 외에도, 몇몇 헤더들은 전용으로 사용할 수 있는 set
메소드가 미리 정의돼 있다 (이 헤더들의 이름 역시 MessageHeaders
에 상수로 선언돼있다).
MessageBuilder
를 이용하면 다음과 같이 메시지들의 우선 순위도 설정할 수 있다:
Message<Integer> importantMessage = MessageBuilder.withPayload(99)
.setPriority(5)
.build();
assertEquals(5, importantMessage.getHeaders().getPriority());
Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
.build();
assertEquals(2, lessImportantMessage.getHeaders().getPriority());
priority
헤더는 PriorityChannel
을 사용할 때만 고려한다 (다음 챕터에서 설명한다). 헤더의 값은 java.lang.Integer
로 정의한다.
Next :Message Routing
메시지 router, filter, splitter, aggregator 소개
전체 목차는 여기에 있습니다.