908 words
5 minutes
[DE Design Pattern]05-5. Distributed Aggregator
5. Data Aggregation: Distributed Aggregator
01. Overview
- 여러 물리적 노드에 분산된 데이터를 그룹핑 키 기반으로 모아 집계하는 패턴
- 싱글노드에 담지 못하는 대규모 데이터셋에서 GROUP BY + 집계 함수를 수행하는 기본 방식
- 내부적으로 shuffle이라는 네트워크 교환이 발생하는 것이 특징. 기본적으로 같은 키의 레코드가 서로 다른 노드에 흩어져 있기에 이를 한 노드로 모아야 집계가 가능
02. Shuffle과 Partial Aggregation
- Shuffle은 네트워크 비용이 큰 연산이지만 partial aggregation(부분 집계)으로 최적화가능.
- ex) 각 노드에서 먼저 로컬 count를 수행하고 그 숫자만 shuffle하여 최종 합산
from pyspark.sql import DataFrame
# 두 개의 물리적으로 분리된 데이터 소스 결합visits: DataFrame = spark.read.json(f"{base_dir}/visits")devices: DataFrame = spark.read.jdbc( url="jdbc:postgresql://localhost/dedp", table="dedp.devices", properties={"user": "dedp", "password": "dedp_test", "driver": "org.postgresql.Driver"})
# JOIN → 내부적으로 shuffle 발생visits_with_devices = visits.join( devices, [devices.type == visits.context.technical.dev_type, devices.version == visits.context.technical.dev_version], "inner")03. Shuffle 확인: explain()
visits_with_devices.explain()
# 출력 중 핵심 부분:# +- Exchange hashpartitioning(dev_type, dev_version, 200)# | +- Filter (...)# | +- FileScan json [...]# +- Exchange hashpartitioning(type, version, 200)# +- Scan JDBCRelation(...)Exchange hashpartitioning이 보이면 shuffle이 발생하는 것입니다.
04. Data Skew와 Salting
- 특정 키에 데이터가 극단적으로 몰릴경우 (data skew) 해당 노드만 병목발생.
- Salting으로 완화
from pyspark.sql.functions import rand
# 1단계: salt 추가 후 분산 집계partial = ( dataset .withColumn("salt", (rand() * 3).cast("int")) .groupBy("group_key", "salt") .agg({"value": "sum", "*": "count"}))
# 2단계: salt 제거 후 최종 집계result = ( partial .groupBy("group_key") .agg({"sum(value)": "sum", "count(1)": "sum"}))group_key에 랜덤 salt(0~2)를 붙여 하나의 hot key를 3개로 분산시키는 방식.- 1단계에서
(key, 0),(key, 1),(key, 2)세 그룹으로 부분 집계 - 2단계에서 salt를 제거후 최종 합산
05. External Table: DB에서 외부 파일 참조
- 분산 처리 프레임워크 외에도, 데이터 웨어하우스 자체에서 외부 파일을 테이블처럼 참조가능.
06. 주요 트레이드오프
- 이중 네트워크 교환: (1) 입력 데이터 읽기 + (2) shuffle. 현대 아키텍처에서 storage-compute 분리가 일반적이므로 (1)은 불가피, (2)는 Local Aggregator로 회피 가능
- Data Skew: 불균형 키가 있으면 한 노드가 병목 → salting 또는 Spark AQE(Adaptive Query Execution)로 완화
- Scaling: shuffle 중 노드가 유휴 상태여도 fault tolerance를 위해 유지됨 → External Shuffle Service로 유휴 노드 해제 가능
Concept
- Distributed Aggregator : 여러 노드에 분산된 데이터를 그룹핑 키로 모아 집계하는 패턴. MapReduce 프로그래밍 모델의 전형
- Shuffle : 같은 키의 레코드를 동일 노드로 이동시키는 네트워크 교환. 분산 집계의 핵심이자 주요 병목
- Partial Aggregation : shuffle 전 각 노드에서 로컬 부분 집계를 수행하여 전송 데이터량을 줄이는 최적화
- Data Skew : 특정 키에 데이터가 극단적으로 편중되어 한 노드가 병목이 되는 현상
- Salting : 그룹핑 키에 랜덤값을 추가하여 hot key를 여러 파티션으로 분산시키는 skew 완화 기법
- Adaptive Query Execution (AQE) : Spark가 런타임에 skew를 감지하고 자동으로 파티션을 재분배하는 기능
- External Shuffle Service : shuffle 데이터를 별도 컴포넌트가 관리하여 유휴 노드를 조기 해제하는 최적화
- External Table : 데이터 웨어하우스에서 외부 스토리지(S3, GCS 등)의 파일을 테이블처럼 참조하는 기능
[DE Design Pattern]05-5. Distributed Aggregator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-05-distributed_aggregator/