1273 words
6 minutes
[DE Design Pattern]02-2. Incremental Load

02. Incremental Load Pattern#

  • Incremental Load는 마지막 실행 이후 변경/추가된 데이터만 가져오는 패턴.

Delta Column 기반 구현#

from pyspark.sql import SparkSession, functions as F
def incremental_load_delta_column(
spark: SparkSession,
input_path: str,
output_path: str,
date_from: str, # e.g. "2024-01-01 10:00:00"
date_to: str # e.g. "2024-01-01 11:00:00"
):
"""
Delta Column 기반 Incremental Loader.
"""
raw_data = spark.read.text(input_path)
parsed = raw_data.select(
F.from_json(F.col("value"), "id STRING, ingestion_time TIMESTAMP, payload STRING")
.alias("data")
).select("data.*")
# 핵심: 시간 범위(Ingestion Window)로 필터링
incremental = parsed.filter(
f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"'
)
incremental.write.mode("append").parquet(output_path)
  • 핵심은 BETWEEN 절로 Ingestion Window를 제한하는 것
  • ingestion_time > last_run_time으로 할 경우, 백필 시 전체 데이터를 한 번에 가져오게 되어 Full Load와 다를 바 없어짐

Partition 기반 구현#

  • 소스 데이터가 이미 시간 기반 파티션으로 물리적으로 분리되어 있을 때 사용
# Partition 기반 Incremental Loader
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
with DAG("incremental_partition_loader", schedule_interval="@hourly") as dag:
# 1) 파티션 존재 여부 확인 (Readiness)
wait_for_partition = FileSensor(
task_id="wait_for_partition",
filepath="/data/input/date={{ ds }}/hour={{ execution_date.hour }}",
mode="reschedule", # 워커 슬롯을 점유하지 않음
)
# 2) 해당 파티션만 적재
load_partition = SparkKubernetesOperator(
task_id="load_partition",
application_file="load_job_spec.yaml",
# arguments에 {{ ds }} 같은 immutable 매크로 사용
)
wait_for_partition >> load_partition

두 방식의 결정적 차이를 정리하면:

구분Delta ColumnPartition 기반
상태 관리마지막 ingestion_time 저장 필요불필요 (실행 시간에서 추론)
백필Window 제한 없으면 Full Load화파티션 단위로 자연스럽게 분리
소스 요구사항델타 컬럼 존재시간 기반 파티션 구조
동시 백필Window 제한 시 가능파티션별 독립 실행 가능

3. Hard Delete 문제#

근본적 한계는 물리적 삭제를 감지할 수 없다는 것

# 소스 DB: row_id=5가 DELETE됨 → 해당 행이 물리적으로 사라짐
# 타겟 DB: row_id=5가 여전히 존재 → 소스와 불일치
# 해결 1: Soft Delete — 프로듀서가 DELETE 대신 UPDATE 사용
# UPDATE devices SET is_deleted = true, updated_at = NOW() WHERE id = 5
def incremental_load_with_soft_delete(spark, input_path, output_path, date_from, date_to):
"""Soft Delete가 적용된 소스에서 증분 로드"""
incremental = (
spark.read.parquet(input_path)
.filter(f'updated_at BETWEEN "{date_from}" AND "{date_to}"')
)
# is_deleted=true인 행도 함께 가져옴 → 타겟에서 삭제 반영 가능
incremental.write.mode("append").parquet(output_path)
# 해결 2: Insert-Only (Append-Only) 테이블
# 모든 변경을 INSERT로 기록하고, 소비자가 최신 상태를 재구성
# | id | action | updated_at |
# | 5 | created | 10:00 |
# | 5 | updated | 10:30 |
# | 5 | deleted | 11:00 | ← DELETE도 INSERT로 기록

4. Backfilling과 Ingestion Window#

백필 시 Delta Column 방식이 특히 위험

def safe_backfill_with_window(
spark: SparkSession,
input_path: str,
output_path: str,
date_from: str,
date_to: str,
window_hours: int = 1
):
"""
Ingestion Window를 제한하여 백필 시에도 안정적인 데이터 볼륨 유지.
핵심: delta_column BETWEEN ingestion_time AND ingestion_time + INTERVAL '1 HOUR'
→ 백필이든 정상 실행이든 항상 동일한 크기의 데이터만 처리
→ 여러 백필 잡을 동시에 실행
"""
data = spark.read.parquet(input_path)
windowed = data.filter(
f'ingestion_time BETWEEN "{date_from}" AND "{date_to}"'
)
windowed.write.mode("append").parquet(output_path)
# 백필 실행 예시: 각 시간대별로 병렬 실행 가능
# safe_backfill_with_window(spark, path, out, "2024-01-01 00:00", "2024-01-01 01:00")
# safe_backfill_with_window(spark, path, out, "2024-01-01 01:00", "2024-01-01 02:00")
# safe_backfill_with_window(spark, path, out, "2024-01-01 02:00", "2024-01-01 03:00")

Event Time을 Delta Column으로 쓸 때의 주의점#

  • ingestion_time 대신 event_time을 델타 컬럼으로 쓰면 Late Data 문제가 발생함.
  • 이벤트가 실제 발생 시간보다 늦게 도착하면, 이미 처리 완료된 시간 범위에 속하므로 로더에서 누락시킴
  • 가능하면 ingestion_time(시스템이 데이터를 받은 시간)을 델타 컬럼으로 사용하는 것이 안전

Concept

  • Incremental Load : 마지막 실행 이후 변경/추가된 데이터만 적재하는 패턴. 대규모, 지속 증가 데이터셋에 적합
  • Delta Column : 행의 변경 시점을 나타내는 컬럼(ingestion_time, updated_at). 증분 로드의 필터 조건으로 사용
  • Ingestion Window : 한 번의 실행에서 처리할 시간 범위를 명시적으로 제한하는 기법. 백필 시 데이터 볼륨 폭발 방지 및 병렬 실행 가능
  • Partition 기반 로드 : 시간 파티션 구조를 활용해 실행 시간에서 처리 대상을 암묵적으로 결정하는 방식. 상태 관리 불필요
  • Hard Delete : 물리적 행 삭제. Incremental Loader가 감지할 수 없는 근본적 한계
  • Soft Delete : 물리 삭제 대신 is_deleted 플래그로 삭제를 표현하는 방식. DELETE → UPDATE로 전환
  • Insert-Only (Append-Only) Table : 모든 변경(생성/수정/삭제)을 INSERT로 기록하는 테이블. 소비자가 최신 상태를 재구성해야 함
  • Backfilling : 과거 데이터를 재처리하는 작업. Window 제한 없으면 Incremental이 Full Load화되는 위험
  • Late Data : 이벤트 발생 시간보다 늦게 도착하는 데이터. event_time을 델타 컬럼으로 쓰면 누락 위험
  • Immutable Execution Time : Airflow의 {{ ds }} 같은 실행 시간 매크로. 백필 시에도 변하지 않아 재현성 보장

추가검토

  • Airflow의 execution_date vs logical_date : Airflow 2.x에서 실행 시간 개념의 변화와 백필에 미치는 영향
  • Watermark (워터마크) : Spark Structured Streaming에서 Late Data를 처리하는 메커니즘
[DE Design Pattern]02-2. Incremental Load
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-02-incremental_load/
Author
Datamind
Published at
2025-01-31
License
CC BY-NC-SA 4.0