토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

스프링 클라우드 데이터 플로우 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.


이 가이드에선 Spring Cloud Stream을 사용해 세 가지 스프링 부트 애플리케이션을 개발하고, 클라우드 파운드리, 쿠버네티스, 로컬 머신에 배포해본다. Data Flow를 사용해서 이 애플리케이션들을 배포하는 일은 별도 가이드 문서에서 다룬다. 애플리케이션을 직접 수동으로 배포해보면 Data Flow가 자동화해주는 일들을 더 깊게 이해할 수 있다.

이어지는 섹션에선 이 애플리케이션들을 빌드하는 방법부터 설명한다.

원한다면 이 애플리케이션들의 소스 코드를 가지고 있는 zip 파일을 다운받아, 압축을 풀고 빌드해서 배포 섹션으로 이동해도 좋다.

이 세 가지 애플리케이션이 전부 들어있는 프로젝트는 브라우저에서 다운받을 수 있다. 아래 예시처럼 커맨드라인을 사용해도 된다:

wget https://github.com/spring-cloud/spring-cloud-dataflow-samples/blob/master/dataflow-website/stream-developer-guides/streams/standalone-stream-sample/dist/usage-cost-stream-sample.zip?raw=true -O usage-cost-stream-sample.zip

Building the downloaded sample

스트림 앱은 공통 코드 베이스는 유지하면서, 카프카 브로커나 RabbitMQ와 함께 실행하도록 설정할 수 있다. 유일한 차이점은 executable jar 파일에 있다. 카프카 브로커와 함께 동작하려면 카프카 바인더 의존성이 필요하다 (디폴트로 활성화된다). RabbitMQ에선 Rabbit 바인더가 필요하다.

Kafka Streams, Amazon Kinesis, Google PubSub (partner maintained), Solace PubSub+ (partner maintained), Azure Event Hubs (partner maintained)를 지원하는 Spring Cloud Stream 바인더 구현체도 있다. 바인더는 빌드 시점에 선택한다. 샘플 프로젝트는 메이븐 프로파일을 사용해 적당한 바인더를 활성화한다.

Kafka RabbitMQ

카프카 전용 샘플 스트림 앱을 빌드하려면 프로젝트 루트 디렉토리에서 아래 명령어를 실행해라:

$./mvnw clean package -Pkafka

RabbitMQ 전용 샘플 스트림 앱을 빌드하려면 프로젝트 루트 디렉토리에서 아래 명령어를 실행해라:

$./mvnw clean package -Prabbit

Development

설정한 바인더를 사용해서 통신하는 Spring Cloud Stream 애플리케이션을 세 가지 만들어보겠다.

이번 시나리오는 고객을 위한 청구서를 만드는 휴대폰 회사다. 사용자가 통화를 할 때마다 duration과 통화 중에 사용한 data 양이 정해진다. 청구서를 만드는 프로세스에선 원래의 통화 데이터를, 통화 시간 동안의 비용과 사용한 데이터 양에 대한 비용으로 변환해야 한다.

통화 내역은 해당 통화의 duration과 통화 중 사용한 data 양을 가지고 있는 UsageDetail 클래스로 모델링한다. 청구서는 통화 비용(costCall)과 데이터 비용(costData)을 가지고 있는 UsageCostDetail 클래스로 모델링한다. 각 클래스는 전화를 건 사람을 식별할 수 있는 ID(userId)를 포함한다.

이번에 만들어볼 세 가지 스트리밍 애플리케이션은 다음과 같다:

목차


Source

여기서는 UsageDetailSender 소스를 생성한다.

Kafka RabbitMQ

Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-detail-sender-kafka, 패키지는 o.spring.dataflow.sample.usagedetailsender를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 Kafka를 입력해서 카프카 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-detail-sender-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

카프카를 메세지 브로커로 사용할 때는 다양한 설정 옵션들을 선택해 확장하거나 재정의해서 원하는 런타임 동작을 완성할 수 있다. 카프카 바인더 설정 프로퍼티들은 카프카 바인더 문서에 정리되어 있다.

