908 words
5 minutes
[DE Design Pattern]05-5. Distributed Aggregator

5. Data Aggregation: Distributed Aggregator#

01. Overview#

  • 여러 물리적 노드에 분산된 데이터를 그룹핑 키 기반으로 모아 집계하는 패턴
  • 싱글노드에 담지 못하는 대규모 데이터셋에서 GROUP BY + 집계 함수를 수행하는 기본 방식
  • 내부적으로 shuffle이라는 네트워크 교환이 발생하는 것이 특징. 기본적으로 같은 키의 레코드가 서로 다른 노드에 흩어져 있기에 이를 한 노드로 모아야 집계가 가능

02. Shuffle과 Partial Aggregation#

  • Shuffle은 네트워크 비용이 큰 연산이지만 partial aggregation(부분 집계)으로 최적화가능.
  • ex) 각 노드에서 먼저 로컬 count를 수행하고 그 숫자만 shuffle하여 최종 합산
from pyspark.sql import DataFrame
# 두 개의 물리적으로 분리된 데이터 소스 결합
visits: DataFrame = spark.read.json(f"{base_dir}/visits")
devices: DataFrame = spark.read.jdbc(
url="jdbc:postgresql://localhost/dedp",
table="dedp.devices",
properties={"user": "dedp", "password": "dedp_test",
"driver": "org.postgresql.Driver"}
)
# JOIN → 내부적으로 shuffle 발생
visits_with_devices = visits.join(
devices,
[devices.type == visits.context.technical.dev_type,
devices.version == visits.context.technical.dev_version],
"inner"
)

03. Shuffle 확인: explain()#

visits_with_devices.explain()
# 출력 중 핵심 부분:
# +- Exchange hashpartitioning(dev_type, dev_version, 200)
# | +- Filter (...)
# | +- FileScan json [...]
# +- Exchange hashpartitioning(type, version, 200)
# +- Scan JDBCRelation(...)

Exchange hashpartitioning이 보이면 shuffle이 발생하는 것입니다.

04. Data Skew와 Salting#

  • 특정 키에 데이터가 극단적으로 몰릴경우 (data skew) 해당 노드만 병목발생.
  • Salting으로 완화
from pyspark.sql.functions import rand
# 1단계: salt 추가 후 분산 집계
partial = (
dataset
.withColumn("salt", (rand() * 3).cast("int"))
.groupBy("group_key", "salt")
.agg({"value": "sum", "*": "count"})
)
# 2단계: salt 제거 후 최종 집계
result = (
partial
.groupBy("group_key")
.agg({"sum(value)": "sum", "count(1)": "sum"})
)
  • group_key에 랜덤 salt(0~2)를 붙여 하나의 hot key를 3개로 분산시키는 방식.
  • 1단계에서 (key, 0), (key, 1), (key, 2) 세 그룹으로 부분 집계
  • 2단계에서 salt를 제거후 최종 합산

05. External Table: DB에서 외부 파일 참조#

  • 분산 처리 프레임워크 외에도, 데이터 웨어하우스 자체에서 외부 파일을 테이블처럼 참조가능.

06. 주요 트레이드오프#

  • 이중 네트워크 교환: (1) 입력 데이터 읽기 + (2) shuffle. 현대 아키텍처에서 storage-compute 분리가 일반적이므로 (1)은 불가피, (2)는 Local Aggregator로 회피 가능
  • Data Skew: 불균형 키가 있으면 한 노드가 병목 → salting 또는 Spark AQE(Adaptive Query Execution)로 완화
  • Scaling: shuffle 중 노드가 유휴 상태여도 fault tolerance를 위해 유지됨 → External Shuffle Service로 유휴 노드 해제 가능

Concept

  • Distributed Aggregator : 여러 노드에 분산된 데이터를 그룹핑 키로 모아 집계하는 패턴. MapReduce 프로그래밍 모델의 전형
  • Shuffle : 같은 키의 레코드를 동일 노드로 이동시키는 네트워크 교환. 분산 집계의 핵심이자 주요 병목
  • Partial Aggregation : shuffle 전 각 노드에서 로컬 부분 집계를 수행하여 전송 데이터량을 줄이는 최적화
  • Data Skew : 특정 키에 데이터가 극단적으로 편중되어 한 노드가 병목이 되는 현상
  • Salting : 그룹핑 키에 랜덤값을 추가하여 hot key를 여러 파티션으로 분산시키는 skew 완화 기법
  • Adaptive Query Execution (AQE) : Spark가 런타임에 skew를 감지하고 자동으로 파티션을 재분배하는 기능
  • External Shuffle Service : shuffle 데이터를 별도 컴포넌트가 관리하여 유휴 노드를 조기 해제하는 최적화
  • External Table : 데이터 웨어하우스에서 외부 스토리지(S3, GCS 등)의 파일을 테이블처럼 참조하는 기능

[DE Design Pattern]05-5. Distributed Aggregator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-05-distributed_aggregator/
Author
Datamind
Published at
2025-03-14
License
CC BY-NC-SA 4.0