스프링 부트 공식 레퍼런스를 한글로 번역한 문서입니다.
전체 목차는 여기에 있습니다.
목차
7.14. Messaging
스프링 프레임워크는 JmsTemplate
을 통한 간단한 JMS API 사용부터 메세지를 비동기로 수신하는 완전한 인프라까지, 메세지 처리 시스템 통합을 위한 광범위한 기능을 지원한다. 스프링 AMQP는 AMQPAdvanced Message Queuing Protocol를 위한 유사한 기능 셋을 제공한다. 스프링 부트는 RabbitTemplate
과 RabbitMQ
를 위한 자동 설정 옵션도 제공한다. 스프링 WebSocket은 본래 STOMP 메세지 처리를 지원하고 있으며, 스프링 부트는 스타터와 약간의 자동 설정으로 STOMP 메세지 처리를 지원한다. 스프링 부트에선 아파치 카프카도 지원한다.
7.14.1. JMS
javax.jms.ConnectionFactory
인터페이스는 JMS 브로커와 상호작용할 때 필요한 javax.jms.Connection
생성을 위한 표준 메소드를 제공한다. 스프링에선 JMS를 다루려면 ConnectionFactory
가 필요하지만, 보통은 직접 사용할 필요는 없으며, 그대신 더 높은 수준으로 추상화해 놓은 메세지 처리 클래스들을 이용하면 된다. (자세한 내용은 스프링 프레임워크 레퍼런스 문서의 관련 섹션을 참고해라.) 이와 더불어 스프링 부트는 메세지를 주고 받을 때 필요한 인프라를 자동으로 설정해준다.
ActiveMQ Support
클래스패스에 ActiveMQ가 있으면 스프링 부트는 ConnectionFactory
도 설정할 수 있다. 클래스패스에 브로커가 있으면 임베디드 브로커를 자동으로 시작하고 구성한다 (설정에 브로커 URL을 지정하지 않으면).
spring-boot-starter-activemq
를 사용하면 JMS 통합을 위한 스프링 인프라와 함께 ActiveMQ 인스턴스를 연결하거나 임베딩시키는데 필요한 의존성이 추가된다.
ActiveMQ 설정은 spring.activemq.*
에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties
에서 아래 설정을 추가할 수 있다:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
activemq:
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
기본적으로 CachingConnectionFactory
가 spring.jms.*
로 제어하는 적당한 설정을 가지고 네이티브 ConnectionFactory
를 래핑한다:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
네이티브 풀링을 사용하고 싶다면 다음 예제처럼 org.messaginghub:pooled-jms
의존성을 추가하고 그에 따라 JmsPoolConnectionFactory
를 설정하면 된다:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
activemq:
pool:
enabled: true
max-connections: 50
지원하는 다른 옵션들은
ActiveMQProperties
를 참고해라. 다른 것들을 좀 더 커스텀하고 싶으면,ActiveMQConnectionFactoryCustomizer
를 구현하는 빈을 원하는 만큼 추가해도 된다.
기본적으로 ActiveMQ는 destination이 아직 없다면 하나를 생성해서, 전달받은 이름으로 destination을 리졸브한다.
ActiveMQ Artemis Support
스프링 부트는 클래스패스에서 ActiveMQ Artemis를 사용할 수 있음을 감지하면 ConnectionFactory
를 자동으로 설정할 수 있다. 클래스패스에 브로커가 있으면 임베디드 브로커를 자동으로 시작하고 구성한다 (mode 프로퍼티를 명시하지 않았다면). 지원하는 모드는 embedded
와 (임베디드 브로커가 필요하다는 걸 명시해서, 클래스패스에 브로커가 없으면 오류가 발생해야 한다는 걸 알린다) native
다 (netty
전송 프로토콜을 통해 브로커에 연결한다). 후자를 설정하면 스프링 부트는 디폴트 설정을 가지고, 로컬 머신에서 실행 중는 브로커에 연결하는 ConnectionFactory
를 설정한다.
spring-boot-starter-artemis
를 사용하면 JMS 통합을 위한 스프링 인프라와 함께 기존 ActiveMQ Artemis 인스턴스 연결에 필요한 의존성이 추가된다. 애플리케이션에org.apache.activemq:artemis-jms-server
를 추가하면 embedded 모드를 사용할 수 있다.
ActiveMQ Artemis 설정은 spring.artemis.*
에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties
에서 아래 설정을 추가할 수 있다:
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
artemis:
mode: native
broker-url: "tcp://192.168.1.210:9876"
user: "admin"
password: "secret"
브로커를 임베딩시킬 땐, 원한다면 persistence를 활성화하고 사용 가능하게 만들어줄 destination을 나열할 수 있다. destination은 디폴트 옵션으로 만들 땐 리스트를 콤마로 구분해서 지정하면 되고, 큐나 토픽 설정을 직접 만지고 싶을 땐 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration
이나 org.apache.activemq.artemis.jms.server.config.TopicConfiguration
타입 빈을 정의하면 된다.
기본적으로 CachingConnectionFactory
가 spring.jms.*
로 제어하는 적당한 설정을 가지고 네이티브 ConnectionFactory
를 래핑한다:
spring.jms.cache.session-cache-size=5
spring:
jms:
cache:
session-cache-size: 5
네이티브 풀링을 사용하고 싶다면 다음 예제처럼 org.messaginghub:pooled-jms
의존성을 추가하고 그에 따라 JmsPoolConnectionFactory
를 설정하면 된다:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
artemis:
pool:
enabled: true
max-connections: 50
지원하는 다른 옵션들은 ArtemisProperties
를 참고해라.
JNDI lookup은 관여하지 않으며, destination들은 Artemis 설정에 있는 name
속성이나 설정으로 제공한 이름을 통해 리졸브된다.
Using a JNDI ConnectionFactory
애플리케이션을 애플리케이션 서버에서 실행한다면 스프링 부트는 JNDI를 사용해 JMS ConnectionFactory
를 배치한다. 기본적으로 java:/JmsXA
와 java:/XAConnectionFactory
위치를 확인한다. 다른 위치를 지정하고 싶으면 다음 예제처럼 spring.jms.jndi-name
프로퍼티를 사용하면 된다:
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
jms:
jndi-name: "java:/MyConnectionFactory"
Sending a Message
스프링의 JmsTemplate
은 자동으로 설정되며, 다음 예제처럼 원하는 빈에 직접 autowire할 수 있다:
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
}
JmsMessagingTemplate
도 비슷한 방법으로 주입할 수 있다.DestinationResolver
나MessageConverter
빈을 정의하면 자동 설정된JmsTemplate
에 자동으로 연결된다.
Receiving a Message
JMS 인프라가 있을 땐 원하는 빈에 @JmsListener
어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. JmsListenerContainerFactory
를 정의하지 않았다면 디폴트 팩토리를 자동으로 설정한다. DestinationResolver
나, MessageConverter
, javax.jms.ExceptionListener
빈을 정의하면 이 디폴트 팩토리와 자동으로 연결된다.
기본적으로 디폴트 팩토리는 트랜잭션을 사용한다. JtaTransactionManager
를 가진 인프라에서 실행하면 기본적으로 JtaTransactionManager
가 리스너 컨테이너와 연결된다. 그 외엔 sessionTransacted
플래그를 활성화한다. 후자에선 리스너 메소드(또는 그 delegate)에 @Transactional
을 추가하면 수신한 메세지를 처리할 때 로컬 데이터 저장소 트랜잭션을 연계시킬 수 있다. 이렇게 하면 로컬 트랜잭션이 완료되고나면 수신한 메세지를 승인acknowledged해준다. 동일한 JMS 세션에서 응답 메세지를 전송할 때도 마찬가지다.
아래 컴포넌트는 someQueue
destination에 리스너 엔드포인트를 생성한다:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
자세한 내용은
@EnableJms
Javadoc을 참고해라.
JmsListenerContainerFactory
인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 DefaultJmsListenerContainerFactoryConfigurer
를 활용하면 된다. 여기서는 자동 설정되는 팩토리와 동일한 설정으로 DefaultJmsListenerContainerFactory
를 초기화할 수 있다.
예를 들어 다음 예제는 다른 팩토리 하나를 MessageConverter
를 따로 지정해서 정의하고 있다:
@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
이제 @JmsListener
를 선언한 메소드라면 모두 이 팩토리를 사용할 수 있다:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
7.14.2. AMQP
AMQPAdvanced Message Queuing Protocol는 메세지 지향 미들웨어를 위한 플랫폼 중립적인 와이어 레벨wire-level 프로토콜이다. 스프링 AMQP 프로젝트는 AMQP 기반 메세지 처리 솔루션 개발에 핵심 스프링 개념을 적용한다. 스프링 부트를 활용하면 spring-boot-starter-amqp
“스타터”를 활용하는 등, RabbitMQ를 통해 AMQP 작업을 좀 더 수월하게 진행할 수 있다.
RabbitMQ support
RabbitMQ는 AMQP 프로토콜 기반 메세지 브로커로, 가볍고, 안정적이며, 확장 가능하고, 높은 이식성을 자랑하는 메세지 브로커다. 스프링은 RabbitMQ
를 사용해서 AMQP 프로토콜로 통신한다.
RabbitMQ 설정은 spring.rabbitmq.*
에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties
에서 아래 설정을 추가할 수 있다:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
아니면 같은 커넥션 설정을 addresses
속성으로도 설정할 수 있다:
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
이렇게 addresses를 지정하면
host
와port
프로퍼티는 무시한다. 주소에amqps
프로토콜을 사용하면 SSL 지원을 자동으로 활성화한다.
지원하는 프로퍼티 기반 설정을 더 알아보려면 RabbitProperties
를 참고해라. 스프링 AMQP에서 사용하는 RabbitMQ ConnectionFactory
내부에 있는 저수준 설정을 변경하려면 ConnectionFactoryCustomizer
빈을 정의해라.
컨텍스트에 ConnectionNameStrategy
빈이 있으면, 자동 설정된 CachingConnectionFactory
로 생성한 커넥션 이름은 ConnectionNameStrategy
빈을 사용해서 지정한다.
자세한 내용은 RabbitMQ가 사용하는 프로토콜, AMQP 이해하기를 참고해라.
Sending a Message
스프링의 AmqpTemplate
과 AmqpAdmin
은 자동으로 설정되며, 다음 예제처럼 자체 빈에 직접 autowire할 수 있다:
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
}
RabbitMessagingTemplate
도 비슷한 방법으로 주입할 수 있다.MessageConverter
빈을 정의하면 자동 설정된AmqpTemplate
에 자동으로 연결된다.
필요한 경우엔 빈으로 정의한 모든 org.springframework.amqp.core.Queue
를 자동으로 사용해서 RabbitMQ 인스턴스에 상응하는 큐를 선언한다.
작업을 재시도하려면 AmqpTemplate
에 재시도를 활성화해주면 된다 (브로커 커넥션을 유실됐을 때 등):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
재시도는 기본적으로 비활성화돼 있다. RetryTemplate
은 RabbitRetryTemplateCustomizer
빈을 선언하면 코드로도 커스텀할 수 있다.
RabbitTemplate
인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 RabbitTemplateConfigurer
빈을 활용하면 된다. 여기서는 자동 설정에서 사용하는 팩토리와 동일한 설정으로 RabbitTemplate
을 초기화할 수 있다.
Receiving a Message
Rabbit 인프라가 있을 땐 원하는 빈에 @RabbitListener
어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. RabbitListenerContainerFactory
를 정의하지 않았다면 디폴트 SimpleRabbitListenerContainerFactory
를 자동으로 설정하며, spring.rabbitmq.listener.type
프로퍼티를 사용해서 direct 컨테이너로 전환할 수 있다. MessageConverter
나 MessageRecoverer
빈을 정의하면 이 디폴트 팩토리와 자동으로 연결된다.
아래 샘플 컴포넌트는 someQueue
큐에 리스너 엔드포인트를 생성한다:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
자세한 내용은
@EnableRabbit
Javadoc을 참고해라.
RabbitListenerContainerFactory
인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 SimpleRabbitListenerContainerFactoryConfigurer
와 DirectRabbitListenerContainerFactoryConfigurer
를 활용하면 된다. 여기서는 자동 설정에서 사용하는 팩토리와 동일한 설정으로 SimpleRabbitListenerContainerFactory
와 DirectRabbitListenerContainerFactory
를 초기화할 수 있다.
선택한 컨테이너 타입은 중요하지 않다. 이 두 빈은 자동 설정으로 정의된다.
예를 들어 아래 설정 클래스에선 다른 팩토리 하나를 MessageConverter
를 따로 지정해서 정의하고 있다:
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
이제 @RabbitListener
를 선언한 메소드라면 모두 이 팩토리를 사용할 수 있다:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
재시도를 활성화하면 리스너에서 예외를 던지는 상황에 대처할 수 있다. 재시도엔 기본적으로 RejectAndDontRequeueRecoverer
를 사용하지만, 직접 자체 MessageRecoverer
를 정의할 수도 있다. 재시도 횟수를 모두 소진하면 메세지를 거부하고, 브로커 설정에 따라 날려버리거나drop DLXdead-letter exchange로 라우팅한다. 재시도는 기본적으로 비활성화돼 있다. RabbitRetryTemplateCustomizer
빈을 선언하면 코드로도 RetryTemplate
을 커스텀할 수 있다.
재시도를 비활성한 상태에서 리스너에서 예외가 발생하면 기본적으로 무한정 재전송한다. 이 동작은 두 가지 방법으로 수정할 수 있다.
defaultRequeueRejected
프로퍼티를false
로 설정해서 재전송을 시도하지 않도록 만들거나,AmqpRejectAndDontRequeueException
을 던져 메세지를 거절해야 한다는 신호를 보낼 수 있다. 후자는 재시도를 활성화했을 때 최대로 전송을 시도해볼 횟수에 도달하면 사용하는 메커니즘이다.
7.14.3. Apache Kafka Support
Apache Kafka는 spring-kafka
프로젝트의 자동 설정으로 지원한다.
카프카 설정은 spring.kafka.*
에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties
에서 아래 설정을 추가할 수 있다:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
kafka:
bootstrap-servers: "localhost:9092"
consumer:
group-id: "myGroup"
기동 시에 토픽을 생성하려면
NewTopic
타입 빈을 추가해라. 토픽이 이미 있을 땐 이 빈은 무시된다.
지원하는 다른 옵션들은 KafkaProperties
를 참고해라.
Sending a Message
스프링의 KafkaTemplate
은 자동으로 설정되며, 다음 예제처럼 원하는 빈에 직접 autowire할 수 있다:
@Component
public class MyBean {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
spring.kafka.producer.transaction-id-prefix
프로퍼티를 정의하면KafkaTransactionManager
를 자동으로 설정한다. 더불어RecordMessageConverter
빈을 정의하면 자동 설정된KafkaTemplate
에 자동으로 연결된다.
Receiving a Message
Apache Kafka 인프라가 있을 땐 원하는 빈에 @KafkaListener
어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. KafkaListenerContainerFactory
를 정의하지 않았다면, spring.kafka.listener.*
로 정의한 키를 통해 디폴트 팩토리를 자동으로 설정한다.
아래 컴포넌트는 someTopic
토픽에 리스너 엔드포인트를 생성한다:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
KafkaTransactionManager
빈을 정의하면 자동으로 컨테이너 팩토리에 연결된다. 마찬가지로 RecordFilterStrategy
, ErrorHandler
, AfterRollbackProcessor
, ConsumerAwareRebalanceListener
빈을 정의하면 자동으로 디폴트 팩토리에 연결된다.
리스너 타입에 따라 RecordMessageConverter
나 BatchMessageConverter
빈이 디폴트 팩토리에 연결된다. 배치 리스너에서 RecordMessageConverter
빈만 존재한다면 BatchMessageConverter
로 래핑한다.
커스텀
ChainedKafkaTransactionManager
는 보통 자동 설정된KafkaTransactionManager
빈을 참조하므로, 반드시@Primary
로 마킹해야 한다.
Kafka Streams
Spring for Apache Kafka는 StreamsBuilder
객체를 생성하고, 스트림의 라이프사이클을 관리할 수 있는 팩토리 빈을 제공한다. 클래스패스에 kafka-streams
가 있을 때 @EnableKafkaStreams
어노테이션으로 Kafka Streams를 활성화하면 스프링 부트는 필요한 KafkaStreamsConfiguration
빈을 자동 설정한다.
Kafka Streams를 활성화한다는 건 애플리케이션 id와 부트스트랩 서버를 설정해야 한다는 뜻이다. 전자는 spring.kafka.streams.application-id
로 설정할 수 있으며, 설정하지 않았을 때 기본값은 spring.application.name
이다. 후자는 전역으로 설정할 수도 있고, 스트림즈에서만 사용할 서버를 재정의할 수도 있다.
몇 가지 프로퍼티는 전용 프로퍼티를 통해 추가할 수 있다. 그외 다른 카프카 프로퍼티는 spring.kafka.streams.properties
네임스페이스를 통해 설정할 수 있다. 자세한 내용은 그밖의 카프카 프로퍼티들을 참고해라.
팩토리 빈을 사용하려면 다음 예제처럼 원하는 @Bean
에 StreamsBuilder
를 넘겨라:
@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
return new KeyValue<>(key, value.toUpperCase());
}
}
팩토리 빈으로 StreamBuilder
객체를 만들게 되면 기본적으로 StreamBuilder
로 관리하는 스트림을 자동으로 시작한다. 이 동작은 spring.kafka.streams.auto-startup
프로퍼티로 커스텀할 수 있다.
Additional Kafka Properties
자동 설정에서 지원하는 프로퍼티는 공통 애플리케이션 프로퍼티에 나와있다. 이런 프로퍼티(하이픈으로 연결하거나 카멜 케이스 사용)는 대부분 점(.
)으로 구분하는 아파치 카프카 프로퍼티에 직접 매핑된다. 자세한 내용은 아파치 카프카 문서를 참고해라.
이 프로퍼티들 중 처음 몇 가지는 모든 컴포넌트(producers, consumers, admins, streams)에 적용되지만, 다른 값을 사용하고 싶을 땐 컴포넌트 레벨에 지정해도 된다. 아파치 카프카에선 프로퍼티 중요도를 HIGH, MEDIUM, LOW로 표현한다. 스프링 부트 자동 설정은 중요도가 HIGH인 프로퍼티는 모두 지원하고 있으며, MEDIUM/LOW 프로퍼티는 일부만, 기본값이 없는 프로퍼티는 모두 지원한다.
KafkaProperties
클래스로 직접 지정할 수 있는 프로퍼티는 카프카에서 지원하는 프로퍼티의 일부만이다. 프로듀서나 컨슈머에 직접 지원하지 않는 다른 프로퍼티를 설정하려면 아래와 같이 프로퍼티를 지정해라:
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
kafka:
properties:
"[prop.one]": "first"
admin:
properties:
"[prop.two]": "second"
consumer:
properties:
"[prop.three]": "third"
producer:
properties:
"[prop.four]": "fourth"
streams:
properties:
"[prop.five]": "fifth"
여기에선 공통 카프카 프로퍼티 prop.one
을 first
로 설정하고 있으며 (producers, consumers, admins에 적용된다), 어드민 프로퍼티 prop.two
는 second
로, 컨슈머 프로퍼티 prop.three
는 third
로, 프로듀서 프로퍼티 prop.four
는 fourth
로, 스트림즈 프로퍼티 prop.five
는 fifth
로 설정한다.
다음과 같이 스프링 카프카 JsonDeserializer
도 설정할 수 있다:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
kafka:
consumer:
value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
properties:
"[spring.json.value.default.type]": "com.example.Invoice"
"[spring.json.trusted.packages]": "com.example.main,com.example.another"
JsonSerializer
에서 헤더에 타입 정보를 전송하는 기본 동작도 비슷한 방법으로 비활성화할 수 있다:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
kafka:
producer:
value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
properties:
"[spring.json.add.type.headers]": false
이렇게 프로퍼티를 설정하면 스프링 부트가 명시적으로 지원하는 설정을 모두 재정의하게 된다.
Testing with Embedded Kafka
Spring for Apache Kafka를 사용하면 임베디드 아파치 카프카 브로커를 사용해서 간편하게 프로젝트를 테스트할 수 있다. 이 기능을 사용하려면 테스트 클래스에 spring-kafka-test
모듈에 있는 @EmbeddedKafka
어노테이션을 추가해라. 자세한 내용은 Spring for Apache Kafka 레퍼런스 매뉴얼을 참고해라.
위에서 말한 임베디드 아파치 카프카 브로커를 스프링 부트 자동 설정과 함께 사용하려면, 임베디드 브로커 주소에 사용하는 시스템 프로퍼티를 (EmbeddedKafkaBroker
에서 채우는 값) 스프링 부트의 아파치 카프카 설정 프로퍼티에 다시 한 번 매핑해줘야 한다. 여기에는 여러 가지 방법이 있다:
- 테스트 클래스에 시스템 프로퍼티를 제공해서 임베디드 브로커 주소를
spring.kafka.bootstrap-servers
에 매핑한다:
static {
System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
@EmbeddedKafka
어노테이션에 프로퍼티명을 설정한다:
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {
// ...
}
- 설정 프로퍼티에 플레이스홀더를 사용한다:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
kafka:
bootstrap-servers: "${spring.embedded.kafka.brokers}"
Next :Calling REST Services with RestTemplate
스프링 부트로 RestTemplate 자동 설정하고 커스텀하기
전체 목차는 여기에 있습니다.