1491 words
7 minutes
[DE Design Pattern]02-5. Data Compaction

05. Data Compaction#

  • Data Compaction은 다수의 작은 파일을 더 적은 수의 큰 파일로 병합하여 I/O 오버헤드를 줄이는 것
  • 보통 “Small Files Problem”이라 불림

Small Files Problem#

  • 실시간 수집 파이프라인이 스트리밍 브로커에서 오브젝트 스토어로 데이터를 쓰면, 짧은 주기마다 작은 파일이 생성됨.
  • 3개월이 지나면 수십만 개의 소형 파일이 쌓임
[문제 상황]
실행 시간의 70%: 파일 리스팅 (메타데이터 작업)
실행 시간의 30%: 실제 데이터 처리
→ 파일 수가 많을수록 리스팅, open/close I/O가 지배적
→ pay-as-you-go 환경에서 비용 낭비

Compaction은 이 문제를 다음과 같이 해결합니다.

[Compaction 전]
part-00001.parquet (1MB)
part-00002.parquet (1MB)
...
part-01000.parquet (1MB)
→ 1,000개 파일, 1,000번의 open/close
[Compaction 후]
part-00001.parquet (100MB)
part-00002.parquet (100MB)
...
part-00010.parquet (100MB)
→ 10개 파일, 10번의 open/close, 동일 데이터

Open Table Format별 Compaction#

Delta Lake: OPTIMIZE + VACUUM#

  • Delta Lake 의 compaction api는 optimize와 vacuum의 두 단계로 나뉨
from delta.tables import DeltaTable
# 01. OPTIMIZE — 작은 파일들을 큰 파일로 병합
devices_table = DeltaTable.forPath(spark_session, "s3://lake/devices")
devices_table.optimize().executeCompaction()
# 이 시점에서 새 큰 파일이 생성되었지만, 원본 작은 파일도 여전히 존재
# → 아직 스토리지 절약 효과 없음, 하지만 읽기 성능은 개선됨
# (Delta 트랜잭션 로그가 새 파일만 참조하므로)
# 02. VACUUM — 더 이상 참조되지 않는 오래된 파일 삭제
devices_table = DeltaTable.forPath(spark_session, "s3://lake/devices")
devices_table.vacuum() # 기본 retention: 7일
# 주의: retention 기간 내 파일은 삭제하지 않음
# → 너무 짧게 설정하면 아직 커밋되지 않은 파일이 삭제될 위험
# → VACUUM 후에는 해당 버전으로의 Time Travel이 불가능

OPTIMIZE는 읽기 성능 개선, VACUUM은 스토리지 비용 절감.

Apache Iceberg: rewrite_data_files#

# Apache Iceberg — rewrite_data_files 액션
# SparkActions API 사용
from pyspark.sql import SparkSession
spark.sql("""
CALL catalog.system.rewrite_data_files(
table => 'db.events',
options => map('target-file-size-bytes', '134217728') -- 128MB
)
""")
# Iceberg도 별도의 expire_snapshots로 오래된 스냅샷 정리 필요
spark.sql("""
CALL catalog.system.expire_snapshots(
table => 'db.events',
older_than => TIMESTAMP '2024-01-01 00:00:00'
)
""")

Apache Hudi: Merge-on-Read (MoR) Compaction#

Hudi는 Delta Lake, Iceberg와 근본적으로 다른 접근을 합니다.

[Hudi MoR 테이블 구조]
Columnar Storage (Parquet) ← 기본 데이터 (읽기 최적화)
+
Row Storage (Avro Log) ← 변경분 (쓰기 최적화)
[Compaction]
Row Storage의 변경분을 Columnar Storage에 병합
→ Delta Lake/Iceberg: 같은 포맷(columnar) 파일끼리 병합
→ Hudi: 이종 포맷(row → columnar) 간 병합

Hudi의 compaction은 “쓰기 최적화된 로그를 읽기 최적화된 포맷으로 변환”하는 작업

Kafka Log Compaction#

Kafka의 compaction은 기본적으로 파일 크기 최적화가 아니라 동일 키의 이전 버전을 삭제하는 것

