스프링 클라우드 데이터 플로우 공식 레퍼런스를 한글로 번역한 문서입니다.
전체 목차는 여기에 있습니다.
이 가이드에선 Spring Cloud Stream을 사용해 세 가지 스프링 부트 애플리케이션을 개발하고, 클라우드 파운드리, 쿠버네티스, 로컬 머신에 배포해본다. Data Flow를 사용해서 이 애플리케이션들을 배포하는 일은 별도 가이드 문서에서 다룬다. 애플리케이션을 직접 수동으로 배포해보면 Data Flow가 자동화해주는 일들을 더 깊게 이해할 수 있다.
이어지는 섹션에선 이 애플리케이션들을 빌드하는 방법부터 설명한다.
원한다면 이 애플리케이션들의 소스 코드를 가지고 있는 zip 파일을 다운받아, 압축을 풀고 빌드해서 배포 섹션으로 이동해도 좋다.
이 세 가지 애플리케이션이 전부 들어있는 프로젝트는 브라우저에서 다운받을 수 있다. 아래 예시처럼 커맨드라인을 사용해도 된다:
wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true -O usage-cost-stream-sample.zip
Building the downloaded sample
스트림 앱은 공통 코드 베이스는 유지하면서, 카프카 브로커나 RabbitMQ와 함께 실행하도록 설정할 수 있다. 유일한 차이점은 executable jar 파일에 있다. 카프카 브로커와 함께 동작하려면 카프카 바인더 의존성이 필요하다 (디폴트로 활성화된다). RabbitMQ에선 Rabbit 바인더가 필요하다.
Kafka Streams, Amazon Kinesis, Google PubSub (partner maintained), Solace PubSub+ (partner maintained), Azure Event Hubs (partner maintained)를 지원하는 Spring Cloud Stream 바인더 구현체도 있다. 바인더는 빌드 시점에 선택한다. 샘플 프로젝트는 메이븐 프로파일을 사용해 적당한 바인더를 활성화한다.
카프카 전용 샘플 스트림 앱을 빌드하려면 프로젝트 루트 디렉토리에서 아래 명령어를 실행해라:
$./mvnw clean package -Pkafka
RabbitMQ 전용 샘플 스트림 앱을 빌드하려면 프로젝트 루트 디렉토리에서 아래 명령어를 실행해라:
$./mvnw clean package -Prabbit
Development
설정한 바인더를 사용해서 통신하는 Spring Cloud Stream 애플리케이션을 세 가지 만들어보겠다.
이번 시나리오는 고객을 위한 청구서를 만드는 휴대폰 회사다. 사용자가 통화를 할 때마다 duration과 통화 중에 사용한 data 양이 정해진다. 청구서를 만드는 프로세스에선 원래의 통화 데이터를, 통화 시간 동안의 비용과 사용한 데이터 양에 대한 비용으로 변환해야 한다.
통화 내역은 해당 통화의 duration과 통화 중 사용한 data 양을 가지고 있는 UsageDetail 클래스로 모델링한다. 청구서는 통화 비용(costCall)과 데이터 비용(costData)을 가지고 있는 UsageCostDetail 클래스로 모델링한다. 각 클래스는 전화를 건 사람을 식별할 수 있는 ID(userId)를 포함한다.
이번에 만들어볼 세 가지 스트리밍 애플리케이션은 다음과 같다:
Source애플리케이션(UsageDetailSender로 명명)은 각userId마다 사용자의 통화duration과 사용한data양을 만들어내고,UsageDetail객체를 포함하는 메세지를 JSON으로 전송한다.Processor애플리케이션(UsageCostProcessor라 명명)은 이UsageDetail을 컨슘하고userId당 통화 비용과 데이터 비용을 계산한다.UsageCostDetail객체를 JSON으로 전송한다.Sink애플리케이션(UsageCostLogger로 명명)은UsageCostDetail객체를 컨슘하고 통화 및 데이터 비용을 기록한다.
목차
Source
여기서는 UsageDetailSender 소스를 생성한다.
Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-detail-sender-kafka, 패키지는o.spring.dataflow.sample.usagedetailsender를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
Kafka를 입력해서 카프카 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-detail-sender-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
카프카를 메세지 브로커로 사용할 때는 다양한 설정 옵션들을 선택해 확장하거나 재정의해서 원하는 런타임 동작을 완성할 수 있다. 카프카 바인더 설정 프로퍼티들은 카프카 바인더 문서에 정리되어 있다.
Spring Initializr에서 만든 프로젝트를 바로 다운로드하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-detail-sender-rabbit, 패키지는o.spring.dataflow.sample.usagedetailsender를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-detail-sender-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
Durable Queues
Spring Cloud Stream 컨슈머 애플리케이션은 기본적으로 anonymous auto-delete 큐를 생성한다. 그렇기 때문에 producer 애플리케이션이 컨슈머 애플리케이션보다 먼저 기동됐다면, producer가 메세지를 저장하고 전달하지 못하는 메세지가 생길 수 있다. exchange를 durable로 설정했다 하더라도 이후에도 컨슘할 수 있게 메세지를 저장하려면 exchange에 바인딩할 durable 큐가 필요하다. 이런한 까닭으로 메세지 전달을 보장하기 위해서는 durable 큐가 필요하다.
durable 큐를 미리 생성하고 exchange에 바인딩하려면 producer 애플리케이션에 아래 프로퍼티를 설정해야 한다:
spring.cloud.stream.bindings.<channelName>.producer.requiredGroups
이 requiredGroups 프로퍼티는 producer가 메세지 전달을 보장해야 하는 그룹들을 콤마로 구분해서 받는다. 이 프로퍼티를 설정하면 <exchange>.<requiredGroup> 형식을 통해 durable 큐가 생성된다.
RabbitMQ를 메세지 브로커로 사용할 때는 다양한 설정 옵션들을 선택해 확장하거나 재정의해서 원하는 런타임 동작을 완성할 수 있다. RabbitMQ 바인더 설정 프로퍼티들은 RabbitMQ 바인더 문서에 정리되어 있다.
Business Logic
이제 이 애플리케이션에 필요한 코드를 만들면 된다. 비지니스 로직을 작성하려면:
io.spring.dataflow.sample.usagedetailsender패키지에 UsageDetail.java와 같은UsageDetail클래스를 생성한다.UsageDetail클래스에는userId,data,duration프로퍼티가 담겨있다.io.spring.dataflow.sample.usagedetailsender패키지에UsageDetailSender클래스를 생성한다. 이 클래스는 아래 코드와 비슷하게 만들어야 한다:
package io.spring.dataflow.sample.usagedetailsender;
import java.util.Random;
import java.util.function.Supplier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageDetailSender {
private String[] users = {"user1", "user2", "user3", "user4", "user5"};
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}
sendEvents Supplier는 UsageDetail 객체에 랜덤 값을 채워 제공한다. Spring Cloud Stream은 자동으로 이 함수를 바인딩해서 해당 데이터를 설정해둔 출력 목적지로 전송한다. Spring Cloud Stream은 모든 Supplier에 디폴트 폴러도 설정하는데, 기본적으로 1초 간격으로 함수를 호출하게 된다.
Configuration
source 애플리케이션을 설정할 때는, producer가 데이터를 게시할 output 바인딩 목적지를 설정해야 한다 (RabbitMQ exchange나 카프카 토픽의 이름).
‘sendEvents 함수의 첫 번째 출력 파라미터’에 해당하는 출력은, 함수 출력 바인딩 이름으로 표현하면 sendEvents-out-0이다. 편의상 이 sendEvents-out-0을 논리적인 이름 output으로 alias를 지정하겠다. alias를 사용하지 않고 출력 바인딩 이름을 직접 사용해도 된다 (spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail). 자세한 설명은 Functional Binding Names을 참고해라.
src/main/resources/application.properties에 아래 프로퍼티를 추가해라:
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
Building
이제 이 Usage Detail Sender 애플리케이션을 빌드하면 된다.
메이븐으로 프로젝트를 빌드하려면 usage-detail-sender 루트 디렉토리에서 아래 명령어를 실행해라:
./mvnw clean package
Testing
Spring Cloud Stream은 Spring Cloud Stream 애플리케이션을 테스트할 수 있는 test jar를 제공한다:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
TestChannelBinderConfiguration에선 메세지 브로커 바인더 구현체 대신, 애플리케이션의 아웃바운드와 인바운드 메세지를 추적하고 테스트하는할 수 있는 인메모리 바인더 구현체를 제공한다. 테스트 설정에는 메세지를 보내고 받기 위한 InputDestination과 OutputDestination 빈이 들어있다. UsageDetailSender 애플리케이션을 단위 테스트하려면 UsageDetailSenderApplicationTests 클래스에 아래 코드를 추가해라:
package io.spring.dataflow.sample.usagedetailsender;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageDetailSenderApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageDetailSender() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageDetailSenderApplication.class))
.web(WebApplicationType.NONE)
.run()) {
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
UsageDetail usageDetail = (UsageDetail) converter
.fromMessage(sourceMessage, UsageDetail.class);
assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
assertThat(usageDetail.getData()).isBetween(0L, 700L);
assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
}
}
}
contextLoads테스트 케이스에선 애플리케이션을 정상적으로 기동할 수 있는지 검증한다.testUsageDetailSender테스트 케이스에선OutputDestination을 사용해UsageDetailSender가 전송한 메세지를 받아 검증한다.
이 인메모리 테스트 바인더는 다른 메세지 브로커 바인더 구현체가 동작하는 방식과 그대로다. 특히, Spring Cloud Stream 애플리케이션에선 기본적으로 메세지 페이로드는 항상 JSON으로 인코딩한 바이트 배열이다. 컨슘하는 애플리케이션은 입력 채널에서 바이트를 수신하고, 컨텐츠 타입에 따라 적절한
MessageConverter에 자동으로 위임해서 바이트를 컨슈밍 함수의 인자 타입인UsageDetail에 맞게 변환한다. 테스트를 위해서는 이 단계를 명시적으로 수행해야 한다. 아니면MessageConverter를 사용하는 대신에 JSON 파서를 직접 호출해도 된다.
Processor
이번에는 UsageCostProcessor 프로세서를 생성한다.
Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-cost-processor-kafka, 패키지는o.spring.dataflow.sample.usagecostprocessor를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
Kafka를 입력해서 카프카 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-cost-processor-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-cost-processor-rabbit, 패키지는o.spring.dataflow.sample.usagecostprocessor를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-cost-processor-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
Business Logic
이제 이 애플리케이션에 필요한 코드를 만들면 된다. 비지니스 로직을 작성하려면:
io.spring.dataflow.sample.usagecostprocessor패키지에UsageDetail클래스를 생성한다. 클래스 내용물은 UsageDetail.java와 비슷하다.UsageDetail클래스에는userId,data,duration프로퍼티가 담겨있다.io.spring.dataflow.sample.usagecostprocessor패키지에UsageCostDetail클래스를 생성한다. 내용물은 UsageCostDetail.java와 비슷하다.UsageCostDetail클래스에는userId,callCost,dataCost프로퍼티가 담겨있다.UsageDetail메세지를 받아 통화 및 데이터 비용을 계산하고UsageCostDetail메세지를 전송할UsageCostProcessor클래스를io.spring.dataflow.sample.usagecostprocessor패키지에 생성해라. 소스 코드는 다음과 같다:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.function.Function;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostProcessor {
private double ratePerSecond = 0.1;
private double ratePerMB = 0.05;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}
위 애플리케이션에선 UsageDetail을 받아 UsageCostDetail을 반환하는 Function 빈을 선언하고 있다. Spring Cloud Stream은 이 함수를 찾아 해당 입출력을 메세징 미들웨어에 설정된 목적지에 바인딩한다. 앞 섹션에서 설명했듯이 Spring Cloud Stream은 적절한 MessageConverter를 사용해 메세지를 필요한 타입으로 변환해준다.
Configuration
processor 애플리케이션을 설정할 땐 아래 프로퍼티들을 설정해야 한다:
- 이 애플리케이션이 구독하는
input바인딩 목적지 (카프카 토픽이나 RabbitMQ exchange). - producer가 데이터를 게시할
output바인딩 목적지.
프로덕션 애플리케이션에선
spring.cloud.stream.bindings.input.group을 설정해서 이 컨슈머 애플리케이션이 속해있는 컨슈머 그룹을 지정하는 게 좋다. 이렇게 하면 추가적인 컨슈머 애플리케이션들도 각자 고유 그룹 id로 식별하면서 모든 메세지를 수신할 수 있다. 각 컨슈머 그룹은 여러 개의 인스턴스로 확장해서 작업 부하를 분산할 수 있다. Spring Cloud Stream은 RabbitMQ나 기타 다른 바인더 구현체로 확장할 수 있도록 카프카가 가진 이 고유의 기능을 추상화한다.
src/main/resources/application.properties에 아래 프로퍼티들을 추가해라:
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
편의상 함수 바인딩 이름 processUsageCost-in-0과 processUsageCost-out-0을 각각 input, output으로 alias를 지정한다.
spring.cloud.stream.bindings.input.destination프로퍼티는UsageCostProcessor객체의input을usage-detail목적지에 바인딩한다.spring.cloud.stream.bindings.output.destination프로퍼티는UsageCostProcessor객체의 출력을usage-cost목적지에 바인딩한다.
입력 목적지는 반드시 소스 애플리케이션의 출력 목적지와 동일해야 한다. 마찬가지로 출력 목적지는 아래에 나오는 싱크의 입력 목적지와 동일해야 한다.
Building
이제 이 Usage Cost Processor 애플리케이션을 빌드하면 된다. usage-cost-processor 루트 디렉토리에서 아래 명령어를 실행해 메이븐으로 프로젝트를 빌드해라:
./mvnw clean package
Testing
앞에서도 말했지만, Spring Cloud Stream은 Spring Cloud Stream 애플리케이션을 테스트할 수 있는 test jar를 제공한다:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
TestChannelBinderConfiguration은 애플리케이션의 아웃바운드와 인바운드 메세지를 추적하고 테스트할 수 있는 인메모리 바인더 구현체를 제공한다. 테스트 설정에는 메세지를 보내고 받기 위한 InputDestination과 OutputDestination 빈이 들어있다. UsageCostProcessor 애플리케이션을 단위 테스트하려면 UsageCostProcessorApplicationTests 클래스에 다음 코드를 추가해라:
package io.spring.dataflow.sample.usagecostprocessor;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import static org.assertj.core.api.Assertions.assertThat;
public class UsageCostProcessorApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageDetail, messageHeaders);
source.send(message);
OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);
final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);
assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}
contextLoads테스트 케이스에선 애플리케이션을 정상적으로 기동할 수 있는지 검증한다.testUsageCostProcessor테스트 케이스에선InputDestination을 사용해 메세지를 보내고,OutputDestination을 사용해 이 메세지를 받아 검증한다.
Sink
이번에는 UsageCostLogger 싱크를 생성한다.
Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드받아 Generate Project를 클릭하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-cost-logger-kafka, 패키지는o.spring.dataflow.sample.usagecostlogger를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
Kafka를 입력해서 카프카 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-cost-logger-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드받아 Generate Project를 클릭하거나
Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:
- 그룹명은
io.spring.dataflow.sample, 아티팩트명은usage-cost-logger-rabbit, 패키지는o.spring.dataflow.sample.usagecostlogger를 사용해서 새 메이븐 프로젝트를 생성한다. - Dependencies 텍스트 박스에
RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다. - Dependencies 텍스트 박스에
Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다. - Dependencies 텍스트 박스에
Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다. - Generate Project 버튼을 클릭한다.
이제 usage-cost-logger-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.
Business Logic
비지니스 로직을 작성하려면:
io.spring.dataflow.sample.usagecostlogger패키지에UsageCostDetail클래스를 생성한다. 클래스 내용물은 UsageCostDetail.java와 비슷하다.UsageCostDetail클래스에는userId,callCost,dataCost프로퍼티가 담겨있다.UsageCostDetail메세지를 받아 로그를 남기는UsageCostLogger클래스를io.spring.dataflow.sample.usagecostlogger패키지에 생성해라. 소스 코드는 다음과 같다:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class UsageCostLogger {
private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}
위 애플리케이션에선 UsageCostDetail을 받는 Consumer 빈을 선언하고 있다. Spring Cloud Stream은 이 함수를 찾아 해당 입력을 메세징 미들웨어에 설정된 입력 목적지에 바인딩한다. 앞 섹션에서 설명했듯이 Spring Cloud Stream은 컨슈머를 실행하기 전에 적절한 MessageConverter를 사용해 메세지를 필요한 타입으로 변환해준다.
Configuration
sink 애플리케이션을 설정할 땐 아래 프로퍼티들을 설정해야 한다:
- 이 애플리케이션이 구독하는
input바인딩 목적지 (카프카 토픽이나 RabbitMQ exchange). - 이 컨슈머 애플리케이션이 속해있는 컨슈머 그룹을 지정하는
group(생략 가능).
편의상 함수 바인딩 이름 process-in-0을 input으로 alias를 지정한다.
src/main/resources/application.properties에 아래 프로퍼티들을 추가해라:
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0
Building
이제 이 Usage Cost Logger 애플리케이션을 빌드하면 된다. usage-cost-logger 루트 디렉토리에서 아래 명령어를 실행해 메이븐으로 프로젝트를 빌드해라:
./mvnw clean package
Testing
UsageCostLogger를 테스트하려면 UsageCostLoggerApplicationTests 클래스를 만들어 다음 코드를 추가해라:
package io.spring.dataflow.sample.usagecostlogger;
import java.util.HashMap;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {
@Test
public void contextLoads() {
}
@Test
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {
InputDestination source = context.getBean(InputDestination.class);
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);
final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);
source.send(message);
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}
pom.xml에 awaitility 의존성을 추가해라:
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
contextLoads테스트 케이스에선 애플리케이션을 정상적으로 기동할 수 있는지 검증한다.testUsageCostLogger테스트 케이스에선 스프링 부트의 테스트 프레임워크에서OutputCaptureExtension을 사용해UsageCostLogger의process메소드를 호출하는지 검증한다.
Deployment
다음에 해볼 일은 이 애플리케이션들에 설정해둔 메세지 브로커를 사용해서, 지원하는 플랫폼 중 하나에 애플리케이션들을 배포해보는 거다.
Next :
Stream Application Deployment
샘플 스트림 애플리케이션을 수동으로 배포하기
전체 목차는 여기에 있습니다.