토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 부트 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.

목차


7.14. Messaging

스프링 프레임워크는 JmsTemplate을 통한 간단한 JMS API 사용부터 메세지를 비동기로 수신하는 완전한 인프라까지, 메세지 처리 시스템 통합을 위한 광범위한 기능을 지원한다. 스프링 AMQP는 AMQPAdvanced Message Queuing Protocol를 위한 유사한 기능 셋을 제공한다. 스프링 부트는 RabbitTemplateRabbitMQ를 위한 자동 설정 옵션도 제공한다. 스프링 WebSocket은 본래 STOMP 메세지 처리를 지원하고 있으며, 스프링 부트는 스타터와 약간의 자동 설정으로 STOMP 메세지 처리를 지원한다. 스프링 부트에선 아파치 카프카도 지원한다.

7.14.1. JMS

javax.jms.ConnectionFactory 인터페이스는 JMS 브로커와 상호작용할 때 필요한 javax.jms.Connection 생성을 위한 표준 메소드를 제공한다. 스프링에선 JMS를 다루려면 ConnectionFactory가 필요하지만, 보통은 직접 사용할 필요는 없으며, 그대신 더 높은 수준으로 추상화해 놓은 메세지 처리 클래스들을 이용하면 된다. (자세한 내용은 스프링 프레임워크 레퍼런스 문서의 관련 섹션을 참고해라.) 이와 더불어 스프링 부트는 메세지를 주고 받을 때 필요한 인프라를 자동으로 설정해준다.

ActiveMQ Support

클래스패스에 ActiveMQ가 있으면 스프링 부트는 ConnectionFactory도 설정할 수 있다. 클래스패스에 브로커가 있으면 임베디드 브로커를 자동으로 시작하고 구성한다 (설정에 브로커 URL을 지정하지 않으면).

spring-boot-starter-activemq를 사용하면 JMS 통합을 위한 스프링 인프라와 함께 ActiveMQ 인스턴스를 연결하거나 임베딩시키는데 필요한 의존성이 추가된다.

ActiveMQ 설정은 spring.activemq.*에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties에서 아래 설정을 추가할 수 있다:

properties yaml
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
spring:
  activemq:
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

기본적으로 CachingConnectionFactoryspring.jms.*로 제어하는 적당한 설정을 가지고 네이티브 ConnectionFactory를 래핑한다:

properties yaml
spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

네이티브 풀링을 사용하고 싶다면 다음 예제처럼 org.messaginghub:pooled-jms 의존성을 추가하고 그에 따라 JmsPoolConnectionFactory를 설정하면 된다:

properties yaml
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
spring:
  activemq:
    pool:
      enabled: true
      max-connections: 50

지원하는 다른 옵션들은 ActiveMQProperties를 참고해라. 다른 것들을 좀 더 커스텀하고 싶으면, ActiveMQConnectionFactoryCustomizer를 구현하는 빈을 원하는 만큼 추가해도 된다.

기본적으로 ActiveMQ는 destination이 아직 없다면 하나를 생성해서, 전달받은 이름으로 destination을 리졸브한다.

ActiveMQ Artemis Support

스프링 부트는 클래스패스에서 ActiveMQ Artemis를 사용할 수 있음을 감지하면 ConnectionFactory를 자동으로 설정할 수 있다. 클래스패스에 브로커가 있으면 임베디드 브로커를 자동으로 시작하고 구성한다 (mode 프로퍼티를 명시하지 않았다면). 지원하는 모드는 embedded와 (임베디드 브로커가 필요하다는 걸 명시해서, 클래스패스에 브로커가 없으면 오류가 발생해야 한다는 걸 알린다) native다 (netty 전송 프로토콜을 통해 브로커에 연결한다). 후자를 설정하면 스프링 부트는 디폴트 설정을 가지고, 로컬 머신에서 실행 중는 브로커에 연결하는 ConnectionFactory를 설정한다.

spring-boot-starter-artemis를 사용하면 JMS 통합을 위한 스프링 인프라와 함께 기존 ActiveMQ Artemis 인스턴스 연결에 필요한 의존성이 추가된다. 애플리케이션에 org.apache.activemq:artemis-jms-server를 추가하면 embedded 모드를 사용할 수 있다.

ActiveMQ Artemis 설정은 spring.artemis.*에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties에서 아래 설정을 추가할 수 있다:

properties yaml
spring.artemis.mode=native
spring.artemis.broker-url=tcp://192.168.1.210:9876
spring.artemis.user=admin
spring.artemis.password=secret
spring:
  artemis:
    mode: native
    broker-url: "tcp://192.168.1.210:9876"
    user: "admin"
    password: "secret"