Spring Initializr에서 만든 프로젝트를 바로 다운로드하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-detail-sender-rabbit, 패키지는 o.spring.dataflow.sample.usagedetailsender를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-detail-sender-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

Durable Queues

Spring Cloud Stream 컨슈머 애플리케이션은 기본적으로 anonymous auto-delete 큐를 생성한다. 그렇기 때문에 producer 애플리케이션이 컨슈머 애플리케이션보다 먼저 기동됐다면, producer가 메세지를 저장하고 전달하지 못하는 메세지가 생길 수 있다. exchange를 durable로 설정했다 하더라도 이후에도 컨슘할 수 있게 메세지를 저장하려면 exchange에 바인딩할 durable 큐가 필요하다. 이런한 까닭으로 메세지 전달을 보장하기 위해서는 durable 큐가 필요하다.

durable 큐를 미리 생성하고 exchange에 바인딩하려면 producer 애플리케이션에 아래 프로퍼티를 설정해야 한다:

spring.cloud.stream.bindings.<channelName>.producer.requiredGroups

requiredGroups 프로퍼티는 producer가 메세지 전달을 보장해야 하는 그룹들을 콤마로 구분해서 받는다. 이 프로퍼티를 설정하면 <exchange>.<requiredGroup> 형식을 통해 durable 큐가 생성된다.

RabbitMQ를 메세지 브로커로 사용할 때는 다양한 설정 옵션들을 선택해 확장하거나 재정의해서 원하는 런타임 동작을 완성할 수 있다. RabbitMQ 바인더 설정 프로퍼티들은 RabbitMQ 바인더 문서에 정리되어 있다.

Business Logic

이제 이 애플리케이션에 필요한 코드를 만들면 된다. 비지니스 로직을 작성하려면:

  1. io.spring.dataflow.sample.usagedetailsender 패키지에 UsageDetail.java와 같은 UsageDetail 클래스를 생성한다. UsageDetail 클래스에는 userId, data, duration 프로퍼티가 담겨있다.
  2. io.spring.dataflow.sample.usagedetailsender 패키지에 UsageDetailSender 클래스를 생성한다. 이 클래스는 아래 코드와 비슷하게 만들어야 한다:
package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageDetailSender {

	private String[] users = {"user1", "user2", "user3", "user4", "user5"};

	@Bean
	public Supplier<UsageDetail> sendEvents() {
		return () -> {
			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId(this.users[new Random().nextInt(5)]);
			usageDetail.setDuration(new Random().nextInt(300));
			usageDetail.setData(new Random().nextInt(700));
			return usageDetail;
		};
	}
}

sendEvents Supplier는 UsageDetail 객체에 랜덤 값을 채워 제공한다. Spring Cloud Stream은 자동으로 이 함수를 바인딩해서 해당 데이터를 설정해둔 출력 목적지로 전송한다. Spring Cloud Stream은 모든 Supplier에 디폴트 폴러도 설정하는데, 기본적으로 1초 간격으로 함수를 호출하게 된다.

Configuration

source 애플리케이션을 설정할 때는, producer가 데이터를 게시할 output 바인딩 목적지를 설정해야 한다 (RabbitMQ exchange나 카프카 토픽의 이름).

sendEvents 함수의 첫 번째 출력 파라미터’에 해당하는 출력은, 함수 출력 바인딩 이름으로 표현하면 sendEvents-out-0이다. 편의상 이 sendEvents-out-0을 논리적인 이름 output으로 alias를 지정하겠다. alias를 사용하지 않고 출력 바인딩 이름을 직접 사용해도 된다 (spring.cloud.stream.bindings.sendEvents-out-0.destination=usage-detail). 자세한 설명은 Functional Binding Names을 참고해라.

src/main/resources/application.properties에 아래 프로퍼티를 추가해라:

spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

Building

이제 이 Usage Detail Sender 애플리케이션을 빌드하면 된다.

