Contents

Kafka Consumer Group Rebalancing

카프카를 실제 운영환경에서 사용하다 보면 마주치게 되는 몇 가지 이슈가 있다.

오늘은 그 중 컨슈머 그룹의 파티션 리밸런싱에 대해 좀 정리해봤다.

https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

컨슈머 그룹 리밸런싱이란

카프카 클라이언트는 카프카 클러스터의 레코드들을 소비한다.

그리고 이 클라이언트들은 카프카 브로커들의 실패를 다루고, 클러스터 내의 토픽 파티션을 재조정한다. 이러한 컨슈머들은 컨슈머 그룹에 속해 메시지들을 소비하는데, 컨슈머는 쓰레드 세이프하지 않다.

컨슈머의 쓰레드 안전성 보장 방법

사실 자세히 다루기엔 엄청 깊은 내용이지만, 아주 간단하게만 다루자면..

카프카는 파티션 내의 각 레코드에 numerical한 offset을 두어 관리한다.

  • 이 오프셋은 파티션 내의 레코드에 대한 unique 한 식별자로써 활용된다.
  • 또한 오프셋은 파티션 내 컨슈머를 파악하기 위한 용도로도 활용된다.
  • 예를 들어 ‘위치 5’에 위치한 컨슈머는 이미 오프셋 0~4까지의 데이터를 소비하고, 5번째 레코드를 가져갈 것이라는 것을 알 수 있다.

컨슈머가 다루는 position에는 크게 두 가지 종류가 있다.

  1. 컨슈머의 position은 다음 번에 주어지게 될 레코드의 오프셋을 제공한다.
    1. position은 컨슈머가 해당 파티션에서 봤던 가장 높은 오프셋보다도 더 높다.
    2. 컨슈머가 메시지를 poll을 통해 받을 때마다 증가한다.
  2. 컨슈머의 committed position은 가장 마지막에 저장된 오프셋이다.
    1. 프로세스가 실패하고 재시작할 때, 컨슈머는 이 오프셋을 통해 복구 절차를 시작한다.
    2. 컨슈머는 오프셋을 주기적으로 커밋하거나, 매뉴얼하게 커밋 api를 호출해서 committed position을 조정할 수 있다. (commitSynccommitAsync가 이에 해당한다)

컨슈머 그룹과 토픽 구독

카프카는 컨슈머 그룹이라는 개념을 사용해서 레코드들을 소비하고 처리하는 작업을 분할한다. 이 분할된 작업들은 동일한 머신에서 돌아가거나 다수의 머신에서 돌아가게 해서 확장성을 증대시키고 처리 과정에서의 fault tolerance를 증대시킬 수 있다.

동일한 group.id를 공유하는 모든 컨슈머 인스턴스는 동일한 컨슈머 그룹에 속한다.

  • 그룹에 속한 각각의 컨슈머는 동적으로 (subscribe api를 사용해서 구독할) 토픽의 목록을 설정한다.
  • 그리고 카프카는 각각의 컨슈머 그룹에 속한 하나의 프로세스에 구독한 토픽의 각 메시지를 전달한다.
  • 이 과정은 ‘파티션을 밸런싱’하는 과정을 통해 이뤄진다.
    • 이 과정은 각 파티션이 그룹에 속한 컨슈머 그룹의 단 하나의 컨슈머에 할당될 때까지 이뤄진다.
    • 즉 4개의 파티션을 가진 토픽이 있고, 컨슈머 그룹이 두 개의 프로세스를 가진다면 각각의 프로세스는 두 개의 파티션을 소비한다.

컨슈머 그룹의 멤버는 동적으로 정해진다. 컨슈머가 갑자기 죽어서 실패하는 경우를 가정해보자.

  • 죽은 컨슈머에 할당된 파티션은 동일한 그룹의 다른 컨슈머에게 재할당된다.
  • 비슷하게, 만약 기존 그룹에 새 컨슈머가 들어오게 된다면 기존 컨슈머의 파티션은 새로 들어오게 된 컨슈머에게 할당된다.
  • 이 과정을 컨슈머 그룹의 리밸런싱이라고 한다.

컨슈머 그룹의 리밸런싱은 컨슈머가 특정 파티션을 구독하게 함으로써 매뉴얼하게 할당될 수도 있다. 이 경우에는 동적 파티션 할당과 컨슈머 그룹의 coordination 과정은 생략된다.

컨슈머 그룹 리밸런싱이 미치는 영향

https://joooootopia.tistory.com/30?category=907957

굉장히 잘 설명된 블로그 글이 있어 가져와봤다.

컨슈머 그룹 리밸런싱 과정이 이뤄지는 동안에는 그룹에 속해 있는 어떠한 컨슈머 인스턴스도 정상적으로 메시지 처리를 할 수 없다.

