Contents

스트림의 새로운 표준 리액티브 스트림

왜 표준이 필요한가

초기에는 CompletableStage를 쓰는 라이브러리와, RxJava 간 호환이 되지 않았고, 어댑터 유틸리티 클래스를 구현해서 사용해야만 했다.

이전 장에서 RxJava, Vert.x, Ratpack, Retrofit과 같은 수 많은 프레임워크들은 일관된 API가 존재하지 않았다.

pull 방식 / push 방식

리액티브의 초창기에는 모든 라이브러리의 데이터가 source에서 subscriber에게 푸시되는 방식

  • pull 방식이 효과적이지 못했기 때문에~

1. Pull 방식

  • element를 하나 씩 요청하면 어플리케이션 - DB 간 요청 처리에 시간이 많이 소요됨
  • 전체 처리 시간의 대부분이 idle한 상태
  • 리소스 사용하지 않더라도 네트워크 비용 때문에 전체 처리 시간이 늘어남
  • DB 입장에서는 미래에 들어올 요청의 개수를 알지 못하기 때문에 미리 데이터를 뽑지 못함

2. Pull + Batch 방식

  • 이전과 동일한 Pull 방식에, 배치 처리를 같이 요청
  • 하지만 DB가 데이터 쿼리하는 동안 어플리케이션은 idle 상태

3. Push 방식

  • 어플리케이션은 데이터를 DB에게 한 번 요청
  • DB는 데이터가 가용상태가 되면 비동기적으로 push
  • 클라이언트의 모든 요구 사항이 충족되면 DB Connection close

Push 모델을 선택하는 가장 큰 이유는 요청 횟수의 최소화 -> 전체 처리 시간 최적화가 가능하기 때문

하지만 문제가 있다..

Slow Producer + Fast Consumer

Consumer는 데이터가 엄청 많이 급하게 필요한 상황인데 Producer가 Consumer에 대한 상태를 잘 모르기 때문에 발생할 수 있다.

이런 consumer에 대한 메트릭을 제공하지 못하기 때문에 push 모델만 가지고는 해결 어렵다.

Fast Producer + Slow Consumer

Consumer가 처리할 수 있는 것보다 훨씬 많은 데이터를 Producer가 보내는 경우.

처리되지 않은 element를 큐에 넣는 방식으로 해결..?

  1. 무한 크기의 큐
    1. 메시지가 무조건 전달됨이 보장됨
    2. 하지만 실제 리소스가 무제한일 수는 없기 때문에 존재하기 어려움
  2. 유한 크기의 drop 큐
    1. 가용한 리소스에 맞춰 큐의 크기를 구성 가능
    2. 하지만 이 경우 데이터의 무결성을 보장할 수 없음. 누락된 데이터가 어떤 것인지 영영 알지 못한다
  3. 유한 크기의 blocking 큐
    1. 큐가 꽉 찼을 떄 메시지를 삭제하는 대신 메시지 유입 차단
    2. 하지만 이는 비동기 동작이 아님 - 가장 느린 consumer의 처리 속도에 병목

때문에 순수한 Push 모델은 다양한 부작용을 발생시킬 수 있다.

리액티브 스트림의 기본 스펙

Publisher, Subscriber, Subscription, Processor 가 존재

  • onSubscribe() 메서드를 통해서 SubscriberPublisher에게 구독 여부를 알림
  • 이 때 구독에 대한 정보는 Subscription 파라미터로 전달됨
  • Subscriberrequest() 메서드를 통해 Publisher가 보내줘야 하는 데이터 크기를 전달
    • 이후 Publisher는 요청 받은 개수 만큼의 element를 Subscriber에게 전달

이 모델은 순수 push 모델과 비교해서 어떨까?

  • 리액티브 스트림 스펙에 정의된 방식 때문에 DB의 첫 번째 element가 좀 늦게 도착할 수 있다
  • 새 데이터를 요청하는 경우 진행 중인 element 처리를 중단하지 않아도 된다.
  • 전체 처리 시간은 거의 영향 받지 않는다.

리액티브 스트림을 사용하면 동적 Push-Pull 모델을 사용할 수 있다.

  • 순수 push 모델을 사용하려면 java.lang.Long.MAX_VALUE 개씩 요청하는 방식으로 사용 가능
  • 순수 pull 모델을 사용하려면 Subscriber.onNext가 호출될 때마다 한 개씩 요청하는 방식으로 사용 가능

리액티브 스트림이 동작하는 방식을 정리하자면

  1. onSubscribe()를 호출
  2. SubscriberSubscription을 로컬에 저장, request() 메서드를 통해 데이터 받을 준비 됐다는 것을 Publisher에게 알림
  3. 구독 정보가 변경되는 경우 onComplete 메서드 명시적으로 호출해서 Subscriber에게 알려줌
  4. Subscriber가 요청한 데이터를 보내는 과정에서 오류가 발생했을 때 onError() 호출