메이븐으로 프로젝트를 빌드하려면 usage-detail-sender 루트 디렉토리에서 아래 명령어를 실행해라:

./mvnw clean package

Testing

Spring Cloud Stream은 Spring Cloud Stream 애플리케이션을 테스트할 수 있는 test jar를 제공한다:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

TestChannelBinderConfiguration에선 메세지 브로커 바인더 구현체 대신, 애플리케이션의 아웃바운드와 인바운드 메세지를 추적하고 테스트하는할 수 있는 인메모리 바인더 구현체를 제공한다. 테스트 설정에는 메세지를 보내고 받기 위한 InputDestinationOutputDestination 빈이 들어있다. UsageDetailSender 애플리케이션을 단위 테스트하려면 UsageDetailSenderApplicationTests 클래스에 아래 코드를 추가해라:

package io.spring.dataflow.sample.usagedetailsender;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageDetailSenderApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageDetailSender() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageDetailSenderApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000);

			final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			UsageDetail usageDetail = (UsageDetail) converter
					.fromMessage(sourceMessage, UsageDetail.class);

			assertThat(usageDetail.getUserId()).isBetween("user1", "user5");
			assertThat(usageDetail.getData()).isBetween(0L, 700L);
			assertThat(usageDetail.getDuration()).isBetween(0L, 300L);
		}
	}
}

이 인메모리 테스트 바인더는 다른 메세지 브로커 바인더 구현체가 동작하는 방식과 그대로다. 특히, Spring Cloud Stream 애플리케이션에선 기본적으로 메세지 페이로드는 항상 JSON으로 인코딩한 바이트 배열이다. 컨슘하는 애플리케이션은 입력 채널에서 바이트를 수신하고, 컨텐츠 타입에 따라 적절한 MessageConverter에 자동으로 위임해서 바이트를 컨슈밍 함수의 인자 타입인 UsageDetail에 맞게 변환한다. 테스트를 위해서는 이 단계를 명시적으로 수행해야 한다. 아니면 MessageConverter를 사용하는 대신에 JSON 파서를 직접 호출해도 된다.


Processor

이번에는 UsageCostProcessor 프로세서를 생성한다.

Kafka RabbitMQ

Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-cost-processor-kafka, 패키지는 o.spring.dataflow.sample.usagecostprocessor를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 Kafka를 입력해서 카프카 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-cost-processor-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-cost-processor-rabbit, 패키지는 o.spring.dataflow.sample.usagecostprocessor를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-cost-processor-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

Business Logic

이제 이 애플리케이션에 필요한 코드를 만들면 된다. 비지니스 로직을 작성하려면:

  1. io.spring.dataflow.sample.usagecostprocessor 패키지에 UsageDetail 클래스를 생성한다. 클래스 내용물은 UsageDetail.java와 비슷하다. UsageDetail 클래스에는 userId, data, duration 프로퍼티가 담겨있다.
  2. io.spring.dataflow.sample.usagecostprocessor 패키지에 UsageCostDetail 클래스를 생성한다. 내용물은 UsageCostDetail.java와 비슷하다. UsageCostDetail 클래스에는 userId, callCost, dataCost 프로퍼티가 담겨있다.
  3. UsageDetail 메세지를 받아 통화 및 데이터 비용을 계산하고 UsageCostDetail 메세지를 전송할 UsageCostProcessor 클래스를 io.spring.dataflow.sample.usagecostprocessor 패키지에 생성해라. 소스 코드는 다음과 같다:
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostProcessor {

	private double ratePerSecond = 0.1;

	private double ratePerMB = 0.05;

	@Bean
	public Function<UsageDetail, UsageCostDetail> processUsageCost() {
		return usageDetail -> {
			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId(usageDetail.getUserId());
			usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
			usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
			return usageCostDetail;
		};
	}
}