브로커를 임베딩시킬 땐, 원한다면 persistence를 활성화하고 사용 가능하게 만들어줄 destination을 나열할 수 있다. destination은 디폴트 옵션으로 만들 땐 리스트를 콤마로 구분해서 지정하면 되고, 큐나 토픽 설정을 직접 만지고 싶을 땐 org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration이나 org.apache.activemq.artemis.jms.server.config.TopicConfiguration 타입 빈을 정의하면 된다.

기본적으로 CachingConnectionFactoryspring.jms.*로 제어하는 적당한 설정을 가지고 네이티브 ConnectionFactory를 래핑한다:

properties yaml
spring.jms.cache.session-cache-size=5
spring:
  jms:
    cache:
      session-cache-size: 5

네이티브 풀링을 사용하고 싶다면 다음 예제처럼 org.messaginghub:pooled-jms 의존성을 추가하고 그에 따라 JmsPoolConnectionFactory를 설정하면 된다:

properties yaml
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
spring:
  artemis:
    pool:
      enabled: true
      max-connections: 50

지원하는 다른 옵션들은 ArtemisProperties를 참고해라.

JNDI lookup은 관여하지 않으며, destination들은 Artemis 설정에 있는 name 속성이나 설정으로 제공한 이름을 통해 리졸브된다.

Using a JNDI ConnectionFactory

애플리케이션을 애플리케이션 서버에서 실행한다면 스프링 부트는 JNDI를 사용해 JMS ConnectionFactory를 배치한다. 기본적으로 java:/JmsXAjava:/XAConnectionFactory 위치를 확인한다. 다른 위치를 지정하고 싶으면 다음 예제처럼 spring.jms.jndi-name 프로퍼티를 사용하면 된다:

properties yaml
spring.jms.jndi-name=java:/MyConnectionFactory
spring:
  jms:
    jndi-name: "java:/MyConnectionFactory"

Sending a Message

스프링의 JmsTemplate은 자동으로 설정되며, 다음 예제처럼 원하는 빈에 직접 autowire할 수 있다:

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

}

JmsMessagingTemplate도 비슷한 방법으로 주입할 수 있다. DestinationResolverMessageConverter 빈을 정의하면 자동 설정된 JmsTemplate에 자동으로 연결된다.

Receiving a Message

JMS 인프라가 있을 땐 원하는 빈에 @JmsListener 어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. JmsListenerContainerFactory를 정의하지 않았다면 디폴트 팩토리를 자동으로 설정한다. DestinationResolver나, MessageConverter, javax.jms.ExceptionListener 빈을 정의하면 이 디폴트 팩토리와 자동으로 연결된다.

기본적으로 디폴트 팩토리는 트랜잭션을 사용한다. JtaTransactionManager를 가진 인프라에서 실행하면 기본적으로 JtaTransactionManager가 리스너 컨테이너와 연결된다. 그 외엔 sessionTransacted 플래그를 활성화한다. 후자에선 리스너 메소드(또는 그 delegate)에 @Transactional을 추가하면 수신한 메세지를 처리할 때 로컬 데이터 저장소 트랜잭션을 연계시킬 수 있다. 이렇게 하면 로컬 트랜잭션이 완료되고나면 수신한 메세지를 승인acknowledged해준다. 동일한 JMS 세션에서 응답 메세지를 전송할 때도 마찬가지다.

아래 컴포넌트는 someQueue destination에 리스너 엔드포인트를 생성한다:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

자세한 내용은 @EnableJms Javadoc을 참고해라.

JmsListenerContainerFactory 인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 DefaultJmsListenerContainerFactoryConfigurer를 활용하면 된다. 여기서는 자동 설정되는 팩토리와 동일한 설정으로 DefaultJmsListenerContainerFactory를 초기화할 수 있다.

예를 들어 다음 예제는 다른 팩토리 하나를 MessageConverter를 따로 지정해서 정의하고 있다:

@Configuration(proxyBeanMethods = false)
public class MyJmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

이제 @JmsListener를 선언한 메소드라면 모두 이 팩토리를 사용할 수 있다:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

7.14.2. AMQP

AMQPAdvanced Message Queuing Protocol는 메세지 지향 미들웨어를 위한 플랫폼 중립적인 와이어 레벨wire-level 프로토콜이다. 스프링 AMQP 프로젝트는 AMQP 기반 메세지 처리 솔루션 개발에 핵심 스프링 개념을 적용한다. 스프링 부트를 활용하면 spring-boot-starter-amqp “스타터”를 활용하는 등, RabbitMQ를 통해 AMQP 작업을 좀 더 수월하게 진행할 수 있다.

