스트림의 새로운 표준 리액티브 스트림
왜 표준이 필요한가
초기에는 CompletableStage
를 쓰는 라이브러리와, RxJava 간 호환이 되지 않았고, 어댑터 유틸리티 클래스를 구현해서 사용해야만 했다.
이전 장에서 RxJava, Vert.x, Ratpack, Retrofit과 같은 수 많은 프레임워크들은 일관된 API가 존재하지 않았다.
pull 방식 / push 방식
리액티브의 초창기에는 모든 라이브러리의 데이터가 source에서 subscriber에게 푸시되는 방식
- pull 방식이 효과적이지 못했기 때문에~
1. Pull 방식
- element를 하나 씩 요청하면 어플리케이션 - DB 간 요청 처리에 시간이 많이 소요됨
- 전체 처리 시간의 대부분이 idle한 상태
- 리소스 사용하지 않더라도 네트워크 비용 때문에 전체 처리 시간이 늘어남
- DB 입장에서는 미래에 들어올 요청의 개수를 알지 못하기 때문에 미리 데이터를 뽑지 못함
2. Pull + Batch 방식
- 이전과 동일한 Pull 방식에, 배치 처리를 같이 요청
- 하지만 DB가 데이터 쿼리하는 동안 어플리케이션은 idle 상태
3. Push 방식
- 어플리케이션은 데이터를 DB에게 한 번 요청
- DB는 데이터가 가용상태가 되면 비동기적으로 push
- 클라이언트의 모든 요구 사항이 충족되면 DB Connection close
Push 모델을 선택하는 가장 큰 이유는 요청 횟수의 최소화 -> 전체 처리 시간 최적화가 가능하기 때문
하지만 문제가 있다..
Slow Producer + Fast Consumer
Consumer는 데이터가 엄청 많이 급하게 필요한 상황인데 Producer가 Consumer에 대한 상태를 잘 모르기 때문에 발생할 수 있다.
이런 consumer에 대한 메트릭을 제공하지 못하기 때문에 push 모델만 가지고는 해결 어렵다.
Fast Producer + Slow Consumer
Consumer가 처리할 수 있는 것보다 훨씬 많은 데이터를 Producer가 보내는 경우.
처리되지 않은 element를 큐에 넣는 방식으로 해결..?
- 무한 크기의 큐
- 메시지가 무조건 전달됨이 보장됨
- 하지만 실제 리소스가 무제한일 수는 없기 때문에 존재하기 어려움
- 유한 크기의 drop 큐
- 가용한 리소스에 맞춰 큐의 크기를 구성 가능
- 하지만 이 경우 데이터의 무결성을 보장할 수 없음. 누락된 데이터가 어떤 것인지 영영 알지 못한다
- 유한 크기의 blocking 큐
- 큐가 꽉 찼을 떄 메시지를 삭제하는 대신 메시지 유입 차단
- 하지만 이는 비동기 동작이 아님 - 가장 느린 consumer의 처리 속도에 병목
때문에 순수한 Push 모델은 다양한 부작용을 발생시킬 수 있다.
리액티브 스트림의 기본 스펙
Publisher
, Subscriber
, Subscription
, Processor
가 존재
onSubscribe()
메서드를 통해서Subscriber
는Publisher
에게 구독 여부를 알림- 이 때 구독에 대한 정보는
Subscription
파라미터로 전달됨 Subscriber
는request()
메서드를 통해Publisher
가 보내줘야 하는 데이터 크기를 전달- 이후
Publisher
는 요청 받은 개수 만큼의 element를Subscriber
에게 전달
- 이후
이 모델은 순수 push 모델과 비교해서 어떨까?
- 리액티브 스트림 스펙에 정의된 방식 때문에 DB의 첫 번째 element가 좀 늦게 도착할 수 있다
- 새 데이터를 요청하는 경우 진행 중인 element 처리를 중단하지 않아도 된다.
- 전체 처리 시간은 거의 영향 받지 않는다.
리액티브 스트림을 사용하면 동적 Push-Pull 모델을 사용할 수 있다.
- 순수 push 모델을 사용하려면
java.lang.Long.MAX_VALUE
개씩 요청하는 방식으로 사용 가능 - 순수 pull 모델을 사용하려면
Subscriber.onNext
가 호출될 때마다 한 개씩 요청하는 방식으로 사용 가능
리액티브 스트림이 동작하는 방식을 정리하자면
onSubscribe()
를 호출Subscriber
는Subscription
을 로컬에 저장,request()
메서드를 통해 데이터 받을 준비 됐다는 것을Publisher
에게 알림- 구독 정보가 변경되는 경우
onComplete
메서드 명시적으로 호출해서Subscriber
에게 알려줌 Subscriber
가 요청한 데이터를 보내는 과정에서 오류가 발생했을 때onError()
호출
그럼 Processor는 어떤 역할을 할까
Publisher == 시작 지점. / Subscriber == 끝 지점. Processor는 Publisher와 Subscriber 사이에 몇 가지 처리를 추가한 모델
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
- Subscriber의 존재 여부와는 상관 없이 여러 Subscriber에게 메시지를 보내야 한다면 (멀티 캐스팅)?
- 그 중간 과정에서 캐싱을 해야 한다면?
processor 구현은 어렵다. 몇 가지 생각해볼 것들이 많다.
- 비즈니스 흐름을 복잡하게 만들고 모든 상황에서 재사용할 수 있는 operator를 만들기 어렵다.
- Processor 구성 시 별도의 Subscriber 관리가 필요하다.
- 이 경우 Processor 성능 저하, 전체 스트림 처리량 감소
- 단순히 A -> B 변환하려면 여러 개의 Publisher 인스턴스를 조합하는 것이 적절할 수도
TCK (Reactive Streams Technology Compatibility Kit)
리액티브 스트림은 표준 명세이지, 실제로 구현하지는 않는다. 구현체들이 명세된 스펙을 만족하는지에 대한 테스트들 == TCK
Publisher 검증
- 주어진 수의 element를 생산할 수 있는지
- 데이터 소스를 사용할 수 없는 경우 publisher 인스턴스 생성에 실패하는지
- 등등..
Subscriber 검증
Blackbox 검증
내부 구현에 대해 모르는 상태에서 별다른 수정 없이 subscriber 테스트 내부에 대한 접근이 없다 == 내부 subscription을 모른다
Whitebox 검증
구현을 아는 상태에서 subscriber 테스트 subscriber 내부에서 입력 스트림에 대한 제어와 시그널 캡쳐 가능
리액티브 스트림을 활용한 비동기 / 병렬 처리
publisher가 생성하고 subscriber가 소비하는 모든 시그널은 논블로킹 + 방해 요소가 없어야 한다.
리액티브 스트림을 활용하면 한 노드 / 한 코어를 효율적으로 활용할 수 있다! 그런데 여러 코어를 효율적으로 활용하기 위해서는 ? 병렬 처리가 필요하다.
- 리액티브 스트림에서 병렬화 ==
onNext
메서드를 병렬로 호출하는 것을 의미함. - 하지만 리액티브 스트림 명세에는
on~
메서드의 호출은- 쓰레드 안정성을 보장하는 방식으로 신호를 보내야 하고
- 멀티 쓰레드 환경에서 수행되는 경우 별도의 동기화가 필요하다 (리액티브 스트림 스펙 자체에서 동기화 보장 X)
- 때문에
ParallelPublisher
같은 것이 존재할 수는 없다.
파이프라인을 단계별로 구분하고, 각 단계에 메시지를 비동기적으로 전달
어떤 로직을 실행하는 부분은 하나의 쓰레드에 일임 시키고, 다른 로직은 다른 스레드를 사용할 수 있도록 위임하는 방식
- 특정 작업이 CPU 사용이 많다면 -> 별도의 쓰레드로 분리해서 처리
- 두 개의 독립적인 쓰레드 간 처리 분할 -> 단계 사이에 비동기적인 경계 설정 필요
그럼 어떤 부분을 경계로 처리해야 하는가?
- CPU 집약적인 로직이 메시지 생산 단계에서 발생하는 경우 - 메시지 생산을 경계로 설정
- 목적지 쪽이 더 적은 자원 소비한다면 목적지 쪽에서 모든 데이터를 처리
- CPU 집약적인 로직이 메시지 소비 단계에서 발생하는 경우 - 메시지 소비를 경계로 설정
- 소스 쪽이 더 적은 자원 소비한다면 소스 쪽에서 모든 데이터를 처리 후 보낸다
- CPU 집약적인 로직이 메시지 생산, 소비 단계 모두에서 발생하는 경우 - 생산 / 소비를 별도의 경계로 설정
- 변환 자체가 리소스 집약적이라면 생산, 소비 모두에서 변환 작업 처리
리액티브 기술 조합
- 표준이 존재하고
- 표준을 구현하는 서로 다른 드라이버 / 라이브러리 / 프레임워크를 사용하면서
- 별다른 노력 없이 프로세스에 대한 플로우를 작성하고
- 최종 사용자에게 처리 결과를 리턴할 수 있다.