위 애플리케이션에선 UsageDetail을 받아 UsageCostDetail을 반환하는 Function 빈을 선언하고 있다. Spring Cloud Stream은 이 함수를 찾아 해당 입출력을 메세징 미들웨어에 설정된 목적지에 바인딩한다. 앞 섹션에서 설명했듯이 Spring Cloud Stream은 적절한 MessageConverter를 사용해 메세지를 필요한 타입으로 변환해준다.

Configuration

processor 애플리케이션을 설정할 땐 아래 프로퍼티들을 설정해야 한다:

프로덕션 애플리케이션에선 spring.cloud.stream.bindings.input.group을 설정해서 이 컨슈머 애플리케이션이 속해있는 컨슈머 그룹을 지정하는 게 좋다. 이렇게 하면 추가적인 컨슈머 애플리케이션들도 각자 고유 그룹 id로 식별하면서 모든 메세지를 수신할 수 있다. 각 컨슈머 그룹은 여러 개의 인스턴스로 확장해서 작업 부하를 분산할 수 있다. Spring Cloud Stream은 RabbitMQ나 기타 다른 바인더 구현체로 확장할 수 있도록 카프카가 가진 이 고유의 기능을 추상화한다.

src/main/resources/application.properties에 아래 프로퍼티들을 추가해라:

spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

편의상 함수 바인딩 이름 processUsageCost-in-0processUsageCost-out-0을 각각 input, output으로 alias를 지정한다.

입력 목적지는 반드시 소스 애플리케이션의 출력 목적지와 동일해야 한다. 마찬가지로 출력 목적지는 아래에 나오는 싱크의 입력 목적지와 동일해야 한다.

Building

이제 이 Usage Cost Processor 애플리케이션을 빌드하면 된다. usage-cost-processor 루트 디렉토리에서 아래 명령어를 실행해 메이븐으로 프로젝트를 빌드해라:

./mvnw clean package

Testing

앞에서도 말했지만, Spring Cloud Stream은 Spring Cloud Stream 애플리케이션을 테스트할 수 있는 test jar를 제공한다:

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream</artifactId>
	<type>test-jar</type>
	<classifier>test-binder</classifier>
	<scope>test</scope>
</dependency>

TestChannelBinderConfiguration은 애플리케이션의 아웃바운드와 인바운드 메세지를 추적하고 테스트할 수 있는 인메모리 바인더 구현체를 제공한다. 테스트 설정에는 메세지를 보내고 받기 위한 InputDestinationOutputDestination 빈이 들어있다. UsageCostProcessor 애플리케이션을 단위 테스트하려면 UsageCostProcessorApplicationTests 클래스에 다음 코드를 추가해라:

package io.spring.dataflow.sample.usagecostprocessor;

import java.util.HashMap;
import java.util.Map;

import org.junit.jupiter.api.Test;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

public class UsageCostProcessorApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostProcessor() {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageDetail usageDetail = new UsageDetail();
			usageDetail.setUserId("user1");
			usageDetail.setDuration(30L);
			usageDetail.setData(100L);

			final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			final Message<?> message = converter.toMessage(usageDetail, messageHeaders);

			source.send(message);

			OutputDestination target = context.getBean(OutputDestination.class);
			Message<byte[]> sourceMessage = target.receive(10000);

			final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
					.fromMessage(sourceMessage, UsageCostDetail.class);

			assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
			assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
		}
	}
}

Sink

이번에는 UsageCostLogger 싱크를 생성한다.

Kafka RabbitMQ

Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드받아 Generate Project를 클릭하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-cost-logger-kafka, 패키지는 o.spring.dataflow.sample.usagecostlogger를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 Kafka를 입력해서 카프카 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-cost-logger-kafka.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

Spring Initializr에서 만들어 둔 프로젝트를 바로 다운로드받아 Generate Project를 클릭하거나

