[Spring] 리액티브 스트림 Operations
지난번 포스팅에서는 Flow 라이브러리에 있는 Publisher, Subscriber를 사용했는데 이번엔 reactive-streams 라이브러리로 사용 해보겠습니다. 사용방법은 똑같지만 reactive-streams 는 모든 Java 버전에서 사용가능 하다는점이 다릅니다.
지난번과 동일하게 Publisher, Subscriber 를 작성해보겠습니다.
PubSub.java
package com.example.springreactivestudy.demo2;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> publisher = new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
Iterable<Integer> integers = Stream.iterate(1, a -> a + 1)
.limit(10).collect(Collectors.toList());
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
integers.forEach(s -> sub.onNext(s));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
log.debug("onSubscribe");
}
@Override
public void onNext(Integer integer) {
log.debug("onNext : " + integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError : " + t.getMessage());
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
publisher.subscribe(subscriber);
}
}
Stream.iterate()
파라미터로 seed 값과 규칙을 받아서 무한한 스트림을 만들수 있는 Stream의 메서드입니다. limit 메서드로
한계를 지정할수 있습니다.
뒤에 설명할 Operator 를 만들기 전에 Publisher 와 Subscriber 에서 구현한 내용들을 메서드로 추출 해보겠습니다. Publisher 에서 만든 Iterable 객체는 외부에서 파라미터로 전달합니다.
PubSub.java
public class PubSub {
public static void main(String[] args) {
Publisher<Integer> publisher1 = getPublisher
(Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList()));
Subscriber<Integer> subscriber = getSubscriber();
publisher1.subscribe(subscriber);
}
private static Subscriber<Integer> getSubscriber() {
return new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
log.debug("onSubscribe");
}
@Override
public void onNext(Integer integer) {
log.debug("onNext : " + integer);
}
@Override
public void onError(Throwable t) {
log.debug("onError : " + t.getMessage());
}
@Override
public void onComplete() {
log.debug("onComplete");
}
};
}
private static Publisher<Integer> getPublisher(Iterable<Integer> iterable) {
return sub -> sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
try {
iterable.forEach(s -> sub.onNext(s));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
}
Operator
일반적인 Publisher와 Subscriber의 동작 원리는 두개의 오브젝트 사이에 Subscription 이라는 객체를 두고 데이터를 전송하는 구조입니다.
Operator는 Publisher 와 Subscriber 사이에 들어가는 Publisher 객체인데 두 객체 사이에서 전송되는 데이터들을 수정하는 작업을 수행할수 있습니다.
publisher2 라는 새로운 Publisher를 만들고 publisher1 을 operatorPub 메서드의 인자로 전달합니다. 이는 publisher1을 publisher2에 연결한것과 같은 의미입니다.
PubSub.java
package com.example.springreactivestudy.demo2;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Slf4j
public class PubSub {
public static void main(String[] args) {
Iterable<Integer> iterable = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
Publisher<Integer> publisher1 = getPublisher(iterable);
Publisher<Integer> publisher2 = operatorPub(publisher1, s -> s * 10);
Subscriber<Integer> subscriber = getSubscriber();
publisher2.subscribe(subscriber);
}
private static Publisher<Integer> getPublisher(Iterable<Integer> iterable) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
sub.onSubscribe(new Subscription() {
@Override
public void request(long n) {
log.info("getPublisher request()");
try {
iterable.forEach(s -> sub.onNext(s));
sub.onComplete();
} catch (Throwable t) {
sub.onError(t);
}
}
@Override
public void cancel() {
}
});
}
};
}
private static Publisher<Integer> operatorPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
log.info("onSubscribe");
sub.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
log.info("onNext ");
sub.onNext(f.apply(integer));
}
@Override
public void onError(Throwable t) {
log.info("onError");
sub.onError(t);
}
@Override
public void onComplete() {
log.info("onComplete");
sub.onComplete();
}
});
}
};
}
private static Subscriber<Integer> getSubscriber() {
return new Subscriber<>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
log.info("onSubscribe");
}
@Override
public void onNext(Integer integer) {
log.info("onNext : " + integer);
}
@Override
public void onError(Throwable t) {
log.info("onError : " + t.getMessage());
}
@Override
public void onComplete() {
log.info("onComplete");
}
};
}
}
11:36:12.965 [main] INFO - onNext : 10
11:36:12.965 [main] INFO - onNext : 20
11:36:12.965 [main] INFO - onNext : 30
11:36:12.965 [main] INFO - onNext : 40
11:36:12.965 [main] INFO - onNext : 50
11:36:12.965 [main] INFO - onNext : 60
11:36:12.965 [main] INFO - onNext : 70
11:36:12.965 [main] INFO - onNext : 80
11:36:12.965 [main] INFO - onNext : 90
11:36:12.965 [main] INFO - onNext : 100
11:36:12.965 [main] INFO - onComplete
위의 예제는 Operator 객체인 publisher2 가 publisher1을 인자로 받아서 1~10의 숫자 배열에 대한 데이터를 Subscriber에 전송할때 중간에서 10씩 곱하는 연산을 수행하고 전달하는 것입니다.
그림을 통해 좀더 자세하게 설명을 하면 Operator publisher2
는 파라미터로 Publisher 와 Function<T,R>
를 받는데 각각 publisher1, s -> s * 10
이라는 값으로 전달했습니다.
publisher2.subscribe(subscriber);
이 부분에서 publisher2는 subscribe 메서드를 통해 subscriber 객체를 등록합니다.
private static Publisher<Integer> operatorPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new Subscriber<>() {
@Override
public void onSubscribe(Subscription subscription) {
log.info("onSubscribe");
sub.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
log.info("onNext ");
sub.onNext(f.apply(integer));
}
@Override
public void onError(Throwable t) {
log.info("onError");
sub.onError(t);
}
@Override
public void onComplete() {
log.info("onComplete");
sub.onComplete();
}
});
}
};
}
이 부분에서 operatorPub 의 인자로 받은 Function을 적용하기 위해서는 publisher1인 pub에서 subcribe 메서드의 인자로 새로운 Subscriber를 생성하고 여기서 재정의된 onSubscribe, onNext, onError, onComplete 메서드는 publisher1 에서 호출됩니다.
여기서 onSubscribe는 반드시 수행되야 하며 publisher1 이 전달해준 Subscription 객체를 파라미터로 받은 sub의 onSubscribe 메서드의 인자로 전달하여 publisher1 과 subscriber 사이에서 중개 역할을 합니다.
그리고 subscriber는 인자로 받은 subscription(전달자) 객체를 통해 publisher1 에게 request 요청을 전달하고 publisher1 은 onNext 메서드를 통해 데이터를 publisher2 내부에서 재정의된 onNext 메서드로 전달하고 인자로 받은 Function 수식을 적용하기위해 apply 메서드를 통해 다시한번 subscriber의 onNext 메서드로 계산된 값을 전달하게 됩니다.
데이터 전송이 완료될때까지 onNext 메서드를 통해 모든 데이터를 전달후 subscriber 에서 onComplete 메서드를 호출하면 publisher1 에서 publisher2 에 있는 onComplete 를 이어서 호출하게 되고 데이터 전송은 종료됩니다.
operatorPub 메서드 간소화
operatorPub 와 비슷한 Operator를 새로 만들기 위해 내부에서 중개역할을 하는 Subscriber 메서드를 슈퍼클래스로 만들어서 수정이 필요한 메서드에 대해서만 오버라이딩 하는 방식으로 수정해보겠습니다. 우선 pub.subscribe 의 인자로 들어가있는 new Subscriber 를 슈퍼클래스인 DelegateSub 이름으로 생성합니다.
[DelegateSub.java]
public class DeletageSub implements Subscriber<Integer> {
private final Subscriber sub;
public DeletageSub(Subscriber subscriber) {
this.sub = subscriber;
}
@Override
public void onSubscribe(Subscription subscription) {
sub.onSubscribe(subscription);
}
@Override
public void onNext(Integer integer) {
sub.onNext(integer);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
pub.subscribe의 인자에는 DelegateSub 메서드를 호출하도록 하고 Function을 적용하기 위해 onNext 메서드를 재정의 하도록 합니다.
private static Publisher<Integer> operatorPub(Publisher<Integer> pub, Function<Integer, Integer> f) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DeletageSub(sub) {
@Override
public void onNext(Integer integer) {
super.onNext(f.apply(integer));
}
});
}
};
}
Operator 는 Publisher, Subscriber 사이에서 사용자가 원하는만큼 늘릴수 있는데 publisher3 라는 Operator를 새로 생성하여 전달받은 값에 대하여 publisher2 와 동일하게 10을 곱하는 Function 수식을 전달 해보겠습니다.
public static void main(String[] args) {
Iterable<Integer> iterable = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
Publisher<Integer> publisher1 = getPublisher(iterable);
Publisher<Integer> publisher2 = operatorPub(publisher1, s -> s * 10);
Publisher<Integer> publisher3 = operatorPub(publisher2, s -> s * 10);
Subscriber<Integer> subscriber = getSubscriber();
publisher3.subscribe(subscriber);
}
11:34:51.945 [main] INFO - onNext : 100
11:34:51.945 [main] INFO - onNext : 200
11:34:51.945 [main] INFO - onNext : 300
11:34:51.945 [main] INFO - onNext : 400
11:34:51.945 [main] INFO - onNext : 500
11:34:51.945 [main] INFO - onNext : 600
11:34:51.945 [main] INFO - onNext : 700
11:34:51.945 [main] INFO - onNext : 800
11:34:51.945 [main] INFO - onNext : 900
11:34:51.945 [main] INFO - onNext : 1000
11:34:51.945 [main] INFO - onComplete
Operator 응용하기
위의 예제에서 만든 Operator 는 전송되는 값들에 대해 곱셈 연산만 해서 전달했는데 이번엔 Operator에서 전송받는 값들의 합을 구한후 모든 값을 받았을때 Subscriber 에게 전달하는 예제를 만들어보겠습니다.
먼저 생각해봐야 하는것은 위에 예제에서 Publisher에서 오는 값을 슈퍼클래스의 onNext 메서드를 통해 바로 전송했지만 이번엔 수신받는 모든값을 더하고 전송해야 되기때문에 onComplete 메서드를 재정의하고 그 내부에서 onNext 메서드로 최종 결과값을 전달 해야 합니다.
public static void main(String[] args) {
Iterable<Integer> iterable = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
Publisher<Integer> publisher1 = getPublisher(iterable);
Publisher<Integer> publisher2 = opSumPub(publisher1);
Subscriber<Integer> subscriber = getSubscriber();
publisher2.subscribe(subscriber);
}
...
...
...
private static Publisher<Integer> opSumPub(Publisher<Integer> pub) {
return new Publisher<Integer>() {
@Override
public void subscribe(Subscriber<? super Integer> sub) {
pub.subscribe(new DeletageSub(sub) {
int sum = 0;
@Override
public void onNext(Integer integer) {
sum += integer;
}
@Override
public void onComplete() {
sub.onNext(sum);
sub.onComplete();
}
});
}
};
}
13:16:23.891 [main] INFO com.example.springreactivestudy.demo2.PubSub - onNext : 55
13:16:23.891 [main] INFO com.example.springreactivestudy.demo2.PubSub - onComplete
원본 Publisher 로부터 1부터 10까지의 데이터를 전송받고 완료 됬음을 알리는 onComplete 메서드가 호출되었을때 onNext 메서드를 호출하여 모두 더한 sum 값을 전송하고 onComplete 메서드도 같이 호출합니다.
제네릭한 Operator
이번엔 onSumPub 를 수정하여 sum 연산만 하는것이 아닌 Function을 파라미터로 전달하여 사용자가 지정한수식을 계산하고 입출력 타입도 자유롭게 하기위해 제네릭 타입을 사용해보겠습니다.
DelegateSub 클래스는 Subscriber 인터페이스를 상속받은 클래스로 operatorPub 메서드에서 onNext 메서드만 재정의 하려는 목적으로 만들었습니다. (코드의 간결화)
PubSub.java
public class PubSub {
public static void main(String[] args) {
Iterable<Integer> iterable = Stream.iterate(1, a -> a + 1).limit(10).collect(Collectors.toList());
Publisher<Integer> publisher1 = getPublisher(iterable);
Publisher<String> publisher2 = operatorPub(publisher1, s -> "[" + s + "]");
Subscriber<String> subscriber = getSubscriber();
publisher2.subscribe(subscriber);
}
...
...
private static <T, R> Publisher<R> operatorPub(Publisher<T> pub, Function<T, R> f) {
return new Publisher<R>() {
@Override
public void subscribe(Subscriber<? super R> sub) {
pub.subscribe(new DelegateSub<T, R>(sub) {
@Override
public void onNext(T t) {
sub.onNext(f.apply(t));
}
});
}
};
}
private static <T> Subscriber<T> getSubscriber() {
return new Subscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
log.info("[getSubscriber] onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T integer) {
log.info("onNext : " + integer);
}
@Override
public void onError(Throwable t) {
log.info("onError : " + t.getMessage());
}
@Override
public void onComplete() {
log.info("onComplete");
}
};
}
}
DelegateSub.java
public class DelegateSub<T,R> implements Subscriber<T> {
private final Subscriber sub;
public DelegateSub(Subscriber<? super R> subscriber) {
this.sub = subscriber;
}
@Override
public void onSubscribe(Subscription subscription) {
sub.onSubscribe(subscription);
}
@Override
public void onNext(T t) {
sub.onNext(t);
}
@Override
public void onError(Throwable t) {
sub.onError(t);
}
@Override
public void onComplete() {
sub.onComplete();
}
}
reactive-stream 라이브러리
위의 예제들은 Publisher, Subscriber 들의 동작을 확인하기 위해 만든 예제였고 실제로는 저런 함수들을 다 직접 구현해서 사용하지는 않습니다. 그래도 한번 직접 작성해보는것이 리액티브 스트림을 사용하기 전에 함수의 동작방식을 이해할수 있기 때문에 반드시 거쳐야 하는 과정이라고 생각합니다.
Publischer, Subscriber 의 동작이 어느정도 이해가 되었다면 이제 실제 reactive-stream 라이브러리에서 제공 하는 메서드들을 맛보기로 체험해보겠습니다.
ReactorExample.java
public static void main(String[] args) {
Flux.create(e -> {
e.next(1);
e.next(2);
e.next(3);
})
.log()
.subscribe(s -> System.out.println(s));
}
15:29:43.380 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
15:29:43.382 [main] INFO reactor.Flux.Create.1 - request(unbounded)
15:29:43.384 [main] INFO reactor.Flux.Create.1 - onNext(1)
1
15:29:43.384 [main] INFO reactor.Flux.Create.1 - onNext(2)
2
15:29:43.384 [main] INFO reactor.Flux.Create.1 - onNext(3)
3
Flux는 Publisher 의 인터페이스를 상속받은 클래스입니다. 예제에서는 간단하게 1,2,3 세개의 숫자를 순서대로 요청하여 출력하고 있습니다. 아래에서부터 subscribe 메서드를 통해 onSubscribe, request 메서드를 호출합니다.
Operator 도 굉장히 간단하게 구현할수 있습니다.
public static void main(String[] args) {
Flux.<Integer>create(e -> {
e.next(1);
e.next(2);
e.next(3);
})
.log() // create
.map(s->s*10)
.log() // map
.subscribe(s -> System.out.println(s));
}
15:38:44.056 [main] INFO reactor.Flux.Create.1 - onSubscribe(FluxCreate.BufferAsyncSink)
15:38:44.058 [main] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
15:38:44.058 [main] INFO reactor.Flux.Map.2 - request(unbounded)
15:38:44.058 [main] INFO reactor.Flux.Create.1 - request(unbounded)
15:38:44.060 [main] INFO reactor.Flux.Create.1 - onNext(1)
15:38:44.060 [main] INFO reactor.Flux.Map.2 - onNext(10)
10
15:38:44.060 [main] INFO reactor.Flux.Create.1 - onNext(2)
15:38:44.060 [main] INFO reactor.Flux.Map.2 - onNext(20)
20
15:38:44.060 [main] INFO reactor.Flux.Create.1 - onNext(3)
15:38:44.060 [main] INFO reactor.Flux.Map.2 - onNext(30)
30
로그를 확인해보면 첫번째 로그 메서드는 create의 동작에 대해 로그를 출력하고 있고 두번째 로그 메서드는 오퍼레이터인 map의 동작에 대해 로그를 출력하는것을 확인할수 있습니다.
공식사이트에 있는 api 문서에서 그림을 사용하여 설명해주기 때문에 이해하기가 쉽습니다.
Flux를 포함한 reactor에 대한 라이브러리함수는 https://projectreactor.io/docs/core/release/api/ 에서 자세하게 확인할수 있습니다.
출저
토비의 봄 TV 6회 스프링 리액티브 프로그래밍 (2) - Reactive Streams - Operators -Youtube
'WEB > Spring' 카테고리의 다른 글
[Spring] @RequestMapping HTTP Header 데이터 유형 / produces, consume 의미와 역할 (0) | 2022.07.06 |
---|---|
[Spring] 스프링 빈 웹 스코프, request 타입과 프록시 모드 - 2/2 (2) | 2022.06.19 |
[Spring] 스프링 빈 스코프(Scope) 싱글톤, 프로토 타입 - 1/2 (0) | 2022.06.19 |
[Spring] 스프링 빈 생명주기 초기화, 소멸 콜백 관리하기 @PostConstruct, @PreDestroy (0) | 2022.05.26 |
[Spring] 리액티브 스트림 기본 개념 (Publisher, Subscriber, Subscription) (0) | 2022.02.17 |
댓글