1290 words
6 minutes
[de design pattern]03-04. static, dynamic late data integrator

4. Static & Dynamic Late Data Integrator 패턴#

핵심 문제#

  • 이커머스 주문 데이터처럼 손실이 허용되지 않는 경우에 감지된 지연 데이터를 실제로 기존 데이터셋에 어떻게 통합할 것인가의 문제

Pattern A: Static Late Data Integrator#

개념#

고정된 lookback window를 정의하여 매 실행마다 현재 파티션 + 과거 N일치를 함께 처리

ex) 파이프라인이 2024-12-31 실행, lookback window = 14일이면 → 2024-12-17 ~ 2024-12-30 파티션도 함께 재처리.

파이프라인 배치 전략#

  • 전략 1 (late data 먼저 → 현재 처리): stateful 파이프라인에 적합. 과거 데이터가 올바라야 현재 결과도 올바르므로 late data 통합이 선행되어야 함
  • 전략 2 (현재 처리 → late data 후속): 현재 데이터를 빨리 전달하고 싶을 때.
  • 전략 3 (병렬 처리): stateless 파이프라인에서 현재와 late data를 동시에 처리하여 전체 시간 단축.

구현: Airflow Dynamic Task Mapping#

# 고정 lookback window(2일)로 backfill 대상 날짜 생성
@task
def generate_backfilling_runs():
dr: DagRun = get_current_context()['dag_run']
days_to_backfill = 2
start_date = dr.execution_date - datetime.timedelta(days=days_to_backfill)
backfilling_dates = []
for days_to_add in range(0, days_to_backfill):
date = start_date + datetime.timedelta(days=days_to_add)
backfilling_dates.append(date.date().isoformat())
return backfilling_dates
# 각 날짜에 대해 동적으로 태스크 생성
@task
def integrate_late_data(late_date: str):
copy_file(late_date)
# expand()로 날짜 수만큼 태스크 자동 생성
integrate_late_data.expand(late_date=generate_backfilling_runs())
# 전체 워크플로우: 현재 로드 → backfill 날짜 생성 → 날짜별 통합
backfilling_runs_generator = generate_backfilling_runs()
(file_to_load_sensor >> load_current_file() >> backfilling_runs_generator >>
integrate_late_data.expand(late_date=backfilling_runs_generator))

핵심: expand()가 리스트 길이만큼 태스크를 동적으로 생성. lookback window가 2일이면 항상 2개 태스크

Static의 주요 Consequences#

  • Overlapping backfill 주의: lookback=4일, 10/10~10/12 세 번 실행을 모두 backfill하면 같은 날짜가 중복 처리됨. 실제로는 가장 마지막(10/12) 실행만 재실행하면 됨
  • 리소스 낭비: 고정 window이므로 late data가 없는 파티션도 매번 재처리
  • 파이프라인 트리거 제약: backfill 태스크는 반드시 메인 파이프라인 내부에서 실행해야함. 별도 파이프라인으로 트리거할 경우 overlapping 문제가 발생

Pattern B: Dynamic Late Data Integrator#

개념#

Static의 리소스 낭비를 해결합니다. 실제로 late data가 있는 파티션만 동적으로 선택해 재처리합니다. 이를 위해 각 파티션의 “마지막 처리 시간”과 “마지막 업데이트 시간”을 추적하는 state table 필요

State Table 구조#

PartitionLast processed timeLast update time
2024-12-172024-12-17T10:202024-12-17T03:00
2024-12-182024-12-18T09:552024-12-20T10:12

핵심 쿼리#

-- late data가 있는 파티션만 조회
SELECT partition FROM state_table
WHERE `Last update time` > `Last processed time`
AND `Partition` < `Processed partition`

핵심: update time > processed time이면 마지막 처리 이후 새 데이터가 들어왔다는 뜻. Static처럼 고정 N일이 아니라 실제 변경이 있는 파티션만 대상.

동시성 문제 해결#

파이프라인이 병렬 실행되면 같은 파티션을 중복 backfill할 수 있습니다.

해결책: state table에 Is processed 컬럼을 추가

-- 동시성 보호: 이미 처리 중인 파티션 제외
SELECT partition FROM state_table
WHERE `Last update time` > `Last processed time`
AND `Partition` < `Processed partition`
AND `Is processed` = false
# Airflow - depends_on_past로 순차 실행 보장 (핵심 태스크만)
with DAG('devices_loader', max_active_runs=5,
default_args={'depends_on_past': False}) as dag:
# 이 태스크는 이전 실행 성공 후에만 실행
processing_marker = SparkKubernetesOperator(
task_id='mark_partition_as_being_processed',
depends_on_past=True # 순차 실행 강제
)
backfill_creation_job = SparkKubernetesOperator(
task_id='get_late_partitions_and_mark_them_as_processed',
depends_on_past=True # 순차 실행 강제
)

핵심: 전체 DAG는 max_active_runs=5로 병렬 가능하지만, 핵심 태스크만 depends_on_past=True로 순차 실행(race condition 방지)

Dynamic의 주요 Consequences#

  • Stateful 파이프라인 + 매우 늦은 데이터: 마지막 실행이 10/20이고 9/21에 late data가 감지되면, stateful 잡은 9/21~10/20 전체 순차 재생성 필요 이 경우 dynamic이라도 accepted lookback window 상한을 두는 것이 현실적
  • 스케줄링 복잡도: 파티션별 last update time을 얻는 것 자체가 스토리지 기술의 내부 구현에 의존하므로 구현 난이도가 높음

Concept

  • Static Late Data Integrator : 고정된 lookback window로 매 실행마다 과거 N일치를 무조건 재처리하는 패턴
  • Dynamic Late Data Integrator : state table 기반으로 실제 late data가 있는 파티션만 선택적으로 재처리하는 패턴
  • Lookback Window : late data 통합을 위해 과거로 돌아가는 시간 범위. static은 고정, dynamic은 가변
  • State Table : 파티션별 last processed time, last update time을 추적하는 메타데이터 테이블
  • Dynamic Task Mapping : Airflow에서 런타임에 리스트 길이만큼 태스크를 동적으로 생성하는 기능 (expand())
  • depends_on_past : Airflow 태스크 속성. 이전 실행이 성공해야 다음 실행이 시작되는 순차 실행 보장
  • Overlapping Backfill : static lookback window에서 여러 실행을 동시에 backfill할 때 같은 날짜가 중복 처리되는 문제
  • Is processed 플래그 : dynamic 패턴에서 동시성 문제를 방지하기 위해 state table에 추가하는 상태 컬럼
  • Snowball Backfilling Effect : late data 통합이 downstream consumer 체인 전체로 연쇄 backfill을 유발하는 현상

[de design pattern]03-04. static, dynamic late data integrator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/03-dead-letter/03-dl-04-static-dynamic-late_date_intergator/
Author
Datamind
Published at
2025-02-24
License
CC BY-NC-SA 4.0