카프카에서는 이를 Stop the World 라고 부른다. (자바의 그것과 비슷한 개념이다)

만약 그룹에 할당된 태스크가 1000개라면 그 1000개의 프로세스가 전부 일시적으로 동작을 멈추게 된다. 자바의 Stop the world처럼, 카프카 운영 상황에서 카프카의 컨슈머 그룹 리밸런싱은 가급적 피하고 싶은 상황이다.

컨슈머의 실패 감지

다시 돌아와서, 카프카의 입장에서 리밸런싱을 수행하려면 컨슈머가 어떻게 실패하는지를 알아야 한다.

컨슈머는 토픽 집합에 구독한 이후에 poll을 호출함과 동시에 컨슈머 그룹에 자동으로 소속되게 된다.

이 Poll API는 컨슈머의 liveness를 보장하기 위해 설게되었다.

  • poll을 할 수 있는 한 컨슈머는 그룹 내에 계속 존재하게 되고, 할당된 파티션으로부터 메시지를 정상적으로 가져올 수 있다.
  • 이 과정에서 해당 컨슈머는 서버에게 주기적으로 heartbeat 신호를 보낸다.
    • 만약 컨슈머가 실제로 죽거나 하는 등의 이유로 heartbeat 신호를 session.timeout.ms 동안 보내지 못했다면..
    • 컨슈머는 (실제로 죽지 않았어도) 죽은 것으로 간주되어 컨슈머 그룹 리밸런싱이 일어난다.

한편 컨슈머 입장에서는 heartbeat 신호를 계속 보내지만 아무 것도 이뤄지지 않는 livelock 상황에 돌입할 수도 있다.

  • 컨슈머가 livelock 상황에 빠지는 것을 막기 위해 카프카는 max.poll.interval.ms 설정을 지원한다.
  • 기본적으로 구성된 최대 interval 만큼 자주 poll을 호출하지 않으면 클라이언트는 다른 컨슈머가 파티션을 할당 받을 수 있게 미리 그룹을 떠난다.
    • 이 일이 발생하면 commitSync() 과정에서 오프셋 커밋 실패 Exception이 터진다.
    • 이는 그룹의 활성 상태인 멤버만 오프셋을 커밋할 수 있도록 보장한다.
      • 비활성 상태인 멤버가 오프셋을 커밋한다면 메시지가 실제로 카프카 내에 존재함에도 불구하고 소비하지 못하는 상황이 되어 결과적으로 유실되는 것과 같기 때문이다.
    • 따라서 컨슈머는 그룹에 계속 남아 있기 위해서 poll을 계속 호출해야 한다.

poll loop을 조절하기 위한 설정 값들

1. max.poll.interval.ms

  • 예상되는 poll 간의 간격을 증가시킨다면: 컨슈머는 poll로 가져온 데이터를 처리할 시간을 더 가지게 된다.
    • 하지만 이 값을 증가시킨다는 것은 그룹 리밸런싱을 지연시킬 수 있는데, 컨슈머가 poll 호출할 때에만 그룹 리밸런싱에 참여하기 때문이다.
  • 이 설정 값을 사용해서 리밸런싱이 끝나는 시간을 조절할 수 있지만 컨슈머가 실제로 poll 호출을 충분히 자주 호출할 수 없는 경우 전체 처리 속도를 지연시킬 수 있다. (fast producer - slow consumer 상황에서는 위험하다)

2. max.poll.records

  • 이 설정 값을 통해 poll의 결과로 리턴되는 레코드 수를 제한할 수 있다.
  • 이 설정은 poll interval 동안에 다뤄지는 최대 레코드를 예측하기 쉽게 만들어준다.
  • 이 값을 조정하는 것은 실제로 poll interval을 줄이는 데 도움이 되고 결과적으로 그룹 리밸런싱의 영향을 줄일 수 있다.

하지만..

하지만 메시지 처리 속도가 예측 불가능한 상황에서는 두 설정 값 모두 충분하지 않다.

이런 경우 가장 추천되는 방법은 메시지 프로세싱을 다른 쓰레드로 이관하는 것이다. 프로세서가 계속 작업하는 도중에 컨슈머가 poll할 수 있도록 유지하는 전략이다.

이 경우에도 커밋된 오프셋이 실제 메시지를 소비한 위치보다 앞서지 않도록 주의를 기울여야 한다.

오토 커밋을 비활성화하고 프로세싱을 맡긴 쓰레드가 메시지 처리를 마친 경우에만 committed offset을 수동으로 커밋하는 방법이다.. (물론 처리할 메시지의 성격에 따라 약간 다를 수 있다)

또한 이 경우에는 레코드의 처리가 끝날 때까지 poll을 통해 새 레코드가 수신되지 않도록 파티션을 잠깐 일시정지해야 한다는 점도 유의해야 한다.