2526 words
13 minutes
[kafka]05. 컨슈머

1. 카프카 컨슈머#

카프카 컨슈머는 카프카 시스템에서 읽기 역할을 담당하는 주체이다. 여기서는 카프카에서 메시지 데이터를 읽는 방식과 이와 관련한 컨슈머 중요 개념들을 다룬다.

컨슈머 그룹#

  • 컨슈머 그룹은 기본적으로 메시지 처리(읽기)를 Scaling할 수 있게 한다.
  • 메시지 처리를 Scaling한다는 것은 카프카 토픽에서 컨슈머가 데이터를 읽어오는 양을 확장하는 것이다.
  • 구체적으로는 여러 컨슈머가 같은 토픽으로부터 파티션 단위로 데이터를 분할해서 읽어오는 것이다.

Concept

  • Consumer Group : 토픽에서 읽어오는 데이터를 확장할 수 있도록 구성된 컨슈머의 집합.

컨슈머의 특성#


Key Takeaway

  • 오직 하나의 컨슈머에 하나의 파티션만 할당될 수 있다. 따라서 토픽의 파티션 수가 병렬 처리의 상한을 결정한다.
  • 토픽에 설정된 파티션 수 이상으로 컨슈머를 늘리는 것은 무의미하다.
  • 카프카는 성능 저하 없이 컨슈머와 컨슈머 그룹이 동일한 토픽을 구독하게끔 확장할 수 있다.
  • 토픽에서 메시지를 읽거나 처리하는 규모를 확장하기 위해서는 이미 존재하는 컨슈머 그룹에 컨슈머를 추가해 해당 그룹의 컨슈머 각각이 메시지의 일부를 처리 하도록 하는 것이 좋다.

파티션 리밸런싱#

리밸런싱은 기본적으로 하나의 파티션에 하나의 컨슈머라는 원칙을 보장하기 위한 카프카의 방식이다.

언제 리밸런싱이 발생하는가?#

리밸런싱 전략은 무엇이 있는가?#

NOTE
Eager Rebalancing에서 거치는 단계

1. 파티션 반환

  • 모든 컨슈머들이 현재 자신이 소비하고 있던 파티션들을 반환.(모든 파티션의 소비 중단됨)

2. JoinGroup

  • Group Coordinator(Broker)에게 JoinGroup 요청을 보냄. 처음으로 그룹에 참여하는 컨슈머는 JoinGroup 요청 시 그룹 리더가 될 수 있다.
  • Group Coordinator는 모든 컨슈머로부터 JoinGroup 요청을 받을 때까지 대기.
  • 모든 요청을 받으면, 그룹 리더를 선정하고 JoinGroupResponse를 통해 컨슈머 정보를 리더에게 전달.

3. Synchronization Barrier

  • 중요 동기화 지점
    • 모든 컨슈머가 이전 파티션 할당을 완전히 해제하도록 보장.
    • 파티션 리밸런싱 전 데이터 손실이나 중복 처리를 방지.

4. SyncGroup

그룹 리더(Consumer)가 새로운 파티션 할당을 계산하고, 이를 브로커에게 전송.

  • 모든 컨슈머가 그룹 리더에게 SyncGroup 요청.
  • 그룹 리더(Consumer)는 모든 컨슈머로부터 SyncGroup 요청을 받을 때까지 대기
  • 모든 요청이 도착하면, 리더가 파티션 리밸런싱 계산.
  • 계산된 할당 결과를 브로커에게 전송하고, 브로커는 이를 SyncGroupResponse를 통해 모든 멤버에게 브로드 캐스트

![[de-kafka-05-eager_rebalancing.png]]

Rebalancing 전략#


Concept

  • Partition Rebalancing : 컨슈머 그룹에 새로운 컨슈머가 추가되거나 기존 컨슈머가 사라지면, 토픽에 파티션을 새로 할당하는 과정.
  • Eager Rebalancing : 조급한 리밸런싱.모든 컨슈머가 읽기 작업을 멈추고 파티션을 완전히 다시 할당하는 방식. 모든 컨슈머가 자신에게 할당된 파티션을 포기하고 새로운 파티션을 할당받는 두 단계를 거친다.
  • Cooperative Rebalancing : 협력적 리밸런싱. 핵심 아이디어는 전체 파티션이 아닌 재할당 대상이 되는 파티션만 컨슈머가 읽기 작업을 정지하고 리밸런싱을 수행하는 것. 서비스 다운타임 최소화 및 시스템 안정성 향상 도모.

2. 카프카 컨슈머#

3. 토픽 구독#

4. Polling Loop#

  • 카프카에서 컨슈머는 기본적으로 종료되지 않는 루프를 돌면서 poll() 메서드를 호출한다.
  • 처리가 완료되면 결과물을 데이터 저장소에 쓰거나 이미 저장된 레코드를 읽어온다.

개념#

NOTE
polling 방식의 이해 polling 은 어떤 프로세스가 다른 프로세스의 상태를 주기적으로 확인하는 것이다. 기본적으로 클라이언트가 서버로 주기적으로 요청을 날려서 이벤트 내용을 전달받는다. 구현이 단순하지만 서버 부하를 줄 수 있고(http 오버헤드 발생 가능) 실시간성이 떨어질 수 있다. 일정 주기로 서버가 갱신되는 대시보드 시나리오에서 적용하기 보다 적합하다.

추가 개념

  • long polling
  • web socket
  • server-sent event(SSE)(streaming)