RabbitMQ support

RabbitMQ는 AMQP 프로토콜 기반 메세지 브로커로, 가볍고, 안정적이며, 확장 가능하고, 높은 이식성을 자랑하는 메세지 브로커다. 스프링은 RabbitMQ를 사용해서 AMQP 프로토콜로 통신한다.

RabbitMQ 설정은 spring.rabbitmq.*에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties에서 아래 설정을 추가할 수 있다:

properties yaml
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

아니면 같은 커넥션 설정을 addresses 속성으로도 설정할 수 있다:

properties yaml
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"

이렇게 addresses를 지정하면 hostport 프로퍼티는 무시한다. 주소에 amqps 프로토콜을 사용하면 SSL 지원을 자동으로 활성화한다.

지원하는 프로퍼티 기반 설정을 더 알아보려면 RabbitProperties를 참고해라. 스프링 AMQP에서 사용하는 RabbitMQ ConnectionFactory 내부에 있는 저수준 설정을 변경하려면 ConnectionFactoryCustomizer 빈을 정의해라.

컨텍스트에 ConnectionNameStrategy 빈이 있으면, 자동 설정된 CachingConnectionFactory로 생성한 커넥션 이름은 ConnectionNameStrategy 빈을 사용해서 지정한다.

자세한 내용은 RabbitMQ가 사용하는 프로토콜, AMQP 이해하기를 참고해라.

Sending a Message

스프링의 AmqpTemplateAmqpAdmin은 자동으로 설정되며, 다음 예제처럼 자체 빈에 직접 autowire할 수 있다:

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;

    private final AmqpTemplate amqpTemplate;

    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}

RabbitMessagingTemplate도 비슷한 방법으로 주입할 수 있다. MessageConverter 빈을 정의하면 자동 설정된 AmqpTemplate에 자동으로 연결된다.

필요한 경우엔 빈으로 정의한 모든 org.springframework.amqp.core.Queue를 자동으로 사용해서 RabbitMQ 인스턴스에 상응하는 큐를 선언한다.

작업을 재시도하려면 AmqpTemplate에 재시도를 활성화해주면 된다 (브로커 커넥션을 유실됐을 때 등):

properties yaml
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

재시도는 기본적으로 비활성화돼 있다. RetryTemplateRabbitRetryTemplateCustomizer 빈을 선언하면 코드로도 커스텀할 수 있다.

RabbitTemplate 인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 RabbitTemplateConfigurer 빈을 활용하면 된다. 여기서는 자동 설정에서 사용하는 팩토리와 동일한 설정으로 RabbitTemplate을 초기화할 수 있다.

Receiving a Message

Rabbit 인프라가 있을 땐 원하는 빈에 @RabbitListener 어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. RabbitListenerContainerFactory를 정의하지 않았다면 디폴트 SimpleRabbitListenerContainerFactory를 자동으로 설정하며, spring.rabbitmq.listener.type 프로퍼티를 사용해서 direct 컨테이너로 전환할 수 있다. MessageConverterMessageRecoverer 빈을 정의하면 이 디폴트 팩토리와 자동으로 연결된다.

아래 샘플 컴포넌트는 someQueue 큐에 리스너 엔드포인트를 생성한다:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

자세한 내용은 @EnableRabbit Javadoc을 참고해라.

RabbitListenerContainerFactory 인스턴스를 더 많이 만들고 싶거나 기본값을 재정의하고 싶을 때는, 스프링 부트의 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer를 활용하면 된다. 여기서는 자동 설정에서 사용하는 팩토리와 동일한 설정으로 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory를 초기화할 수 있다.

선택한 컨테이너 타입은 중요하지 않다. 이 두 빈은 자동 설정으로 정의된다.

예를 들어 아래 설정 클래스에선 다른 팩토리 하나를 MessageConverter를 따로 지정해서 정의하고 있다:

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        ConnectionFactory connectionFactory = getCustomConnectionFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(new MyMessageConverter());
        return factory;
    }

    private ConnectionFactory getCustomConnectionFactory() {
        return ...
    }

}

이제 @RabbitListener를 선언한 메소드라면 모두 이 팩토리를 사용할 수 있다:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory = "myFactory")
    public void processMessage(String content) {
        // ...
    }

}

재시도를 활성화하면 리스너에서 예외를 던지는 상황에 대처할 수 있다. 재시도엔 기본적으로 RejectAndDontRequeueRecoverer를 사용하지만, 직접 자체 MessageRecoverer를 정의할 수도 있다. 재시도 횟수를 모두 소진하면 메세지를 거부하고, 브로커 설정에 따라 날려버리거나drop DLXdead-letter exchange로 라우팅한다. 재시도는 기본적으로 비활성화돼 있다. RabbitRetryTemplateCustomizer 빈을 선언하면 코드로도 RetryTemplate을 커스텀할 수 있다.

