1491 words
7 minutes
[DE Design Pattern]02-5. Data Compaction
05. Data Compaction
- Data Compaction은 다수의 작은 파일을 더 적은 수의 큰 파일로 병합하여 I/O 오버헤드를 줄이는 것
- 보통 “Small Files Problem”이라 불림
Small Files Problem
- 실시간 수집 파이프라인이 스트리밍 브로커에서 오브젝트 스토어로 데이터를 쓰면, 짧은 주기마다 작은 파일이 생성됨.
- 3개월이 지나면 수십만 개의 소형 파일이 쌓임
[문제 상황]실행 시간의 70%: 파일 리스팅 (메타데이터 작업)실행 시간의 30%: 실제 데이터 처리
→ 파일 수가 많을수록 리스팅, open/close I/O가 지배적→ pay-as-you-go 환경에서 비용 낭비Compaction은 이 문제를 다음과 같이 해결합니다.
[Compaction 전]part-00001.parquet (1MB)part-00002.parquet (1MB)...part-01000.parquet (1MB)→ 1,000개 파일, 1,000번의 open/close
[Compaction 후]part-00001.parquet (100MB)part-00002.parquet (100MB)...part-00010.parquet (100MB)→ 10개 파일, 10번의 open/close, 동일 데이터Open Table Format별 Compaction
Delta Lake: OPTIMIZE + VACUUM
- Delta Lake 의 compaction api는 optimize와 vacuum의 두 단계로 나뉨
from delta.tables import DeltaTable
# 01. OPTIMIZE — 작은 파일들을 큰 파일로 병합devices_table = DeltaTable.forPath(spark_session, "s3://lake/devices")devices_table.optimize().executeCompaction()
# 이 시점에서 새 큰 파일이 생성되었지만, 원본 작은 파일도 여전히 존재# → 아직 스토리지 절약 효과 없음, 하지만 읽기 성능은 개선됨# (Delta 트랜잭션 로그가 새 파일만 참조하므로)# 02. VACUUM — 더 이상 참조되지 않는 오래된 파일 삭제devices_table = DeltaTable.forPath(spark_session, "s3://lake/devices")devices_table.vacuum() # 기본 retention: 7일
# 주의: retention 기간 내 파일은 삭제하지 않음# → 너무 짧게 설정하면 아직 커밋되지 않은 파일이 삭제될 위험# → VACUUM 후에는 해당 버전으로의 Time Travel이 불가능OPTIMIZE는 읽기 성능 개선, VACUUM은 스토리지 비용 절감.
Apache Iceberg: rewrite_data_files
# Apache Iceberg — rewrite_data_files 액션# SparkActions API 사용from pyspark.sql import SparkSession
spark.sql(""" CALL catalog.system.rewrite_data_files( table => 'db.events', options => map('target-file-size-bytes', '134217728') -- 128MB )""")
# Iceberg도 별도의 expire_snapshots로 오래된 스냅샷 정리 필요spark.sql(""" CALL catalog.system.expire_snapshots( table => 'db.events', older_than => TIMESTAMP '2024-01-01 00:00:00' )""")Apache Hudi: Merge-on-Read (MoR) Compaction
Hudi는 Delta Lake, Iceberg와 근본적으로 다른 접근을 합니다.
[Hudi MoR 테이블 구조]Columnar Storage (Parquet) ← 기본 데이터 (읽기 최적화) +Row Storage (Avro Log) ← 변경분 (쓰기 최적화)
[Compaction]Row Storage의 변경분을 Columnar Storage에 병합→ Delta Lake/Iceberg: 같은 포맷(columnar) 파일끼리 병합→ Hudi: 이종 포맷(row → columnar) 간 병합Hudi의 compaction은 “쓰기 최적화된 로그를 읽기 최적화된 포맷으로 변환”하는 작업
Kafka Log Compaction
Kafka의 compaction은 기본적으로 파일 크기 최적화가 아니라 동일 키의 이전 버전을 삭제하는 것
# Kafka 토픽 생성 시 log compaction 설정from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({"bootstrap.servers": "broker:9092"})
topic = NewTopic( "user-profiles", num_partitions=3, config={ "cleanup.policy": "compact", # compaction 활성화 "min.compaction.lag.ms": "3600000", # 최소 1시간 후 compaction "max.compaction.lag.ms": "86400000", # 최대 24시간 내 compaction })admin.create_topics([topic])[Kafka Compaction 전]Key=user1, Value={name: "A"} (offset 0)Key=user2, Value={name: "B"} (offset 1)Key=user1, Value={name: "C"} (offset 2) ← user1 업데이트Key=user1, Value=null (offset 3) ← user1 삭제 (tombstone)
[Kafka Compaction 후]Key=user2, Value={name: "B"} (offset 1) ← 최신 값만 유지Key=user1, Value=null (offset 3) ← tombstone 유지 (일정 시간 후 삭제)| 구분 | Lake Compaction | Kafka Compaction |
|---|---|---|
| 목적 | 파일 수 감소 → I/O 최적화 | 동일 키의 구버전 제거 → 스토리지 절약 |
| 데이터 변경 | 없음 (동일 데이터, 다른 물리 구조) | 있음 (구버전 데이터 삭제) |
| 결정론적 | 예 (명시적 명령) | 아니오 (비결정론적 스케줄) |
Cost vs Performance 트레이드오프
Compaction 빈도는 Cost와 Performance관련 설계 결정임
# 전략 1: 드물게 실행 (예: 하루 1회, 업무 외 시간)# 낮은 컴퓨팅 비용# compaction 전 데이터를 읽는 소비자는 여전히 느림
# 전략 2: 수집 파이프라인에 포함 (매번 실행)# 소비자 항상 최적화된 데이터 읽기# 수집 파이프라인 레이턴시 증가
# 전략 3: 적응형 — 파일 수 기반 트리거def should_compact(spark: SparkSession, table_path: str, threshold: int = 500) -> bool: """파일 수가 임계값을 넘으면 compaction 트리거""" from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, table_path) file_count = dt.detail().select("numFiles").collect()[0][0] return file_count > thresholdConsistency와 Cleaning
- ACID를 지원하는 포맷(Delta Lake, Iceberg)에서는 compaction이 트랜잭션으로 보호되므로, 읽기/쓰기와 충돌하지 않음.
- raw 포맷(JSON, CSV)에서는 소비자가 compaction 중인 파일과 완료된 파일을 구분할 수 없어 데이터 중복이나 누락 위험이 존재
[Compaction 라이프사이클]OPTIMIZE → 새 큰 파일 생성 (구 파일도 존재) ↓VACUUM → retention 지난 구 파일 삭제 ↓ 삭제된 파일 기반 Time Travel 불가Concept
- Small Files Problem : 다수의 소형 파일로 인해 메타데이터 작업(리스팅, open/close)이 실제 처리보다 오래 걸리는 문제
- Compactor 패턴 : 작은 파일을 큰 파일로 병합하여 I/O 오버헤드를 줄이는 최적화 패턴
- Delta Lake OPTIMIZE : Delta Lake의 네이티브 compaction 명령. 트랜잭션 보호 하에 파일 병합 수행
- VACUUM : 더 이상 참조되지 않는 오래된 파일을 삭제하는 cleaning 작업. Retention 기간 내 파일은 보존
- Apache Iceberg rewrite_data_files : Iceberg의 compaction 액션. target-file-size-bytes로 목표 파일 크기 지정
- expire_snapshots : Iceberg의 cleaning 작업. 오래된 스냅샷과 관련 데이터 파일 삭제
- Hudi Merge-on-Read (MoR) : Columnar 기본 데이터 + Row 변경 로그 구조. Compaction이 row→columnar 병합을 의미
- Kafka Log Compaction : 동일 키의 구버전을 삭제하고 최신 값만 유지하는 Kafka 고유의 compaction. 비결정론적 실행
- Tombstone : Kafka compaction에서 키 삭제를 나타내는 value=null 레코드
- Cost vs Performance 트레이드오프 : Compaction 빈도가 높으면 읽기 성능 향상, 낮으면 컴퓨팅 비용 절감
- Retention Threshold : VACUUM/expire_snapshots의 보존 기간. 너무 짧으면 활성 파일 삭제 위험, 너무 길면 스토리지 낭비
[DE Design Pattern]02-5. Data Compaction
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-05-data_compaction/