본문 바로가기
개발 언어/Java

Java 리액티브 Mono Flux 설명 예제

by 정권이 내 2024. 2. 6.

[Java] 리액티브 프로그래밍 Flux & Mono

리액티브 프로그래밍

리액티브 프로그래밍은 변화하는 데이터 스트림과 이벤트에 대한 반응을 표현하는 프로그래밍 패러다임입니다. 기본적으로 비동기 방식으로 작동하여 데이터 흐름을 기반으로 코드를 작성하고 데이터 변화에 자동으로 반응하도록 설계되었습니다.

 

특징

  • 비동기 처리: 스레드와 락에 의존하지 않고 비동기 방식으로 데이터를 처리합니다. 시스템 성능을 향상시키고 콜백 지옥(callback hell)을 방지하는 데 도움이 됩니다.
  • 데이터 흐름 기반 프로그래밍: 데이터 스트림을 기반으로 코드를 작성하여 데이터 변화에 자동으로 반응하도록 설계되어서 코드를 보다 간결하고 유지 관리하기 쉽게 만들어줍니다.
  • 반응형 시스템 구현: 외부 요청에 신속하게 반응하고 빠른 응답 시간을 제공하는 반응형 시스템 구현에 적합합니다.
  • 다양한 라이브러리 및 프레임워크 지원: RxJava, Spring Reactor 등 다양한 리액티브 프로그래밍 라이브러리 및 프레임워크가 자바에서 지원됩니다.

 

Flux & Mono

리액티브 프로그래밍에서 가장 기초적인 개념은 스트림을 제공하는 Publisher와 구독을 하는 주체인 Subscriber 입니다. 이 둘에 대한 내용은 리액티브 스트림 기본개념 글을 참조하시기 바랍니다.

Flux와 Mono는 자바에서 리액티브 프로그래밍을 위한 핵심 타입입니다. 두 타입 모두 Publisher 인터페이스를 구현하며 데이터 스트림을 방출하는 역할을 합니다.

img

 

Flux

  • 여러 개의 데이터를 순차적으로 방출할 수 있는 타입입니다.
  • 데이터 스트림은 무한하거나 유한할 수 있습니다.
  • 다양한 연산자를 사용하여 데이터 스트림을 변환, 필터링, 결합 등을 수행할 수 있습니다.

 

Mono

  • 최대 하나의 데이터만 방출할 수 있는 타입입니다.
  • 데이터 스트림은 0개 또는 1개의 데이터를 포함합니다.
  • Mono는 일반적으로 단일 값을 반환하는 작업, 예를 들어 데이터베이스 조회 등에 사용됩니다.

 

Flux 알아보기

Flux는 여러 개의 데이터를 순차적으로 방출하는 Publisher 타입입니다. 데이터 스트림은 무한하거나 유한할 수 있으며, 다양한 연산자를 사용하여 데이터 스트림을 변환, 필터링, 결합 등을 수행할 수 있습니다.

  • 생성: just, fromIterable 등 다양한 메서드를 사용하여 Flux를 생성
  • 연산: map, filter, flatMap 등 다양한 연산자를 사용하여 데이터 스트림 변환
  • 구독: subscribe 메서드를 사용하여 데이터 스트림 구독 및 처리

 

Flux 동작 구조

img

 

Flux 예제 - just, fromIterable

아래는 just를 사용하여 Integer 타입을 퍼블리싱하는 Flux를 만드는 예제입니다.

public static void main(String[] args) {

    Flux<Integer> flux = Flux
            .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .map(number -> number * 2)
            .filter(number -> number % 2 == 0);

    flux.subscribe(System.out::println);
}
  • just: 정수 1부터 10까지의 데이터를 포함하는 Flux를 생성
  • map: 각 숫자값의 2배로 반환
  • filter: 짝수인 숫자만 반환
  • subscribe: 변환된 데이터 스트림을 처리

 

결과

2
4
...
...
18
20

 

Iterator 타입의 변수를 입력 스트림으로 처리하는 fromIterable 방식도 있습니다.

