1. 카프카 컨슈머
카프카 컨슈머는 카프카 시스템에서 읽기 역할을 담당하는 주체이다. 여기서는 카프카에서 메시지 데이터를 읽는 방식과 이와 관련한 컨슈머 중요 개념들을 다룬다.
컨슈머 그룹
- 컨슈머 그룹은 기본적으로 메시지 처리(읽기)를 Scaling할 수 있게 한다.
- 메시지 처리를 Scaling한다는 것은 카프카 토픽에서 컨슈머가 데이터를 읽어오는 양을 확장하는 것이다.
- 구체적으로는 여러 컨슈머가 같은 토픽으로부터 파티션 단위로 데이터를 분할해서 읽어오는 것이다.
Concept
- Consumer Group : 토픽에서 읽어오는 데이터를 확장할 수 있도록 구성된 컨슈머의 집합.
컨슈머의 특성
Key Takeaway
- 오직 하나의 컨슈머에 하나의 파티션만 할당될 수 있다. 따라서 토픽의 파티션 수가 병렬 처리의 상한을 결정한다.
- 토픽에 설정된 파티션 수 이상으로 컨슈머를 늘리는 것은 무의미하다.
- 카프카는 성능 저하 없이 컨슈머와 컨슈머 그룹이 동일한 토픽을 구독하게끔 확장할 수 있다.
- 토픽에서 메시지를 읽거나 처리하는 규모를 확장하기 위해서는 이미 존재하는 컨슈머 그룹에 컨슈머를 추가해 해당 그룹의 컨슈머 각각이 메시지의 일부를 처리 하도록 하는 것이 좋다.
파티션 리밸런싱
리밸런싱은 기본적으로 하나의 파티션에 하나의 컨슈머라는 원칙을 보장하기 위한 카프카의 방식이다.
언제 리밸런싱이 발생하는가?
리밸런싱 전략은 무엇이 있는가?
![]()
Eager Rebalancing에서 거치는 단계
1. 파티션 반환
- 모든 컨슈머들이 현재 자신이 소비하고 있던 파티션들을 반환.(모든 파티션의 소비 중단됨)
2. JoinGroup
- Group Coordinator(Broker)에게 JoinGroup 요청을 보냄. 처음으로 그룹에 참여하는 컨슈머는 JoinGroup 요청 시 그룹 리더가 될 수 있다.
- Group Coordinator는 모든 컨슈머로부터 JoinGroup 요청을 받을 때까지 대기.
- 모든 요청을 받으면, 그룹 리더를 선정하고 JoinGroupResponse를 통해 컨슈머 정보를 리더에게 전달.
3. Synchronization Barrier
- 중요 동기화 지점
- 모든 컨슈머가 이전 파티션 할당을 완전히 해제하도록 보장.
- 파티션 리밸런싱 전 데이터 손실이나 중복 처리를 방지.
4. SyncGroup
그룹 리더(Consumer)가 새로운 파티션 할당을 계산하고, 이를 브로커에게 전송.
- 모든 컨슈머가 그룹 리더에게 SyncGroup 요청.
- 그룹 리더(Consumer)는 모든 컨슈머로부터 SyncGroup 요청을 받을 때까지 대기
- 모든 요청이 도착하면, 리더가 파티션 리밸런싱 계산.
- 계산된 할당 결과를 브로커에게 전송하고, 브로커는 이를 SyncGroupResponse를 통해 모든 멤버에게 브로드 캐스트
![[de-kafka-05-eager_rebalancing.png]]
Rebalancing 전략
Concept
- Partition Rebalancing : 컨슈머 그룹에 새로운 컨슈머가 추가되거나 기존 컨슈머가 사라지면, 토픽에 파티션을 새로 할당하는 과정.
- Eager Rebalancing : 조급한 리밸런싱.모든 컨슈머가 읽기 작업을 멈추고 파티션을 완전히 다시 할당하는 방식. 모든 컨슈머가 자신에게 할당된 파티션을 포기하고 새로운 파티션을 할당받는 두 단계를 거친다.
- Cooperative Rebalancing : 협력적 리밸런싱. 핵심 아이디어는 전체 파티션이 아닌 재할당 대상이 되는 파티션만 컨슈머가 읽기 작업을 정지하고 리밸런싱을 수행하는 것. 서비스 다운타임 최소화 및 시스템 안정성 향상 도모.
2. 카프카 컨슈머
3. 토픽 구독
4. Polling Loop
- 카프카에서 컨슈머는 기본적으로 종료되지 않는 루프를 돌면서 poll() 메서드를 호출한다.
- 처리가 완료되면 결과물을 데이터 저장소에 쓰거나 이미 저장된 레코드를 읽어온다.
개념
![]()
polling 방식의 이해 polling 은 어떤 프로세스가 다른 프로세스의 상태를 주기적으로 확인하는 것이다. 기본적으로 클라이언트가 서버로 주기적으로 요청을 날려서 이벤트 내용을 전달받는다. 구현이 단순하지만 서버 부하를 줄 수 있고(http 오버헤드 발생 가능) 실시간성이 떨어질 수 있다. 일정 주기로 서버가 갱신되는 대시보드 시나리오에서 적용하기 보다 적합하다.
추가 개념
- long polling
- web socket
- server-sent event(SSE)(streaming)
Concept
- polling : 기본적으로 어떤 프로세스가 다른 프로세스의 상태를 주기적으로 확인하는 것.
- http overhead : 정보의 신뢰성 판단을 위한, 보내지는 헤더같은 정보 때문에 처리시간이나 데이터의 양이 증가하는 것을 의미.
poll() 호출시 처리 과정
- poll() : 컨슈머가 브로커로부터 메시지를 가져오는 메서드.
poll은 단순히 데이터를 가져오는 것보다 많은 일울 수행 한다.
- Group Coordinator를 찾아 Consumer Group에 추가.
- 파티션 할당받기
- 리밸런스와 관련한 콜백처리
- 컨슈머나 콜백에서 발생할 수 있는 예외 처리
Thread Safe 문제
- 하나의 Thread당 하나의 Consumer 가 원칙이다.
- 이는 Thread Safe 문제를 해결하기 위함이다.
- Thread Safe는 기본적으로 여러 Thread가 동시에 자원에 접근하더라도 프로그램 실행에 문제가 없는 것을 의미한다.
하나의 어플리케이션에서 동일한 그룹에 속하는 여러 컨슈머를 운용할 경우:
ExecuterService를 사용해 각 컨슈머를 별도의 스레드로 실행한다.
각자의 컨슈머를 가지는 다수의 스레드
수집 스레드와 다수의 워커 스레드로 분리
5. Consumer 설정
대부분의 매개변수는 합리적인 기본 값을 가지고 있기 때문데 딱히 변경할 필요가 없다. 여기서는 일부 주요한 설정들만 다룬다.
Concept
- fetch.min.bytes : 브로커로부터 가져올 최소 데이터 양. 값을 크게 가져갈 경우 컨슈머 수가 많을 때 브로커의 부하를 줄일 수 있다. 하지만 처리량이 적을때 레이턴시가 늘어날 수 있다.
- fetch.max.bytes : 브로커로부터 가져올 최대 데이터 양.
- max.poll.interval.ms : poll() 호출 간격의 최대 시간. 컨슈머가 poll을 하지 않고도 죽은것으로 판단하지 않기 위한 최대 시간.
Consumer Assignment 전략
Concept
- Range : 구독하는 각 토픽의 파티션을 연속된 그룹으로 나눠서 할당.
- Round Robin : Round Robin Partitioning. 모든 구독된 토픽의 모든 파티션을 가져다 순차적으로 하나씩 컨슈머에 할당.
- Sticky : 파티션을 균등하게 할당하면서 리밸런싱 발생시 가능한 많은 파티션들이 같은 컨슈머에 할당되게끔 하는 전략.
- Cooperative Sticky : Cooperative Sticky Assignment. Sticky Assignment와 기본적으로 동일하지만, 컨슈머가 재할당되지 않은 파티션으로부터 데이터를 읽는 협력적 리밸런싱 지원.
6. Offset Commit
어떻게 Consumer가 Offset을 추적 관리하는가?
신경써야할 지점
중복 메시지
메시지 손실
Key Takeaway
- 컨슈머가 파티션 안에서의 현재 위치를 업데이트 하는 작업을 오프셋 커밋이라고 한다.
- 기본적으로
poll()이 리턴한 마지막 오프셋 바로 다음 오프셋을 커밋한다. - AutoCommit의 문제는 기본적으로 중복을 완전히 방지하지 못한다는 것이다
CommitSync는poll에 의해 반환된 마지막 오프셋을 커밋한다.- 비동기적 커밋은 브로커가 커밋 요청에 응답할 때 깨지 어플리케이션이 블락됨으로서 발생되는 처리량 제한 문제를 해결하기 위해 브로커가 커밋에 응답할 때까지 기다리는 대신 요청만 보내고 처리를 계속하는 것이다.
Auto Commit
현재 offset commit
비동기 commit
동기, 비동기 commit함께 사용
특정 offset commit
7. Rebalance Listener
Rebalance Listener는 컨슈머 그룹의 파티션이 할당되거나 해제될 때 호출되는 콜백 메서드를 정의한다.
onPartitionAssigned(Collection<TopicPartition>, partitions): 파티션이 할당된 수 컨슈머가 메시지를 읽기 시작하기 전 호출onPartitionRevoked(Collection<TopicPartition>, partitions): 파티션이 할당 해제될때 호출onPartitionLost(Collection<TopicPartition>, partitions): 예외적인 리밸런싱 상황에서 호출
8. seekToBeginning, seekToEnd
특정 오프셋에서부터 메시지를 읽기 시작하거나 특정 오프셋까지 메시지를 읽기 위해 사용된다.
seekToBeginning: 앞의 메시지는 건너 뛰고 파티션에 새로 들어온 메시지 부터 읽기seekToEnd: 파티션에 있는 모든 메시지를 읽기
9. Polling Loop에서 벗어나기
- consumer.wakeup() : ShutdownHook은 JVM레벨에서 일어나기 때문에 사실상 polling loop를 벗어나는 방법은 wakeup() 메서드를 사용하는 것 밖에 없다.
ShutdownHook을 사용한 Graceful Shutdown
10. Consumer Deserializer
11. Standalone Consumer
독립적으로 실행되는 컨슈머가 필요한 상황
- 하나의 컨슈머가 모든 파티션으로부터 데이터를 읽어오는 상황
- 특정 파티션에 대해 데이터를 읽어오는 상황