1. 개요
프로듀서는 카프카에서 메시지를 생성하고 브로커에 전송하는 역할을 한다. 프로듀서는 카프카 시스텀에서 쓰기 작업을 담당하는 개념이다. 여기서는 카프카에 데이터를 전송할 때 거치는 단계에 대해 간단히 다룬다.
메시지 전송 절차
1. ProducerRecord 객체 생성
객체 생성시 직렬화
- 필수 지정 : Topic, Value
- 선택 지정 : Key, Partition, Timestamp
2. 데이터를 파티셔너로 전송
- 파티셔너는 토픽의 파티션을 선택한다. 일반적으로 메시지의 Key를 기준으로 메시지를 선택한다.
- 전송 성공시 metadata 객체 반환
3. 레코드 배치에 추가
- 별도의 스레드가 레코드 배치를 적절한 카프카 브로커에게 전송
4. 카프카 브로커에 전송
5. 카프카 브로커 응답 반환
2. 프로듀서 생성
프로듀서 생성시 옵션들
- bootstrap.servers : 브로커 목록을 지정한다.
- key.serializer : 메시지 키의 직렬화 방법을 지정한다.
- value.serializer : 메시지 값의 직렬화 방법을 지정한다.
프로듀서 인터페이스는 임의 자바 객체를 키 혹은 밸류로 전송할 수 있도록 매개변수화된 타입을 사용할 수 있도록 한다
프로듀서 설정을 위한 몇가지 예제들
- Spring Boot환경에서 yaml file에 프로듀서 설정 작성
kafkaTemplate<>사용@Configurationclass 생성- Properties()
메시지 전송 방법
Concept
- Fire and Forget : 메시지를 서버에 전송만하고 성공 혹인 실패 여부에는 신경쓰지 않음. 재시도할 수 없는 에러 혹은 타임아웃 발생시 메시지가 유실됨.
- 동기적 전송 : 기본적으로 카프카는 비동기적으로 동작하지만(Future 객체를 반환) 다음 메시지를 전송하기 전 get()을 호출하여 동기적으로 동작하게끔 할 수 있다.
- 비동기적 전송 : 콜백함수와 함께 send() 를 호출하면 카프카 브로커로부터 응답을 받는 시점에서 자동으로 콜백 함수가 호출된다.
메시지 직렬화 예외 시나리오
SerializationException: 직렬화 실패TimeoutExcepion: 버퍼가 가득참InterruptException: dfddfdf
동기적 전송
비동기적 전송
3. 프로듀서 설정
메모리 사용량, 성능, 신뢰성에 영향을 미치는 주요 파라미터들
프로듀서 설정
Concept
client.id: 프로듀서 클라이언트 식별자. 브로커에 로그를 남길 때 사용.acks: 프로듀서 매개변수. 프로듀서가 임의의 쓰기 작업이 성공했다고 판별하기 위해 얼마나 많은 파티션 레플리카가 해당 메시지를 수신해야 하는지를 지정.end to end latency: 종단지연. 레코드가 생성되어 컨슈머가 읽을 수 있을 때까지의 시간. acks는 종단지연에 영향을 주지 않는다.
프로듀서 설정 예제
Spring Configuration 방식으로 설정
@Configurationpublic class KafkaProducerConfig {
// <String, String> // <String, customDTO> @Bean public ProducerFactory<String, Customer> factory() {
return new DefaultKafkaProducerFactory<>(producerConfigs()); }
@Bean public KafkaTemplate<String, Customer> kafkaTemplate() { return new KafkaTemplate<>(factory()); }
@Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9095"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); props.put("schema.registry.url", "http://localhost:8084");
return props; }
}ack 설정 시나리오
기본적으로 acks설정은 신뢰성과 프로듀서 지연 사이의 트레이드 오프를 제공한다.
acks=0: 서버로부터 메시시 전송 성공 여부를 확인하지 않음. 높은 처리량이 필요한 경우 사용acks=1: 리더 레플리카에게 메시지 전송 성공 여부를 확인. 기본값acks=2: 리더와 ISR에게 메시지 전송 성공 여부를 확인. 높은 신뢰성이 필요한 경우 사용
메시지 전달 시간 설정
ProducerRecord를 보낼 때 걸리는 시간 두 구간
- send에 대한 비동기 호출이 이뤄진 시간부터 결과를 리턴할 때 까지 걸리는 시간.
- send에 대한 비동기호출이 리턴한 시각부터 콜백이 호출 될 때까지 걸리는 시간.
batch.size 설정
batch.size: 프로듀서가 각각의 메시지 배치에 사용될 메모리 양을 지정한다.batch.size를 너무 작게 설정할 경우 작은 배치를 자주 전송하게 되어 약간의 오버헤드가 발생할 수 있다.
순서 보장하기
max.inflight.requests.per.connection: 프로듀서가 서버로부터 응답을 받지 못한 상태에서 전송할 수 있는 최대 메시지 의 수를 지정한다. 이 값을 올릴 경우 메모리 사용량이 증가하지만 처리량 또한 증가한다.
4. 직렬화
커스텀 직렬화
커스텀 직렬화는 일반적으로 권장되지 않는다. 이는 기존 스키마와 신규 스키마 간의 호환성을 유지하기 어렵게 만들기 때문이다.
Avro 직렬화
스키마 변경시 시나리오
- 데이터를 읽고 쓸때 사용되는 스키마가 호환되어야 한다.
- 역직렬화시 데이터를 쓸 대 사용했던 스키마에 접근이 가능해야 한다.
Schema Registry
Concept
- Schema Registry : Producer와 Consumer 간에 주고받는 메시지의 형식(스키마)을 중앙에서 관리하는 시스템. 주로 Avro, Protocol Buffers와 같은 스키마 형식을 지원하며 스키마 버전관리가 가능하다. RESTful 인터페이스를 통해 스키마를 저장하고 검색할 수 있다.
- Avro : 바이너리 포맷으로 데이터를 읽고 쓰기 위한 데이터 직렬화 시스템. 스키마는 JSON으로 정의되며, 데이터는 스키마에 따라 직렬화된다. 기본적으로 메시지가 바이너리라 메모리를 적게 사용하며 데이터가 스키마와 함께 저장되지 않기 때문에 데이터의 일관성을 유지하기 용이하다. 이는 데이터의 구조가 변경되더라도 기존 데이터와의 호환성을 유지할 수 있기 때문이다.
5. 파티션
- 같은 키 값을 가진 모든 메시지는 기본적으로 같은 파티션에 저장된다.
- 임의의 프로세스가 전체 파티션 중 일부를 읽을 경우 특정 키 값을 가진 모든 메시지를 읽게 된다.
Sticky Partitioner
접착성 처리
접착성 처리는 기본적으로 프로듀서가 메시지 배치를 채울 때, 다음 배치로 넘어가기 전에 이전 배치를 모두 채우는 것을 의미한다. 키 값이 null 인 메시지를 접착성 처리할 경우 일단 키 값이 있는 메시지에 따라 붙어야 RR 방식오로 배치되게 된다.
Partitioner
Kafka에서 제공되는 파티셔너
Concept
- Sticky Partitioning : 프로듀서가 메시지 배치를 채울 때, 다음 배치로 넘어가기 전에 이전 배치를 모두 채우는 것
- RoundRobin Partitioner : 기본적으로 메시지를 전체 파티션에 대해 균등하게 할당하는 것
- UniformSticky Partitioner : batch.size에 도달할 때까지 파티션을 유지. 대기 중인 큐의 크기를 고려하여 파티션을 전환하는 균등 분배 전략 사용.
6. 헤더
카프카 레코드의 Key, Value를 건드리지 않고 추가 메타데이터를 작성할 경우 Header 를 사용한다.
7. 인터셉터
프로듀서 인터셉터에서 다음의 두 메소드를 정의할 수 있다.
onSend: 프로듀서 레코드를 브로커로 보내기 전 직렬화 되기 직전에 호출oncknowledgement: 카프카 브로커로부터 응답를 받은 후 호출
8. 쿼터, 스로틀링
쿼터 타입
- 쓰기 쿼터(produce quota)
- 읽기 쿼터(consume quota)
- 요청 쿼터(request quota)
리소스 관리하기
쿼터, 쓰로틀링이 필요한 이유는 기본적으로 브로커의 리소스(CPU, Memory, Network) 를 안정적으로 관리하기 위함이다. 프로듀서가 브로커에 너무 많은 요청을 보낸 상태에서 브로커가 밀린 메시지를 처리하지 못해 프로듀서 버퍼의 메모리 공간이 확보되지 못할 경우 TimeoutException이 발생할 수 있다.
Key Takeaway
- Latency와 메시지 신뢰성의 Tradeoff를 고려해 프로듀서 설정을 조정할 수 있다.
- Latency와 Throughput의 Tradeoff를 고려해 프로듀서 설정을 조정할 수 있다.
- 비동기 호출이 시행되고 반환되는 시간과 콜백이 호출되는 시간을 나눠서 관제함으로써 메시지 전송에서의 병목 지점을 파악할 수 있다.
- 기본 제공타입 직렬화나 커스텀 직렬화 대신 Avro와 같은 직렬화 포맷을 활용하는 것이 권장된다.
- Schema Registry 를 통해 스키마 버전관리와 호환성을 유지할 수 있다.
- Sticky Partitioning 을 통해 더 적은 요청으로 같은 수의 메시지를 전송함으로써 Latency와 CPU 사용량을 줄일 수 있다.
- 파티션을 결정하는데 사용하는 키가 중요해서 같은 키값을 사용하는 파티션이 변경되어서는 안될 경우. 충분한 수의 파티션을 가진 토픽을 생성하고 그 이상 파티션을 추가하지 않는 것이 좋다.
- 카프카 레코드의 Key, Value를 건드리지 않고 추가 메타데이터를 작성할 경우
Header를 사용한다. - 쿼터, 쓰로틀링이 필요한 이유는 기본적으로 브로커의 리소스(CPU,Memory,Network) 를 안정적으로 관리하기 위함이다.