1161 words
6 minutes
[DE Design Pattern]05-6. Local Aggregator
6. Data Aggregation: Local Aggregator
01. Pattern Overview
Local Aggregator는 shuffle 없이 로컬에서 집계를 수행하는 패턴
- 입력 데이터가 올바르게 파티셔닝되어 있어서, 같은 키의 레코드가 이미 같은 파티션에 존재
- 데이터셋이 충분히 작아서 단일 머신 메모리에 전부 적재 가능
기본적으로 네트워크 교환이 입력 읽기 1회뿐이므로 훨씬 효율적이고, 각 태스크가 독립적으로 진행가능
02. Kafka Streams: groupByKey
- Kafka Streams는 Local Aggregator를 가장 직관적으로 지원.
- Kafka 토픽의 레코드는 key 기준으로 파티셔닝되므로,
groupByKey가 shuffle 없이 동작
# Kafka 토픽: visits (key=visit_id)
# groupByKey → 같은 key가 이미 같은 파티션에 있으므로 shuffle 없음# grouped = visits_stream.group_by_key()# aggregated = grouped.aggregate(# initializer=AggregatedVisits(),# aggregator=lambda key, visit, agg: agg.add(visit)# )# aggregated.to("visits-aggregated")groupByKey는 레코드의 기존 key를 사용- 프로듀서가 같은 key를 항상 같은 파티션에 쓰므로 네트워크 교환이 불필요
03. PySpark: sortWithinPartitions + foreachPartition
Spark에는 명시적인 “local aggregation” API가 없지만, 파티션 내부 정렬 + 파티션별 처리로 동일 효과
from pyspark.sql import DataFrame
# 1단계: 파티션 내부에서만 정렬 (shuffle 없음)sorted_visits: DataFrame = ( visits .sortWithinPartitions(["visit_id", "event_time"]))
# 2단계: 파티션별로 로컬 집계 수행def aggregate_partition(rows): writer = KafkaWriter(bootstrap_server, output_topic) for visit in rows: writer.process(visit) writer.close()
sorted_visits.foreachPartition(aggregate_partition)# KafkaWriter: 정렬된 레코드를 순회하며 세션 집계class KafkaWriter: def __init__(self, bootstrap_server: str, output_topic: str): self.in_flight_visit = {"visit_id": None, "pages": []}
def process(self, row): # visit_id가 바뀌면 → 이전 세션 완료, 전송 if row.visit_id != self.in_flight_visit["visit_id"]: self._send(self.in_flight_visit) self.in_flight_visit = {"visit_id": row.visit_id, "pages": []} self.in_flight_visit["pages"].append(row.page)원리: sortWithinPartitions은 각 파티션 내부에서만 정렬하므로 shuffle이 없습니다. 정렬된 레코드를 순회하면서 visit_id가 바뀌는 시점에 집계를 완료합니다. 전제 조건: 같은 visit_id의 모든 레코드가 이미 같은 파티션에 있어야 합니다.
04. AWS Redshift: Distribution Style
데이터 웨어하우스 레벨에서도 Local Aggregator를 적용가능
-- KEY 분배: 같은 visit_id가 항상 같은 노드에 저장CREATE TABLE visits ( visit_id INT, user_id INT, ...)DISTSTYLE KEYDISTKEY(visit_id);
-- ALL 분배: 전체 테이블을 모든 노드에 복사-- → 어떤 JOIN이든 로컬에서 수행 가능 (작은 참조 테이블용)CREATE TABLE users ( user_id INT, ...)DISTSTYLE ALL;DISTSTYLE KEY는 같은 DISTKEY 값을 한 노드에 모음,DISTSTYLE ALL은 테이블 전체를 모든 노드에 복사함. 두 경우 모두 JOIN/집계가 로컬에서 수행
05. Spark Bucketing
Spark에서도 동일 키, 동일 버킷 수로 저장된 테이블 간에는 shuffle 없이 JOIN이 가능
# 저장 시 bucketing 적용(visits.write .bucketBy(16, "visit_id") .sortBy("visit_id") .saveAsTable("visits_bucketed"))
# 같은 key, 같은 버킷 수로 저장된 테이블끼리 JOIN → shuffle 없음06. 주요 트레이드오프
- Scaling 제약: 파티션 수가 고정되어야 함. 파티션 수를 변경하면 모든 레코드의 파티션 할당을 재계산해야 하며, 스트리밍에서는 stop-the-world 이벤트 필요
- 단일 그룹핑 키 제약: 파티셔닝 키가 하나로 고정되므로, 다른 키로 집계하려면 Distributed Aggregator를 써야 함. 예: visit_id로 파티셔닝했는데 user_id로 집계하려면 shuffle 불가피
- 프로듀서 책임: 같은 키를 같은 파티션에 쓰는 것은 프로듀서가 보장해야 함. 정적 파티셔닝 키 + 불변 파티션 수가 전제
Concept
- Local Aggregator : shuffle 없이 로컬 파티션 내에서 집계를 수행하는 패턴. 입력 데이터가 올바르게 파티셔닝되어 있어야 함
- sortWithinPartitions : Spark에서 파티션 내부에서만 정렬하여 shuffle을 회피하는 연산
- foreachPartition : Spark에서 각 파티션을 독립적으로 처리하는 연산. 로컬 집계 로직 구현에 활용
- Co-partitioning : 두 데이터셋이 같은 키, 같은 파티션 수로 분배되어 shuffle 없이 JOIN 가능한 상태
- DISTSTYLE KEY : Redshift에서 특정 컬럼 값 기준으로 같은 노드에 데이터를 배치하는 분배 전략
- DISTSTYLE ALL : Redshift에서 테이블 전체를 모든 노드에 복사하는 분배 전략. 작은 참조 테이블에 적합
- Bucketing : 지정된 키와 버킷 수로 데이터를 미리 분배 저장하여, 이후 JOIN/집계 시 shuffle을 회피하는 기법
- groupByKey (Kafka Streams) : Kafka 레코드의 기존 key를 활용하여 shuffle 없이 로컬 집계하는 메서드
- Spark mapPartitions vs foreachPartition : 파티션 단위 처리 API의 차이 (변환 vs 액션)
[DE Design Pattern]05-6. Local Aggregator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-06-local-aggregator/