- 7.1. Starting and Stopping
- 7.2. Writing Data
- 7.3. Consuming Data
- 7.4. Lifecycle Callbacks
- 7.5. Connection Configuration
- 7.6. Metrics
리액터 네티를 사용하면 손쉽게 UdpServer
를 설정할 수 있다. UdpServer
는 UDP 서버 생성에 필요한 네티 기능을 대부분 숨겨주고, 리액티브 스트림 backpressure를 추가해준다.
7.1. Starting and Stopping
UDP 서버를 시작하려면 일단 UdpServer 인스턴스를 만들어 설정해야 한다. 기본적으로 호스트는 localhost
, 포트는 12012
로 설정된다. 다음은 UDP 서버를 생성하고 시작하는 예시다:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
UdpServer.create() // (1)
.bindNow(Duration.ofSeconds(30)); // (2)
(1) UdpServer
인스턴스를 생성한다. 이제 인스턴스를 설정할 준비가 되었다.
(2) 서버를 블로킹 방식으로 시작하고 초기화를 마치길 기다린다.
반환된 Connection
은 블로킹 방식으로 서버를 셧다운 시켜주는 disposeNow()
를 포함하는, 간단한 서버 API를 제공한다.
7.1.1. Host and Port
특정 호스트와 포트로 서빙하고 싶다면, UDP 서버를 다음과 같이 설정하면 된다:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.host("localhost") // (1)
.port(8080) // (2)
(1) UDP 서버 호스트를 설정한다.
(2) UDP 서버 포트를 설정한다.
7.2. Writing Data
리모트 피어에 데이터를 전송하려면 I/O 핸들러를 연결해야 한다. I/O 핸들러는 UdpOutbound
에 접근할 수 있어 데이터를 write할 수 있다. 다음은 hello
를 전송하는 예시다:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.handle((in, out) ->
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
ByteBuf buf = Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8);
return new DatagramPacket(buf, p.sender()); // (1)
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
(1) 리모트 피어에 hello
라는 문자열을 전송한다.
7.3. Consuming Data
리모트 피어로부터 데이터를 받으려면 I/O 핸들러를 연결해야 한다. I/O 핸들러는 UdpInbound
에 접근할 수 있어 데이터를 읽을 수 있다. 다음은 데이터를 컨슈밍하는 예시다:
import io.netty.channel.socket.DatagramPacket;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.handle((in, out) ->
.map(o -> {
if (o instanceof DatagramPacket) {
DatagramPacket p = (DatagramPacket) o;
return new DatagramPacket(p.content().retain(), p.sender()); // (1)
else {
return Mono.error(new Exception("Unexpected type of the message: " + o));
(1) 리모트 피어로부터 데이터를 받는다.
7.4. Lifecycle Callbacks
아래 라이프사이클 콜백을 사용해서 UDP 서버를 확장할 수도 있다:
: 서버 채널이 바운드되려고 할 때 호출한다.doOnBound
: 서버 채널이 바운드될 때 호출한다.doOnUnbound
: 서버 채널이 언바운드될 때 호출한다.
아래 예제는 doOnBound
메소드를 사용한다:
import io.netty.handler.codec.LineBasedFrameDecoder;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.doOnBound(conn -> conn.addHandler(new LineBasedFrameDecoder(8192))) // (1)
(1) 서버 채널이 바운드될 때 LineBasedFrameDecoder
로 네티 파이프라인을 확장한다.
7.5. Connection Configuration
이번 섹션에선 UDP 레벨에서 사용할 수 있는 세 가지 설정에 대해 설명한다.
7.5.1. Channel Options
UDP 서버는 디폴트로 다음과 같은 옵션으로 설정된다:
UdpServerBind() {
this.config = new UdpServerConfig(
Collections.singletonMap(ChannelOption.AUTO_READ, false),
() -> new InetSocketAddress(NetUtil.LOCALHOST, DEFAULT_PORT));
다른 옵션이 더 필요하거나 현재 옵션을 바꾸고 싶다면, 다음과 같이 설정할 수 있다:
import io.netty.channel.ChannelOption;
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
네티 채널 옵션에 대한 자세한 내용은 아래 링크를 참고하라:
7.5.2. Wire Logger
리액터 네티는 피어 간의 트래픽을 살펴보기 위한 wire 로깅을 제공한다. 기본적으로 wire 로깅은 비활성화돼 있다. 활성화하려면 로거의 reactor.netty.udp.UdpServer
로 설정하고 아래 설정을 적용해야 한다:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.wiretap(true) // (1)
(1) wire 로깅을 활성화한다.
7.5.3. Event Loop Group
기본적으로 UDP 서버는 “이벤트 루프 그룹”을 사용하며, 워커 스레드는 초기화 시점 런타임에 사용할 수 있는 프로세서 수로 세팅된다 (단, 최소값은 4). 다른 설정이 필요하다면 LoopResource#create
메소드 중 하나를 사용하면 된다.
“이벤트 루프 그룹”의 디폴트 설정은 다음과 같다:
* Default worker thread count, fallback to available processor
* (but with a minimum value of 4)
public static final String IO_WORKER_COUNT = "reactor.netty.ioWorkerCount";
* Default selector thread count, fallback to -1 (no selector thread)
public static final String IO_SELECT_COUNT = "reactor.netty.ioSelectCount";
* Default worker thread count for UDP, fallback to available processor
* (but with a minimum value of 4)
public static final String UDP_IO_THREAD_COUNT = "reactor.netty.udp.ioThreadCount";
* Default quiet period that guarantees that the disposal of the underlying LoopResources
* will not happen, fallback to 2 seconds.
public static final String SHUTDOWN_QUIET_PERIOD = "reactor.netty.ioShutdownQuietPeriod";
* Default maximum amount of time to wait until the disposal of the underlying LoopResources
* regardless if a task was submitted during the quiet period, fallback to 15 seconds.
public static final String SHUTDOWN_TIMEOUT = "reactor.netty.ioShutdownTimeout";
* Default value whether the native transport (epoll, kqueue) will be preferred,
* fallback it will be preferred when available
이 설정을 바꾸려면 다음과 같이 설정해라:
import reactor.netty.Connection;
import reactor.netty.resources.LoopResources;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
LoopResources loop = LoopResources.create("event-loop", 1, 4, true);
Connection server =
7.6. Metrics
UDP 서버는 Micrometer 통합 지원을 내장하고 있다. 프리픽스로 reactor.netty.udp.server
를 사용하는 모든 메트릭에 해당한다.
아래 테이블은 UDP 서버 메트릭에 대한 정보를 담고 있다:
metric name | type | description |
reactor.netty.udp.server.data.received | DistributionSummary | 데이터 수신량 (바이트 단위) |
reactor.netty.udp.server.data.sent | DistributionSummary | 데이터 전송량 (바이트 단위) |
reactor.netty.udp.server.errors | Counter | 에러 발생 횟수 |
추가로 다음 메트릭도 사용할 수 있다:
metric name | type | description |
reactor.netty.bytebuf.allocator.used.heap.memory | Gauge | 힙 메모리의 바이트 수 |
reactor.netty.bytebuf.allocator.used.direct.memory | Gauge | 다이렉트 메모리의 바이트 수 |
reactor.netty.bytebuf.allocator.used.heap.arenas | Gauge | 힙 arena 수 (PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.direct.arenas | Gauge | 다이렉트 arena 수 (PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.threadlocal.caches | Gauge | 스레드 로컬 캐시 수(PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.tiny.cache.size | Gauge | tiny 캐시 사이즈 (PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.small.cache.size | Gauge | small 캐시 사이즈 (PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.normal.cache.size | Gauge | normal 캐시 사이즈 (PooledByteBufAllocator 를 사용할 때) |
reactor.netty.bytebuf.allocator.used.chunk.size | Gauge | arena의 청크 사이즈 (PooledByteBufAllocator 를 사용할 때) |
다음은 메트릭 통합을 활성화하는 예시다:
import reactor.netty.Connection;
import reactor.netty.udp.UdpServer;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.metrics(true) // (1)
(1) 빌트인 Micrometer 인테그레이션을 활성화한다.
Micrometer 외에 다른 시스템과 통합해서 UDP 서버 메트릭을 보고 싶거나, 자체적으로 Micrometer를 통합하고 싶다면, 다음과 같이 자체 메트릭 레코더를 제공하면 된다:
import reactor.netty.Connection;
import reactor.netty.channel.ChannelMetricsRecorder;
import reactor.netty.udp.UdpServer;
import java.net.SocketAddress;
import java.time.Duration;
public class Application {
public static void main(String[] args) {
Connection server =
.metrics(true, CustomChannelMetricsRecorder::new) // (1)
(1) UDP 서버 메트릭을 활성화하고 ChannelMetricsRecorder
구현체를 제공한다.