그럼 Processor는 어떤 역할을 할까

Publisher == 시작 지점. / Subscriber == 끝 지점. Processor는 Publisher와 Subscriber 사이에 몇 가지 처리를 추가한 모델

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {

}
  • Subscriber의 존재 여부와는 상관 없이 여러 Subscriber에게 메시지를 보내야 한다면 (멀티 캐스팅)?
  • 그 중간 과정에서 캐싱을 해야 한다면?

processor 구현은 어렵다. 몇 가지 생각해볼 것들이 많다.

  • 비즈니스 흐름을 복잡하게 만들고 모든 상황에서 재사용할 수 있는 operator를 만들기 어렵다.
  • Processor 구성 시 별도의 Subscriber 관리가 필요하다.
    • 이 경우 Processor 성능 저하, 전체 스트림 처리량 감소
  • 단순히 A -> B 변환하려면 여러 개의 Publisher 인스턴스를 조합하는 것이 적절할 수도

TCK (Reactive Streams Technology Compatibility Kit)

리액티브 스트림은 표준 명세이지, 실제로 구현하지는 않는다. 구현체들이 명세된 스펙을 만족하는지에 대한 테스트들 == TCK

Publisher 검증

  • 주어진 수의 element를 생산할 수 있는지
  • 데이터 소스를 사용할 수 없는 경우 publisher 인스턴스 생성에 실패하는지
  • 등등..

Subscriber 검증

Blackbox 검증

내부 구현에 대해 모르는 상태에서 별다른 수정 없이 subscriber 테스트 내부에 대한 접근이 없다 == 내부 subscription을 모른다

Whitebox 검증

구현을 아는 상태에서 subscriber 테스트 subscriber 내부에서 입력 스트림에 대한 제어와 시그널 캡쳐 가능

리액티브 스트림을 활용한 비동기 / 병렬 처리

publisher가 생성하고 subscriber가 소비하는 모든 시그널은 논블로킹 + 방해 요소가 없어야 한다.

리액티브 스트림을 활용하면 한 노드 / 한 코어를 효율적으로 활용할 수 있다! 그런데 여러 코어를 효율적으로 활용하기 위해서는 ? 병렬 처리가 필요하다.

  • 리액티브 스트림에서 병렬화 == onNext 메서드를 병렬로 호출하는 것을 의미함.
  • 하지만 리액티브 스트림 명세에는 on~ 메서드의 호출은
    • 쓰레드 안정성을 보장하는 방식으로 신호를 보내야 하고
    • 멀티 쓰레드 환경에서 수행되는 경우 별도의 동기화가 필요하다 (리액티브 스트림 스펙 자체에서 동기화 보장 X)
  • 때문에 ParallelPublisher 같은 것이 존재할 수는 없다.

파이프라인을 단계별로 구분하고, 각 단계에 메시지를 비동기적으로 전달

어떤 로직을 실행하는 부분은 하나의 쓰레드에 일임 시키고, 다른 로직은 다른 스레드를 사용할 수 있도록 위임하는 방식

  • 특정 작업이 CPU 사용이 많다면 -> 별도의 쓰레드로 분리해서 처리
  • 두 개의 독립적인 쓰레드 간 처리 분할 -> 단계 사이에 비동기적인 경계 설정 필요

그럼 어떤 부분을 경계로 처리해야 하는가?

  1. CPU 집약적인 로직이 메시지 생산 단계에서 발생하는 경우 - 메시지 생산을 경계로 설정
    1. 목적지 쪽이 더 적은 자원 소비한다면 목적지 쪽에서 모든 데이터를 처리
  2. CPU 집약적인 로직이 메시지 소비 단계에서 발생하는 경우 - 메시지 소비를 경계로 설정
    1. 소스 쪽이 더 적은 자원 소비한다면 소스 쪽에서 모든 데이터를 처리 후 보낸다
  3. CPU 집약적인 로직이 메시지 생산, 소비 단계 모두에서 발생하는 경우 - 생산 / 소비를 별도의 경계로 설정
    1. 변환 자체가 리소스 집약적이라면 생산, 소비 모두에서 변환 작업 처리

리액티브 기술 조합

  1. 표준이 존재하고
  2. 표준을 구현하는 서로 다른 드라이버 / 라이브러리 / 프레임워크를 사용하면서
  3. 별다른 노력 없이 프로세스에 대한 플로우를 작성하고
  4. 최종 사용자에게 처리 결과를 리턴할 수 있다.