Spring Initializr 사이트를 방문해서 아래 설명대로 따라하면 된다:

  1. 그룹명은 io.spring.dataflow.sample, 아티팩트명은 usage-cost-logger-rabbit, 패키지는 o.spring.dataflow.sample.usagecostlogger를 사용해서 새 메이븐 프로젝트를 생성한다.
  2. Dependencies 텍스트 박스에 RabbitMQ를 입력해서 RabbitMQ 바인더 의존성을 선택한다.
  3. Dependencies 텍스트 박스에 Cloud Stream을 입력해서 Spring Cloud Stream 의존성을 선택한다.
  4. Dependencies 텍스트 박스에 Actuator를 입력하고 스프링 부트 액추에이터 의존성을 선택한다.
  5. Generate Project 버튼을 클릭한다.

이제 usage-cost-logger-rabbit.zip 파일을 unzip하고 즐겨 사용하는 IDE에서 프로젝트를 임포트하면 된다.

Business Logic

비지니스 로직을 작성하려면:

  1. io.spring.dataflow.sample.usagecostlogger 패키지에 UsageCostDetail 클래스를 생성한다. 클래스 내용물은 UsageCostDetail.java와 비슷하다. UsageCostDetail 클래스에는 userId, callCost, dataCost 프로퍼티가 담겨있다.
  2. UsageCostDetail 메세지를 받아 로그를 남기는 UsageCostLogger 클래스를 io.spring.dataflow.sample.usagecostlogger 패키지에 생성해라. 소스 코드는 다음과 같다:
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UsageCostLogger {

	private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

	@Bean
	public Consumer<UsageCostDetail> process() {
		return usageCostDetail -> {
			logger.info(usageCostDetail.toString());
		};
	}
}

위 애플리케이션에선 UsageCostDetail을 받는 Consumer 빈을 선언하고 있다. Spring Cloud Stream은 이 함수를 찾아 해당 입력을 메세징 미들웨어에 설정된 입력 목적지에 바인딩한다. 앞 섹션에서 설명했듯이 Spring Cloud Stream은 컨슈머를 실행하기 전에 적절한 MessageConverter를 사용해 메세지를 필요한 타입으로 변환해준다.

Configuration

sink 애플리케이션을 설정할 땐 아래 프로퍼티들을 설정해야 한다:

편의상 함수 바인딩 이름 process-in-0input으로 alias를 지정한다.

src/main/resources/application.properties에 아래 프로퍼티들을 추가해라:

spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
# Spring Boot will automatically assign an unused http port. This may be overridden using external configuration.
server.port=0

Building

이제 이 Usage Cost Logger 애플리케이션을 빌드하면 된다. usage-cost-logger 루트 디렉토리에서 아래 명령어를 실행해 메이븐으로 프로젝트를 빌드해라:

./mvnw clean package

Testing

UsageCostLogger를 테스트하려면 UsageCostLoggerApplicationTests 클래스를 만들어 다음 코드를 추가해라:

package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

	@Test
	public void contextLoads() {
	}

	@Test
	public void testUsageCostLogger(CapturedOutput output) {
		try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration
						.getCompleteConfiguration(UsageCostLoggerApplication.class))
				.web(WebApplicationType.NONE)
				.run()) {

			InputDestination source = context.getBean(InputDestination.class);

			UsageCostDetail usageCostDetail = new UsageCostDetail();
			usageCostDetail.setUserId("user1");
			usageCostDetail.setCallCost(3.0);
			usageCostDetail.setDataCost(5.0);

			final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
			Map<String, Object> headers = new HashMap<>();
			headers.put("contentType", "application/json");
			MessageHeaders messageHeaders = new MessageHeaders(headers);
			final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);

			source.send(message);

			Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
		}
	}
}

pom.xmlawaitility 의존성을 추가해라:

<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

Deployment

다음에 해볼 일은 이 애플리케이션들에 설정해둔 메세지 브로커를 사용해서, 지원하는 플랫폼 중 하나에 애플리케이션들을 배포해보는 거다.


Next :
Stream Application Deployment
샘플 스트림 애플리케이션을 수동으로 배포하기

전체 목차는 여기에 있습니다.

<< >>

TOP