# Kafka 토픽 생성 시 log compaction 설정
from confluent_kafka.admin import AdminClient, NewTopic
admin = AdminClient({"bootstrap.servers": "broker:9092"})
topic = NewTopic(
"user-profiles",
num_partitions=3,
config={
"cleanup.policy": "compact", # compaction 활성화
"min.compaction.lag.ms": "3600000", # 최소 1시간 후 compaction
"max.compaction.lag.ms": "86400000", # 최대 24시간 내 compaction
}
)
admin.create_topics([topic])
[Kafka Compaction 전]
Key=user1, Value={name: "A"} (offset 0)
Key=user2, Value={name: "B"} (offset 1)
Key=user1, Value={name: "C"} (offset 2) ← user1 업데이트
Key=user1, Value=null (offset 3) ← user1 삭제 (tombstone)
[Kafka Compaction 후]
Key=user2, Value={name: "B"} (offset 1) ← 최신 값만 유지
Key=user1, Value=null (offset 3) ← tombstone 유지 (일정 시간 후 삭제)
구분Lake CompactionKafka Compaction
목적파일 수 감소 → I/O 최적화동일 키의 구버전 제거 → 스토리지 절약
데이터 변경없음 (동일 데이터, 다른 물리 구조)있음 (구버전 데이터 삭제)
결정론적예 (명시적 명령)아니오 (비결정론적 스케줄)

Cost vs Performance 트레이드오프#

Compaction 빈도는 Cost와 Performance관련 설계 결정임

# 전략 1: 드물게 실행 (예: 하루 1회, 업무 외 시간)
# 낮은 컴퓨팅 비용
# compaction 전 데이터를 읽는 소비자는 여전히 느림
# 전략 2: 수집 파이프라인에 포함 (매번 실행)
# 소비자 항상 최적화된 데이터 읽기
# 수집 파이프라인 레이턴시 증가
# 전략 3: 적응형 — 파일 수 기반 트리거
def should_compact(spark: SparkSession, table_path: str, threshold: int = 500) -> bool:
"""파일 수가 임계값을 넘으면 compaction 트리거"""
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, table_path)
file_count = dt.detail().select("numFiles").collect()[0][0]
return file_count > threshold

Consistency와 Cleaning#

  • ACID를 지원하는 포맷(Delta Lake, Iceberg)에서는 compaction이 트랜잭션으로 보호되므로, 읽기/쓰기와 충돌하지 않음.
  • raw 포맷(JSON, CSV)에서는 소비자가 compaction 중인 파일과 완료된 파일을 구분할 수 없어 데이터 중복이나 누락 위험이 존재
[Compaction 라이프사이클]
OPTIMIZE → 새 큰 파일 생성 (구 파일도 존재)
VACUUM → retention 지난 구 파일 삭제
삭제된 파일 기반 Time Travel 불가

Concept

  • Small Files Problem : 다수의 소형 파일로 인해 메타데이터 작업(리스팅, open/close)이 실제 처리보다 오래 걸리는 문제
  • Compactor 패턴 : 작은 파일을 큰 파일로 병합하여 I/O 오버헤드를 줄이는 최적화 패턴
  • Delta Lake OPTIMIZE : Delta Lake의 네이티브 compaction 명령. 트랜잭션 보호 하에 파일 병합 수행
  • VACUUM : 더 이상 참조되지 않는 오래된 파일을 삭제하는 cleaning 작업. Retention 기간 내 파일은 보존
  • Apache Iceberg rewrite_data_files : Iceberg의 compaction 액션. target-file-size-bytes로 목표 파일 크기 지정
  • expire_snapshots : Iceberg의 cleaning 작업. 오래된 스냅샷과 관련 데이터 파일 삭제
  • Hudi Merge-on-Read (MoR) : Columnar 기본 데이터 + Row 변경 로그 구조. Compaction이 row→columnar 병합을 의미
  • Kafka Log Compaction : 동일 키의 구버전을 삭제하고 최신 값만 유지하는 Kafka 고유의 compaction. 비결정론적 실행
  • Tombstone : Kafka compaction에서 키 삭제를 나타내는 value=null 레코드
  • Cost vs Performance 트레이드오프 : Compaction 빈도가 높으면 읽기 성능 향상, 낮으면 컴퓨팅 비용 절감
  • Retention Threshold : VACUUM/expire_snapshots의 보존 기간. 너무 짧으면 활성 파일 삭제 위험, 너무 길면 스토리지 낭비

[DE Design Pattern]02-5. Data Compaction
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-05-data_compaction/
Author
Datamind
Published at
2025-01-31
License
CC BY-NC-SA 4.0