public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    Flux<Integer> flux = Flux.fromIterable(numbers)
            .map(number -> number * 2)
            .filter(number -> number % 2 == 0);

    flux.subscribe(System.out::println);
}

 

Flux 예제 - interval, take

just, fromIterable은 범위가 정해진 데이터 스트림을 처리한다면 interval은 일정한 시간 주기로 퍼블리싱을 하는 방식입니다. 중지점이 없으면 무한하게 수행하지만 take를 활용하여 퍼블리싱 횟수를 지정할수 있습니다.

 

아래 예제 코드는 100ms 주기로 10번 퍼블리싱하는 Flux입니다.

public static void main(String[] args) {
    Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100))
            .doOnNext(System.out::println)
            .take(10);

    intervalFlux.subscribe();
}

 

그런데 위 예제 코드를 실행해보면 doOnNext 부분에서 100ms 마다 출력을 해야하지만 별다른 출력없이 종료되는데 이유는 Flux는 비동기 방식이기 때문에 Flux를 subscribe에 대한 응답을 기다리지 않기 때문입니다.

따라서 위의 상황에서 출력값을 보고싶다면 main 메서드가 종료되기전에 딜레이를 걸어보겠습니다.

public static void main(String[] args) {

        Flux<Long> intervalFlux = Flux.interval(Duration.ofMillis(100))
                .doOnNext(System.out::println)
                .take(10);

        intervalFlux.subscribe();

    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        System.out.println("Interrupted Exception");
    }

    System.out.println("Done");
}

 

결과

0
1
...
...
8
9
Done

 

Mono 알아보기

Mono는 리액티브 프로그래밍에서 0~1개의 데이터를 방출하는 Publisher 타입입니다. 데이터 스트림은 0개 또는 1개의 데이터를 포함하며, Mono는 일반적으로 단일 값을 반환하는 작업, 예를 들어 데이터베이스 조회 등에 사용됩니다.

 

Mono 동작 구조

img

 

Mono 예제 - just, justOrEmpty

지정된 값을 방출하는 Mono를 생성합니다. justOrEmpty는 Optional 형태로 반환됩니다.

public static void main(String[] args) {
    Mono<String> mono = Mono.just("Hello From just");

    log.info("Before subscribe");
    mono.subscribe(log::info);
    log.info("After subscribe");
}

 

결과

10:58:09.584 [main] INFO com.example.demo.MonoDemo -- Before subscribe
10:58:09.591 [main] INFO com.example.demo.MonoDemo -- Hello From just
10:58:09.591 [main] INFO com.example.demo.MonoDemo -- After subscribe

 

Mono 예제 - fromCallable

지정된 Callable을 실행하여 생성된 값을 방출하는 Mono를 생성합니다. fromCallable은 비동기 방식으로 값을 생성해야 하거나 값을 생성하는 데 시간이 걸리는 경우에 적합합니다. (ex, DB처리)

public static void main(String[] args) {

    log.info("Before subscribe");
    Mono.fromCallable(() -> {
        Thread.sleep(1000);
        return "Lazy Work Done";
    }).subscribe(log::info);
    log.info("After subscribe");
}

 

결과

11:22:40.038 [main] INFO com.example.demo.MonoExecuterDemo -- Before subscribe
11:22:41.094 [main] INFO com.example.demo.MonoExecuterDemo -- Lazy Work Done
11:22:41.094 [main] INFO com.example.demo.MonoExecuterDemo -- After subscribe

 

정리

Java의 리액티브 프로그래밍에서 Mono, Flux 두가지 퍼블리셔에 대해 알아봤습니다.

Flux와 Mono의 차이점을 표로 비교해보면 다음과 같습니다.

기능FluxMono
데이터 개수0개 이상0~1개
데이터 전달 방식순차적단일
활용 예시데이터 스트림 처리, 여러 API 호출 결과 조합데이터 조회, 단일 작업 결과 처리

 

또한 두개의 퍼블리셔가 공통적으로 가지는 특성은 비동기적으로 퍼블리싱을 수행할수 있다는 점입니다.

반응형

댓글