재시도를 비활성한 상태에서 리스너에서 예외가 발생하면 기본적으로 무한정 재전송한다. 이 동작은 두 가지 방법으로 수정할 수 있다. defaultRequeueRejected 프로퍼티를 false로 설정해서 재전송을 시도하지 않도록 만들거나, AmqpRejectAndDontRequeueException을 던져 메세지를 거절해야 한다는 신호를 보낼 수 있다. 후자는 재시도를 활성화했을 때 최대로 전송을 시도해볼 횟수에 도달하면 사용하는 메커니즘이다.

7.14.3. Apache Kafka Support

Apache Kafkaspring-kafka 프로젝트의 자동 설정으로 지원한다.

카프카 설정은 spring.kafka.*에 있는 외부 설정 프로퍼티로 제어한다. 예를 들어 application.properties에서 아래 설정을 추가할 수 있다:

properties yaml
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup"

기동 시에 토픽을 생성하려면 NewTopic 타입 빈을 추가해라. 토픽이 이미 있을 땐 이 빈은 무시된다.

지원하는 다른 옵션들은 KafkaProperties를 참고해라.

Sending a Message

스프링의 KafkaTemplate은 자동으로 설정되며, 다음 예제처럼 원하는 빈에 직접 autowire할 수 있다:

@Component
public class MyBean {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyBean(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

spring.kafka.producer.transaction-id-prefix 프로퍼티를 정의하면 KafkaTransactionManager를 자동으로 설정한다. 더불어 RecordMessageConverter 빈을 정의하면 자동 설정된 KafkaTemplate에 자동으로 연결된다.

Receiving a Message

Apache Kafka 인프라가 있을 땐 원하는 빈에 @KafkaListener 어노테이션을 달아 리스너 엔드포인트를 생성할 수 있다. KafkaListenerContainerFactory를 정의하지 않았다면, spring.kafka.listener.*로 정의한 키를 통해 디폴트 팩토리를 자동으로 설정한다.

아래 컴포넌트는 someTopic 토픽에 리스너 엔드포인트를 생성한다:

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

KafkaTransactionManager 빈을 정의하면 자동으로 컨테이너 팩토리에 연결된다. 마찬가지로 RecordFilterStrategy, ErrorHandler, AfterRollbackProcessor, ConsumerAwareRebalanceListener 빈을 정의하면 자동으로 디폴트 팩토리에 연결된다.

리스너 타입에 따라 RecordMessageConverterBatchMessageConverter 빈이 디폴트 팩토리에 연결된다. 배치 리스너에서 RecordMessageConverter 빈만 존재한다면 BatchMessageConverter로 래핑한다.

커스텀 ChainedKafkaTransactionManager는 보통 자동 설정된 KafkaTransactionManager 빈을 참조하므로, 반드시 @Primary로 마킹해야 한다.

Kafka Streams

Spring for Apache Kafka는 StreamsBuilder 객체를 생성하고, 스트림의 라이프사이클을 관리할 수 있는 팩토리 빈을 제공한다. 클래스패스에 kafka-streams가 있을 때 @EnableKafkaStreams 어노테이션으로 Kafka Streams를 활성화하면 스프링 부트는 필요한 KafkaStreamsConfiguration 빈을 자동 설정한다.

Kafka Streams를 활성화한다는 건 애플리케이션 id와 부트스트랩 서버를 설정해야 한다는 뜻이다. 전자는 spring.kafka.streams.application-id로 설정할 수 있으며, 설정하지 않았을 때 기본값은 spring.application.name이다. 후자는 전역으로 설정할 수도 있고, 스트림즈에서만 사용할 서버를 재정의할 수도 있다.

몇 가지 프로퍼티는 전용 프로퍼티를 통해 추가할 수 있다. 그외 다른 카프카 프로퍼티는 spring.kafka.streams.properties 네임스페이스를 통해 설정할 수 있다. 자세한 내용은 그밖의 카프카 프로퍼티들을 참고해라.

팩토리 빈을 사용하려면 다음 예제처럼 원하는 @BeanStreamsBuilder를 넘겨라:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public class MyKafkaStreamsConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map(this::uppercaseValue).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

    private KeyValue<Integer, String> uppercaseValue(Integer key, String value) {
        return new KeyValue<>(key, value.toUpperCase());
    }

}

팩토리 빈으로 StreamBuilder 객체를 만들게 되면 기본적으로 StreamBuilder로 관리하는 스트림을 자동으로 시작한다. 이 동작은 spring.kafka.streams.auto-startup 프로퍼티로 커스텀할 수 있다.

Additional Kafka Properties

자동 설정에서 지원하는 프로퍼티는 공통 애플리케이션 프로퍼티에 나와있다. 이런 프로퍼티(하이픈으로 연결하거나 카멜 케이스 사용)는 대부분 점(.)으로 구분하는 아파치 카프카 프로퍼티에 직접 매핑된다. 자세한 내용은 아파치 카프카 문서를 참고해라.

이 프로퍼티들 중 처음 몇 가지는 모든 컴포넌트(producers, consumers, admins, streams)에 적용되지만, 다른 값을 사용하고 싶을 땐 컴포넌트 레벨에 지정해도 된다. 아파치 카프카에선 프로퍼티 중요도를 HIGH, MEDIUM, LOW로 표현한다. 스프링 부트 자동 설정은 중요도가 HIGH인 프로퍼티는 모두 지원하고 있으며, MEDIUM/LOW 프로퍼티는 일부만, 기본값이 없는 프로퍼티는 모두 지원한다.

KafkaProperties 클래스로 직접 지정할 수 있는 프로퍼티는 카프카에서 지원하는 프로퍼티의 일부만이다. 프로듀서나 컨슈머에 직접 지원하지 않는 다른 프로퍼티를 설정하려면 아래와 같이 프로퍼티를 지정해라:

properties yaml
spring.kafka.properties[prop.one]=first
spring.kafka.admin.properties[prop.two]=second
spring.kafka.consumer.properties[prop.three]=third
spring.kafka.producer.properties[prop.four]=fourth
spring.kafka.streams.properties[prop.five]=fifth
spring:
  kafka:
    properties:
      "[prop.one]": "first"
    admin:
      properties:
        "[prop.two]": "second"
    consumer:
      properties:
        "[prop.three]": "third"
    producer:
      properties:
        "[prop.four]": "fourth"
    streams:
      properties:
        "[prop.five]": "fifth"

여기에선 공통 카프카 프로퍼티 prop.onefirst로 설정하고 있으며 (producers, consumers, admins에 적용된다), 어드민 프로퍼티 prop.twosecond로, 컨슈머 프로퍼티 prop.threethird로, 프로듀서 프로퍼티 prop.fourfourth로, 스트림즈 프로퍼티 prop.fivefifth로 설정한다.

다음과 같이 스프링 카프카 JsonDeserializer도 설정할 수 있다:

properties yaml
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=com.example.Invoice
spring.kafka.consumer.properties[spring.json.trusted.packages]=com.example.main,com.example.another
spring:
  kafka:
    consumer:
      value-deserializer: "org.springframework.kafka.support.serializer.JsonDeserializer"
      properties:
        "[spring.json.value.default.type]": "com.example.Invoice"
        "[spring.json.trusted.packages]": "com.example.main,com.example.another"

JsonSerializer에서 헤더에 타입 정보를 전송하는 기본 동작도 비슷한 방법으로 비활성화할 수 있다:

properties yaml
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties[spring.json.add.type.headers]=false
spring:
  kafka:
    producer:
      value-serializer: "org.springframework.kafka.support.serializer.JsonSerializer"
      properties:
        "[spring.json.add.type.headers]": false

이렇게 프로퍼티를 설정하면 스프링 부트가 명시적으로 지원하는 설정을 모두 재정의하게 된다.

Testing with Embedded Kafka

Spring for Apache Kafka를 사용하면 임베디드 아파치 카프카 브로커를 사용해서 간편하게 프로젝트를 테스트할 수 있다. 이 기능을 사용하려면 테스트 클래스에 spring-kafka-test 모듈에 있는 @EmbeddedKafka 어노테이션을 추가해라. 자세한 내용은 Spring for Apache Kafka 레퍼런스 매뉴얼을 참고해라.

위에서 말한 임베디드 아파치 카프카 브로커를 스프링 부트 자동 설정과 함께 사용하려면, 임베디드 브로커 주소에 사용하는 시스템 프로퍼티를 (EmbeddedKafkaBroker에서 채우는 값) 스프링 부트의 아파치 카프카 설정 프로퍼티에 다시 한 번 매핑해줘야 한다. 여기에는 여러 가지 방법이 있다:

static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
@SpringBootTest
@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class MyTest {

    // ...

}
properties yaml
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
spring:
  kafka:
    bootstrap-servers: "${spring.embedded.kafka.brokers}"

전체 목차는 여기에 있습니다.

<< >>

TOP