1290 words
6 minutes
[de design pattern]03-04. static, dynamic late data integrator
4. Static & Dynamic Late Data Integrator 패턴
핵심 문제
- 이커머스 주문 데이터처럼 손실이 허용되지 않는 경우에 감지된 지연 데이터를 실제로 기존 데이터셋에 어떻게 통합할 것인가의 문제
Pattern A: Static Late Data Integrator
개념
고정된 lookback window를 정의하여 매 실행마다 현재 파티션 + 과거 N일치를 함께 처리
ex) 파이프라인이 2024-12-31 실행, lookback window = 14일이면 → 2024-12-17 ~ 2024-12-30 파티션도 함께 재처리.
파이프라인 배치 전략
- 전략 1 (late data 먼저 → 현재 처리): stateful 파이프라인에 적합. 과거 데이터가 올바라야 현재 결과도 올바르므로 late data 통합이 선행되어야 함
- 전략 2 (현재 처리 → late data 후속): 현재 데이터를 빨리 전달하고 싶을 때.
- 전략 3 (병렬 처리): stateless 파이프라인에서 현재와 late data를 동시에 처리하여 전체 시간 단축.
구현: Airflow Dynamic Task Mapping
# 고정 lookback window(2일)로 backfill 대상 날짜 생성@taskdef generate_backfilling_runs(): dr: DagRun = get_current_context()['dag_run'] days_to_backfill = 2 start_date = dr.execution_date - datetime.timedelta(days=days_to_backfill)
backfilling_dates = [] for days_to_add in range(0, days_to_backfill): date = start_date + datetime.timedelta(days=days_to_add) backfilling_dates.append(date.date().isoformat()) return backfilling_dates# 각 날짜에 대해 동적으로 태스크 생성@taskdef integrate_late_data(late_date: str): copy_file(late_date)
# expand()로 날짜 수만큼 태스크 자동 생성integrate_late_data.expand(late_date=generate_backfilling_runs())# 전체 워크플로우: 현재 로드 → backfill 날짜 생성 → 날짜별 통합backfilling_runs_generator = generate_backfilling_runs()
(file_to_load_sensor >> load_current_file() >> backfilling_runs_generator >> integrate_late_data.expand(late_date=backfilling_runs_generator))핵심: expand()가 리스트 길이만큼 태스크를 동적으로 생성. lookback window가 2일이면 항상 2개 태스크
Static의 주요 Consequences
- Overlapping backfill 주의: lookback=4일, 10/10~10/12 세 번 실행을 모두 backfill하면 같은 날짜가 중복 처리됨. 실제로는 가장 마지막(10/12) 실행만 재실행하면 됨
- 리소스 낭비: 고정 window이므로 late data가 없는 파티션도 매번 재처리
- 파이프라인 트리거 제약: backfill 태스크는 반드시 메인 파이프라인 내부에서 실행해야함. 별도 파이프라인으로 트리거할 경우 overlapping 문제가 발생
Pattern B: Dynamic Late Data Integrator
개념
Static의 리소스 낭비를 해결합니다. 실제로 late data가 있는 파티션만 동적으로 선택해 재처리합니다. 이를 위해 각 파티션의 “마지막 처리 시간”과 “마지막 업데이트 시간”을 추적하는 state table 필요
State Table 구조
| Partition | Last processed time | Last update time |
|---|---|---|
| 2024-12-17 | 2024-12-17T10:20 | 2024-12-17T03:00 |
| 2024-12-18 | 2024-12-18T09:55 | 2024-12-20T10:12 |
핵심 쿼리
-- late data가 있는 파티션만 조회SELECT partition FROM state_tableWHERE `Last update time` > `Last processed time` AND `Partition` < `Processed partition`핵심: update time > processed time이면 마지막 처리 이후 새 데이터가 들어왔다는 뜻. Static처럼 고정 N일이 아니라 실제 변경이 있는 파티션만 대상.
동시성 문제 해결
파이프라인이 병렬 실행되면 같은 파티션을 중복 backfill할 수 있습니다.
해결책: state table에 Is processed 컬럼을 추가
-- 동시성 보호: 이미 처리 중인 파티션 제외SELECT partition FROM state_tableWHERE `Last update time` > `Last processed time` AND `Partition` < `Processed partition` AND `Is processed` = false# Airflow - depends_on_past로 순차 실행 보장 (핵심 태스크만)with DAG('devices_loader', max_active_runs=5, default_args={'depends_on_past': False}) as dag:
# 이 태스크는 이전 실행 성공 후에만 실행 processing_marker = SparkKubernetesOperator( task_id='mark_partition_as_being_processed', depends_on_past=True # 순차 실행 강제 )
backfill_creation_job = SparkKubernetesOperator( task_id='get_late_partitions_and_mark_them_as_processed', depends_on_past=True # 순차 실행 강제 )핵심: 전체 DAG는 max_active_runs=5로 병렬 가능하지만, 핵심 태스크만 depends_on_past=True로 순차 실행(race condition 방지)
Dynamic의 주요 Consequences
- Stateful 파이프라인 + 매우 늦은 데이터: 마지막 실행이 10/20이고 9/21에 late data가 감지되면, stateful 잡은 9/21~10/20 전체 순차 재생성 필요 이 경우 dynamic이라도 accepted lookback window 상한을 두는 것이 현실적
- 스케줄링 복잡도: 파티션별 last update time을 얻는 것 자체가 스토리지 기술의 내부 구현에 의존하므로 구현 난이도가 높음
Concept
- Static Late Data Integrator : 고정된 lookback window로 매 실행마다 과거 N일치를 무조건 재처리하는 패턴
- Dynamic Late Data Integrator : state table 기반으로 실제 late data가 있는 파티션만 선택적으로 재처리하는 패턴
- Lookback Window : late data 통합을 위해 과거로 돌아가는 시간 범위. static은 고정, dynamic은 가변
- State Table : 파티션별 last processed time, last update time을 추적하는 메타데이터 테이블
- Dynamic Task Mapping : Airflow에서 런타임에 리스트 길이만큼 태스크를 동적으로 생성하는 기능 (expand())
- depends_on_past : Airflow 태스크 속성. 이전 실행이 성공해야 다음 실행이 시작되는 순차 실행 보장
- Overlapping Backfill : static lookback window에서 여러 실행을 동시에 backfill할 때 같은 날짜가 중복 처리되는 문제
- Is processed 플래그 : dynamic 패턴에서 동시성 문제를 방지하기 위해 state table에 추가하는 상태 컬럼
- Snowball Backfilling Effect : late data 통합이 downstream consumer 체인 전체로 연쇄 backfill을 유발하는 현상
[de design pattern]03-04. static, dynamic late data integrator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/03-dead-letter/03-dl-04-static-dynamic-late_date_intergator/