1273 words
6 minutes
[DE Design Pattern]02-2. Incremental Load
02. Incremental Load Pattern
- Incremental Load는 마지막 실행 이후 변경/추가된 데이터만 가져오는 패턴.
Delta Column 기반 구현
from pyspark.sql import SparkSession, functions as F
def incremental_load_delta_column( spark: SparkSession, input_path: str, output_path: str, date_from: str, # e.g. "2024-01-01 10:00:00" date_to: str # e.g. "2024-01-01 11:00:00"): """ Delta Column 기반 Incremental Loader. """ raw_data = spark.read.text(input_path)
parsed = raw_data.select( F.from_json(F.col("value"), "id STRING, ingestion_time TIMESTAMP, payload STRING") .alias("data") ).select("data.*")
# 핵심: 시간 범위(Ingestion Window)로 필터링 incremental = parsed.filter( f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"' )
incremental.write.mode("append").parquet(output_path)- 핵심은
BETWEEN절로 Ingestion Window를 제한하는 것 ingestion_time > last_run_time으로 할 경우, 백필 시 전체 데이터를 한 번에 가져오게 되어 Full Load와 다를 바 없어짐
Partition 기반 구현
- 소스 데이터가 이미 시간 기반 파티션으로 물리적으로 분리되어 있을 때 사용
# Partition 기반 Incremental Loaderfrom airflow import DAGfrom airflow.sensors.filesystem import FileSensorfrom airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
with DAG("incremental_partition_loader", schedule_interval="@hourly") as dag:
# 1) 파티션 존재 여부 확인 (Readiness) wait_for_partition = FileSensor( task_id="wait_for_partition", filepath="/data/input/date={{ ds }}/hour={{ execution_date.hour }}", mode="reschedule", # 워커 슬롯을 점유하지 않음 )
# 2) 해당 파티션만 적재 load_partition = SparkKubernetesOperator( task_id="load_partition", application_file="load_job_spec.yaml", # arguments에 {{ ds }} 같은 immutable 매크로 사용 )
wait_for_partition >> load_partition두 방식의 결정적 차이를 정리하면:
| 구분 | Delta Column | Partition 기반 |
|---|---|---|
| 상태 관리 | 마지막 ingestion_time 저장 필요 | 불필요 (실행 시간에서 추론) |
| 백필 | Window 제한 없으면 Full Load화 | 파티션 단위로 자연스럽게 분리 |
| 소스 요구사항 | 델타 컬럼 존재 | 시간 기반 파티션 구조 |
| 동시 백필 | Window 제한 시 가능 | 파티션별 독립 실행 가능 |
3. Hard Delete 문제
근본적 한계는 물리적 삭제를 감지할 수 없다는 것
# 소스 DB: row_id=5가 DELETE됨 → 해당 행이 물리적으로 사라짐# 타겟 DB: row_id=5가 여전히 존재 → 소스와 불일치
# 해결 1: Soft Delete — 프로듀서가 DELETE 대신 UPDATE 사용# UPDATE devices SET is_deleted = true, updated_at = NOW() WHERE id = 5
def incremental_load_with_soft_delete(spark, input_path, output_path, date_from, date_to): """Soft Delete가 적용된 소스에서 증분 로드""" incremental = ( spark.read.parquet(input_path) .filter(f'updated_at BETWEEN "{date_from}" AND "{date_to}"') )
# is_deleted=true인 행도 함께 가져옴 → 타겟에서 삭제 반영 가능 incremental.write.mode("append").parquet(output_path)
# 해결 2: Insert-Only (Append-Only) 테이블# 모든 변경을 INSERT로 기록하고, 소비자가 최신 상태를 재구성# | id | action | updated_at |# | 5 | created | 10:00 |# | 5 | updated | 10:30 |# | 5 | deleted | 11:00 | ← DELETE도 INSERT로 기록4. Backfilling과 Ingestion Window
백필 시 Delta Column 방식이 특히 위험
def safe_backfill_with_window( spark: SparkSession, input_path: str, output_path: str, date_from: str, date_to: str, window_hours: int = 1): """ Ingestion Window를 제한하여 백필 시에도 안정적인 데이터 볼륨 유지.
핵심: delta_column BETWEEN ingestion_time AND ingestion_time + INTERVAL '1 HOUR' → 백필이든 정상 실행이든 항상 동일한 크기의 데이터만 처리 → 여러 백필 잡을 동시에 실행 """ data = spark.read.parquet(input_path)
windowed = data.filter( f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"' )
windowed.write.mode("append").parquet(output_path)
# 백필 실행 예시: 각 시간대별로 병렬 실행 가능# safe_backfill_with_window(spark, path, out, "2024-01-01 00:00", "2024-01-01 01:00")# safe_backfill_with_window(spark, path, out, "2024-01-01 01:00", "2024-01-01 02:00")# safe_backfill_with_window(spark, path, out, "2024-01-01 02:00", "2024-01-01 03:00")Event Time을 Delta Column으로 쓸 때의 주의점
ingestion_time대신event_time을 델타 컬럼으로 쓰면 Late Data 문제가 발생함.- 이벤트가 실제 발생 시간보다 늦게 도착하면, 이미 처리 완료된 시간 범위에 속하므로 로더에서 누락시킴
- 가능하면
ingestion_time(시스템이 데이터를 받은 시간)을 델타 컬럼으로 사용하는 것이 안전
Concept
- Incremental Load : 마지막 실행 이후 변경/추가된 데이터만 적재하는 패턴. 대규모, 지속 증가 데이터셋에 적합
- Delta Column : 행의 변경 시점을 나타내는 컬럼(ingestion_time, updated_at). 증분 로드의 필터 조건으로 사용
- Ingestion Window : 한 번의 실행에서 처리할 시간 범위를 명시적으로 제한하는 기법. 백필 시 데이터 볼륨 폭발 방지 및 병렬 실행 가능
- Partition 기반 로드 : 시간 파티션 구조를 활용해 실행 시간에서 처리 대상을 암묵적으로 결정하는 방식. 상태 관리 불필요
- Hard Delete : 물리적 행 삭제. Incremental Loader가 감지할 수 없는 근본적 한계
- Soft Delete : 물리 삭제 대신
is_deleted플래그로 삭제를 표현하는 방식. DELETE → UPDATE로 전환 - Insert-Only (Append-Only) Table : 모든 변경(생성/수정/삭제)을 INSERT로 기록하는 테이블. 소비자가 최신 상태를 재구성해야 함
- Backfilling : 과거 데이터를 재처리하는 작업. Window 제한 없으면 Incremental이 Full Load화되는 위험
- Late Data : 이벤트 발생 시간보다 늦게 도착하는 데이터. event_time을 델타 컬럼으로 쓰면 누락 위험
- Immutable Execution Time : Airflow의
{{ ds }}같은 실행 시간 매크로. 백필 시에도 변하지 않아 재현성 보장
추가검토
- Airflow의 execution_date vs logical_date : Airflow 2.x에서 실행 시간 개념의 변화와 백필에 미치는 영향
- Watermark (워터마크) : Spark Structured Streaming에서 Late Data를 처리하는 메커니즘
[DE Design Pattern]02-2. Incremental Load
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-02-incremental_load/