Concept

  • polling : 기본적으로 어떤 프로세스가 다른 프로세스의 상태를 주기적으로 확인하는 것.
  • http overhead : 정보의 신뢰성 판단을 위한, 보내지는 헤더같은 정보 때문에 처리시간이나 데이터의 양이 증가하는 것을 의미.

poll() 호출시 처리 과정#

  • poll() : 컨슈머가 브로커로부터 메시지를 가져오는 메서드.

poll은 단순히 데이터를 가져오는 것보다 많은 일울 수행 한다.

  • Group Coordinator를 찾아 Consumer Group에 추가.
  • 파티션 할당받기
  • 리밸런스와 관련한 콜백처리
  • 컨슈머나 콜백에서 발생할 수 있는 예외 처리

Thread Safe 문제#

  • 하나의 Thread당 하나의 Consumer 가 원칙이다.
  • 이는 Thread Safe 문제를 해결하기 위함이다.
  • Thread Safe는 기본적으로 여러 Thread가 동시에 자원에 접근하더라도 프로그램 실행에 문제가 없는 것을 의미한다.

하나의 어플리케이션에서 동일한 그룹에 속하는 여러 컨슈머를 운용할 경우:

ExecuterService를 사용해 각 컨슈머를 별도의 스레드로 실행한다.

각자의 컨슈머를 가지는 다수의 스레드#

수집 스레드와 다수의 워커 스레드로 분리#

5. Consumer 설정#

대부분의 매개변수는 합리적인 기본 값을 가지고 있기 때문데 딱히 변경할 필요가 없다. 여기서는 일부 주요한 설정들만 다룬다.


Concept

  • fetch.min.bytes : 브로커로부터 가져올 최소 데이터 양. 값을 크게 가져갈 경우 컨슈머 수가 많을 때 브로커의 부하를 줄일 수 있다. 하지만 처리량이 적을때 레이턴시가 늘어날 수 있다.
  • fetch.max.bytes : 브로커로부터 가져올 최대 데이터 양.
  • max.poll.interval.ms : poll() 호출 간격의 최대 시간. 컨슈머가 poll을 하지 않고도 죽은것으로 판단하지 않기 위한 최대 시간.

Consumer Assignment 전략#


Concept

  • Range : 구독하는 각 토픽의 파티션을 연속된 그룹으로 나눠서 할당.
  • Round Robin : Round Robin Partitioning. 모든 구독된 토픽의 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당.
  • Sticky : 파티션을 균등하게 할당하면서 리밸런싱 발생시 가능한 많은 파티션들이 같은 컨슈머에 할당되게끔 하는 전략.
  • Cooperative Sticky : Cooperative Sticky Assignment. Sticky Assignment와 기본적으로 동일하지만, 컨슈머가 재할당되지 않은 파티션으로부터 데이터를 읽는 협력적 리밸런싱 지원.

6. Offset Commit#

어떻게 Consumer가 Offset을 추적 관리하는가?

신경써야할 지점#

중복 메시지#

메시지 손실#


Key Takeaway

  • 컨슈머가 파티션 안에서의 현재 위치를 업데이트 하는 작업을 오프셋 커밋이라고 한다.
  • 기본적으로 poll() 이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋한다.
  • AutoCommit의 문제는 기본적으로 중복을 완전히 방지하지 못한다는 것이다
  • CommitSyncpoll에 의해 반환된 마지막 오프셋을 커밋한다.
  • 비동기적 커밋은 브로커가 커밋 요청에 응답할 때 깨지 어플리케이션이 블락됨으로서 발생되는 처리량 제한 문제를 해결하기 위해 브로커가 커밋에 응답할 때까지 기다리는 대신 요청만 보내고 처리를 계속하는 것이다.

Auto Commit#

현재 offset commit#

비동기 commit#

동기, 비동기 commit함께 사용#

특정 offset commit#

7. Rebalance Listener#

Rebalance Listener는 컨슈머 그룹의 파티션이 할당되거나 해제될 때 호출되는 콜백 메서드를 정의한다.

  • onPartitionAssigned(Collection<TopicPartition>, partitions) : 파티션이 할당된 수 컨슈머가 메시지를 읽기 시작하기 전 호출
  • onPartitionRevoked(Collection<TopicPartition>, partitions) : 파티션이 할당 해제될때 호출
  • onPartitionLost(Collection<TopicPartition>, partitions) : 예외적인 리밸런싱 상황에서 호출

8. seekToBeginning, seekToEnd#

특정 오프셋에서부터 메시지를 읽기 시작하거나 특정 오프셋까지 메시지를 읽기 위해 사용된다.

  • seekToBeginning : 앞의 메시지는 건너 뛰고 파티션에 새로 들어온 메시지 부터 읽기
  • seekToEnd : 파티션에 있는 모든 메시지를 읽기

9. Polling Loop에서 벗어나기#

  • consumer.wakeup() : ShutdownHook은 JVM레벨에서 일어나기 때문에 사실상 polling loop를 벗어나는 방법은 wakeup() 메서드를 사용하는 것 밖에 없다.
  • ShutdownHook을 사용한 Graceful Shutdown

10. Consumer Deserializer#

11. Standalone Consumer#

독립적으로 실행되는 컨슈머가 필요한 상황

  • 하나의 컨슈머가 모든 파티션으로부터 데이터를 읽어오는 상황
  • 특정 파티션에 대해 데이터를 읽어오는 상황

Reference#

[kafka]05. 컨슈머
https://yjinheon.netlify.app/posts/02de/kafka/de-kafka-05_consumer/
Author
Datamind
Published at
2024-12-02
License
CC BY-NC-SA 4.0