1161 words
6 minutes
[DE Design Pattern]05-6. Local Aggregator

6. Data Aggregation: Local Aggregator#

01. Pattern Overview#

Local Aggregator는 shuffle 없이 로컬에서 집계를 수행하는 패턴

  1. 입력 데이터가 올바르게 파티셔닝되어 있어서, 같은 키의 레코드가 이미 같은 파티션에 존재
  2. 데이터셋이 충분히 작아서 단일 머신 메모리에 전부 적재 가능

기본적으로 네트워크 교환이 입력 읽기 1회뿐이므로 훨씬 효율적이고, 각 태스크가 독립적으로 진행가능

02. Kafka Streams: groupByKey#

  • Kafka Streams는 Local Aggregator를 가장 직관적으로 지원.
  • Kafka 토픽의 레코드는 key 기준으로 파티셔닝되므로, groupByKey가 shuffle 없이 동작
# Kafka 토픽: visits (key=visit_id)
# groupByKey → 같은 key가 이미 같은 파티션에 있으므로 shuffle 없음
# grouped = visits_stream.group_by_key()
# aggregated = grouped.aggregate(
# initializer=AggregatedVisits(),
# aggregator=lambda key, visit, agg: agg.add(visit)
# )
# aggregated.to("visits-aggregated")
  • groupByKey는 레코드의 기존 key를 사용
  • 프로듀서가 같은 key를 항상 같은 파티션에 쓰므로 네트워크 교환이 불필요

03. PySpark: sortWithinPartitions + foreachPartition#

Spark에는 명시적인 “local aggregation” API가 없지만, 파티션 내부 정렬 + 파티션별 처리로 동일 효과

from pyspark.sql import DataFrame
# 1단계: 파티션 내부에서만 정렬 (shuffle 없음)
sorted_visits: DataFrame = (
visits
.sortWithinPartitions(["visit_id", "event_time"])
)
# 2단계: 파티션별로 로컬 집계 수행
def aggregate_partition(rows):
writer = KafkaWriter(bootstrap_server, output_topic)
for visit in rows:
writer.process(visit)
writer.close()
sorted_visits.foreachPartition(aggregate_partition)
# KafkaWriter: 정렬된 레코드를 순회하며 세션 집계
class KafkaWriter:
def __init__(self, bootstrap_server: str, output_topic: str):
self.in_flight_visit = {"visit_id": None, "pages": []}
def process(self, row):
# visit_id가 바뀌면 → 이전 세션 완료, 전송
if row.visit_id != self.in_flight_visit["visit_id"]:
self._send(self.in_flight_visit)
self.in_flight_visit = {"visit_id": row.visit_id, "pages": []}
self.in_flight_visit["pages"].append(row.page)

원리: sortWithinPartitions은 각 파티션 내부에서만 정렬하므로 shuffle이 없습니다. 정렬된 레코드를 순회하면서 visit_id가 바뀌는 시점에 집계를 완료합니다. 전제 조건: 같은 visit_id의 모든 레코드가 이미 같은 파티션에 있어야 합니다.

04. AWS Redshift: Distribution Style#

데이터 웨어하우스 레벨에서도 Local Aggregator를 적용가능

-- KEY 분배: 같은 visit_id가 항상 같은 노드에 저장
CREATE TABLE visits (
visit_id INT,
user_id INT, ...
)
DISTSTYLE KEY
DISTKEY(visit_id);
-- ALL 분배: 전체 테이블을 모든 노드에 복사
-- → 어떤 JOIN이든 로컬에서 수행 가능 (작은 참조 테이블용)
CREATE TABLE users (
user_id INT, ...
)
DISTSTYLE ALL;
  • DISTSTYLE KEY는 같은 DISTKEY 값을 한 노드에 모음,
  • DISTSTYLE ALL은 테이블 전체를 모든 노드에 복사함. 두 경우 모두 JOIN/집계가 로컬에서 수행

05. Spark Bucketing#

Spark에서도 동일 키, 동일 버킷 수로 저장된 테이블 간에는 shuffle 없이 JOIN이 가능

# 저장 시 bucketing 적용
(visits.write
.bucketBy(16, "visit_id")
.sortBy("visit_id")
.saveAsTable("visits_bucketed"))
# 같은 key, 같은 버킷 수로 저장된 테이블끼리 JOIN → shuffle 없음

06. 주요 트레이드오프#

  • Scaling 제약: 파티션 수가 고정되어야 함. 파티션 수를 변경하면 모든 레코드의 파티션 할당을 재계산해야 하며, 스트리밍에서는 stop-the-world 이벤트 필요
  • 단일 그룹핑 키 제약: 파티셔닝 키가 하나로 고정되므로, 다른 키로 집계하려면 Distributed Aggregator를 써야 함. 예: visit_id로 파티셔닝했는데 user_id로 집계하려면 shuffle 불가피
  • 프로듀서 책임: 같은 키를 같은 파티션에 쓰는 것은 프로듀서가 보장해야 함. 정적 파티셔닝 키 + 불변 파티션 수가 전제

Concept

  • Local Aggregator : shuffle 없이 로컬 파티션 내에서 집계를 수행하는 패턴. 입력 데이터가 올바르게 파티셔닝되어 있어야 함
  • sortWithinPartitions : Spark에서 파티션 내부에서만 정렬하여 shuffle을 회피하는 연산
  • foreachPartition : Spark에서 각 파티션을 독립적으로 처리하는 연산. 로컬 집계 로직 구현에 활용
  • Co-partitioning : 두 데이터셋이 같은 키, 같은 파티션 수로 분배되어 shuffle 없이 JOIN 가능한 상태
  • DISTSTYLE KEY : Redshift에서 특정 컬럼 값 기준으로 같은 노드에 데이터를 배치하는 분배 전략
  • DISTSTYLE ALL : Redshift에서 테이블 전체를 모든 노드에 복사하는 분배 전략. 작은 참조 테이블에 적합
  • Bucketing : 지정된 키와 버킷 수로 데이터를 미리 분배 저장하여, 이후 JOIN/집계 시 shuffle을 회피하는 기법
  • groupByKey (Kafka Streams) : Kafka 레코드의 기존 key를 활용하여 shuffle 없이 로컬 집계하는 메서드

  • Spark mapPartitions vs foreachPartition : 파티션 단위 처리 API의 차이 (변환 vs 액션)

[DE Design Pattern]05-6. Local Aggregator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-06-local-aggregator/
Author
Datamind
Published at
2025-03-14
License
CC BY-NC-SA 4.0