본문 바로가기
WEB/Spring

[Spring] 리액티브 스트림 기본 개념 (Publisher, Subscriber, Subscription)

by 정권이 내 2022. 2. 17.

 

스프링 리액티브 스트림(reactive-streams) 기초

 

Publisher & Subscriber

Publisher 는 연속된 데이터들을 정보를 요청한 Subscriber 에게 제공합니다. 이때 Publisher가 Subscriber

에게 전달할때는 다음과 같은 프로토콜을 따릅니다.

 

onSubscribe / onNext* / (onError | onComplete)

onSubscribe는 항상 호출되고 onNext는 0번부터 n번까지 호출이 될수있습니다. onError, onComplete는

선택적으로 호출됩니다.

 

Publisher & Subscriber 생성

public class PubSub {
    public static void main(String[] args) {
        
        Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);

        Publisher publisher = new Publisher<>() {

            @Override
            public void subscribe(Subscriber subscriber) {
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long l) {

                    }

                    @Override
                    public void cancel() {

                    }
                });
            }
        };

        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription subscription) {

            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {

            }
        };

        publisher.subscribe(subscriber);
    }
}

Publisher를 만들면 구현 메서드로 subscribe 가 생성됩니다. Publisher 객체 publisher에서 suscribe

메서드를 호출하고 인자로 Subscriber 의 객체를 사용하여 호출하게 됩니다.

 

subscriber.onSubscribe(new Subscription()

Publisher 에서 Subscriber 에게 데이터 전송시 onSubscribe 메서드를 호출하는데 인자로 Subscription()

오브젝트를 받습니다. Subscription 는 인터페이스로 request, cancel 메서드가 있습니다.

 

img

 

Subscriber 가 Publisher 에게 데이터를 요청할때 subscribe 메서드를 먼저 수행합니다. Publisher 는 직접

Subscriber 에게 데이터를 전송하는것이 아닌 Subscription 이라는 오브젝트를 만들고 onSubscribe 메서드

를 사용해 데이터 전송이 준비되었음을 Subscriber 에게 알립니다.

 

그 후에 Subscriber 는 request 메서드로 중간 매개체인 Subscription 에게 데이터를 요청하고 Publisher도

Subscription 오브젝트를 통해 Subscriber 에게 데이터를 전송하게 됩니다.

 

request(n)

Subscriber가 데이터를 요청할때 갯수를 지정할수 있다.

 

@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("onSubscribe");
}

@Override
public void onNext(Integer integer) {
    System.out.println("onNext " + integer);
}

@Override
public void onError(Throwable throwable) {
    System.out.println("onError");
}

@Override
public void onComplete() {
    System.out.println("onComplete");
}
  • onSubscribe : Publisher 에서 Subcriber의 구현 메서드 onSubscribe를 호출합니다.
  • onNext : Publisher 에서 받은 데이터를 처리하는 메서드입니다.
  • onError : 에러가 발생했을때 에러에 대한 정보를 오브젝트로 받아서 처리하는 메서드입니다.
  • onComplete : Publisher 에서 데이터 처리가 완료되었을때 호출하는 메서드입니다.

데이터 요청 예제

Publisher 에서 subscribe 메서드를 통해 Subscriber와 연결이 되었다면 Subscriber 에서 request 메서드를

호출하여 데이터를 요청하는 예제를 만들어 보겠습니다.

 

Subscriber 코드

Subscriber<Integer> subscriber = new Subscriber<Integer>() {

    Subscription subscription;
    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("onSubscribe");
        this.subscription = subscription;
        this.subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("onNext " + integer);
    }

    @Override
    public void onError(Throwable throwable) {
        System.out.println("onError");
    }

    @Override
    public void onComplete() {
        System.out.println("onComplete");
    }
};

중간 매개체인 Subscription 오브젝트를 생성하고 onSubscribe 메서드의 인자로 받은 subscription 객체를

받습니다. Publisher 에 요청하는 request 메서드의 인자로 Long 타입의 최대값을 전달하는데 한번에 모든

데이터를 받겠다고 요청하는 의미입니다.

 

Publisher 코드

Iterable<Integer> iter = Arrays.asList(1, 2, 3, 4, 5);

Publisher publisher = new Publisher<>() {

    @Override
    public void subscribe(Subscriber subscriber) {
        Iterator iterator = iter.iterator();
        subscriber.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {
                while (n-- > 0) {
                    if (iterator.hasNext()) {
                        subscriber.onNext(iterator.next());
                    } else {
                        subscriber.onComplete();
                        break;
                    }
                }
            }

            @Override
            public void cancel() {

            }
        });
    }
};

Integer 데이터를 가지고있는 Iterator 객체를 만든후 request 메서드 내부에 Iterator 오브젝트의 요소들을

루프를 돌면서 데이터를 Subscriber 객체의 onNext() 메서드의 인자로 전달합니다.

 

코드 실행

onSubscribe
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete

 

아쉬운점이 있다면 request 메서드의 인자를 Long의 최대값으로 넘겨버리다보니 제대로 활용하지 못한

느낌이 듭니다. 이번에는 request 에 1건씩 받도록 요청해보겠습니다.

onSubscribe, onNext 메서드만 수정하면 됩니다.

@Override
public void onSubscribe(Subscription subscription) {
    System.out.println("onSubscribe");
    this.subscription = subscription;
    this.subscription.request(1);
}

@Override
public void onNext(Integer integer) {
    System.out.println("onNext " + integer);
    this.subscription.request(1);
}

 

코드 실행

onSubscribe
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onComplete

 

Publisher 의 onSubscribe 함수 내에서 request 요청을 처리하다가 에러가 발생할 경우를 대비해 try catch

문을 작성하고 예외발생시 onError 메서드를 통해 Subscriber 에 넘겨줄수도 있습니다.

Publisher

@Override
public void subscribe(Subscriber subscriber) {
    Iterator iterator = iter.iterator();
    subscriber.onSubscribe(new Subscription() {
        @Override
        public void request(long n) {
            try {
                while (n-- > 0) {
                    if (iterator.hasNext()) {
                        subscriber.onNext(iterator.next());
                    } else {
                        subscriber.onComplete();
                        break;
                    }
                }

            } catch (RuntimeException exception) {
                subscriber.onError(exception);
            }
        }

        @Override
        public void cancel() {

        }
    });
}

 

Subscriber

@Override
public void onError(Throwable throwable) {
    System.out.println("onError : " + throwable.getMessage());
}

 

 

출저

토비의 봄 TV 5회 스프링 리액티브 프로그래밍 (1) - Reactive Streams -Youtube

 

반응형

댓글