1312 words
7 minutes
[spark]Spark Shuffling 최적화
01. Overview
- Spark 최적화 대상으로서의 Shuffle에 대해 간단히 정리
02. 셔플(Shuffle) 컨셉
분산 처리 환경에서 데이터를 연산하기 위해 노드 간에 데이터를 재분배하는 과정. 정확히는 분산노드간 특정 키를 기준으로 클러스터 전체의 데이터를 조합하는것
의존성(Dependency)의 차이
- 좁은 의존성 (Narrow Dependency):
map,filter연산. 각 노드가 보유한 데이터만 독립적으로 처리 가능. 네트워크 통신 없음. - 넓은 의존성 (Wide Dependency):
groupBy,join연산. 특정 ‘키(Key)‘를 기준으로 클러스터 전체의 데이터를 재조합해야 함. 이 과정이 **셔플(Shuffle)**임.
셔플이 스파크에서 병목인 이유
셔플은 단순한 논리적 지연이 아니라, 컴퓨팅 자원의 물리적 한계를 시험하는 구간
- 네트워크 I/O 폭발: 수백 대의 노드가 동시에 데이터를 교환하는 N x N (All-to-All) 통신 발생.
- 디스크 I/O (Spill): 셔플되는 중간 데이터의 크기가 메모리 용량을 초과하면, 노드의 로컬 디스크에 임시 파일을 쓰고(Write) 읽는(Read) 현상 발생.
- 직렬화 (Serialization) 오버헤드: 메모리 상의 객체를 네트워크/디스크로 보내기 위해 바이트 스트림으로 변환하는 과정에서 막대한 CPU 리소스 소모.
- 데이터 스큐 (Data Skew): 특정 키(예: null 값)에 데이터가 몰릴 경우, 한 노드에만 작업이 집중되어 전체 파이프라인 지연 초래.
03. 셔플 최적화 및 회피 전략
파이프라인 설계 시 셔플의 발생을 최소화하거나 비용을 낮추는 방법론.
1. Broadcast Join
- 개념: 조인 대상 중 한쪽 테이블이 매우 작을 때(수십 MB 이하), 셔플 대신 작은 테이블 전체를 모든 노드의 메모리에 복제(Broadcast)하는 방식.
- 코드 예시:
from pyspark.sql.functions import broadcast
# small_df가 모든 워커 노드로 복제되어 셔플 네트워크 통신 제거result_df = large_df.join(broadcast(small_df), "user_id")2. Bucketing
- 개념: 셔플이 잦은 컬럼(예: user_id)을 기준으로 데이터를 스토리지 적재 시점에 미리 분할 및 정렬
- 장점: 이후 해당 컬럼으로 조인/그룹핑 시 셔플 단계 생략 가능. (초기 쓰기 비용 트레이드오프 존재)
3. Predicate Pushdown
조인이나 그룹핑 등 셔플을 유발하는 연산 이전에 filter 조건을 먼저 적용하여, 네트워크를 타는 절대적인 데이터 양(Payload)을 줄이는것
인메모리 엔진의 역설: 내결함성(Fault Tolerance)
- Spark가 셔플 데이터를 굳이 로컬 디스크에 기록하는 본질적 이유는 내결함성(Fault Tolerance) 확보에 있음.(메모리가 충분한 경우에도)
- MPP DW의 한계: 쿼리 도중 노드 하나가 죽으면, 전체 쿼리를 실패 처리하고 처음부터 재시도함 (Low Latency 지향).
- Spark의 생존 전략: 수십 시간 단위의 대규모 배치 작업 중 노드가 죽었을 때, 전체를 재시작하는 것은 치명적임.
- Lineage와 셔플 파일: Spark는 DAG(Directed Acyclic Graph)를 통해 작업의 족보(Lineage)를 기억함. 이전 단계의 워커 노드들이 로컬 디스크에 셔플 중간 파일을 남겨두었기 때문에, 특정 노드가 죽어도 실패한 파티션 부분만 다시 계산하여 파이프라인을 복구할 수 있음.
Concept
- Shuffle : 분산 처리 시스템에서 특정 키를 기준으로 노드 간에 데이터를 재분배하는 대규모 네트워크 통신 과정.
- Narrow Dependency : 부모 파티션 하나가 자식 파티션 하나에만 영향을 주는 구조. 셔플이 발생하지 않음.
- Wide Dependency : 부모 파티션 하나가 여러 자식 파티션으로 쪼개지는 구조. 셔플이 필수적으로 발생함.
- Spill to Disk : 인메모리 연산 중 메모리 공간이 부족하여 중간 데이터를 임시로 로컬 디스크에 기록하는 현상.
- Lineage : Spark가 RDD/DataFrame의 생성 과정을 기록해 둔 논리적 실행 계획. 장애 발생 시 복구의 기준이 됨.
- AQE (Adaptive Query Execution) : Spark 3.0부터 도입된 기능으로, 런타임에 셔플 파티션 수를 동적으로 조절하거나 조인 전략을 변경하는 최적화 엔진.
Key_Takeaways
- Spark에서 셔플은 CPU, 메모리, 디스크, 네트워크 자원을 모두 고갈시키는 시스템 최대의 병목 구간임
- 파이프라인 최적화는 Broadcast Join, Bucketing 등을 통해 물리적인 셔플 데이터 양을 줄이는 데서 시작함.
- Spark는 메모리 기반 연산 엔진임에도, 긴 배치 작업 중 노드 장애에 대비하기 위해 셔플 중간 데이터를 디스크에 기록.
- 이 디스크 기록과 Lineage 추적 메커니즘 덕분에, Spark는 장애 발생 시 전체 재시작 없이 실패한 구간만 복구(Fault Tolerance)가능.
[spark]Spark Shuffling 최적화
https://yjinheon.netlify.app/posts/02de/spark/de-spark-shuffle/