1902 words
10 minutes
[DE Design Pattern]02-6. Data Readiness

06. Data Readiness & External Trigger#

기본적으로 데이터를 언제 수집을 시작할 지에 대한 패턴

Readiness Marker 패턴#

데이터 완성 시점을 어떻게 알릴 것인가의 문제

Flag File 방식#

데이터 생성이 완료되면 마커 파일을 생성하고 consumer는 이 파일의 존재 여부로 처리 시작을 판단

# [producer] Apache Spark는 _SUCCESS 파일을 자동 생성
dataset = spark.read.schema("...").json(input_path)
dataset.write.mode("overwrite").format("parquet").save(output_path)
# → output_path/_SUCCESS 파일이 자동 생성됨
# [consumer] Airflow FileSensor로 _SUCCESS 대기
from airflow.sensors.filesystem import FileSensor
wait_for_data = FileSensor(
task_id="wait_for_data",
filepath=f"{input_data_path}/_SUCCESS",
mode="reschedule", # 핵심: 워커 슬롯을 점유하지 않음
)
  • 기본값 poke는 센서가 워커 슬롯을 계속 점유하면서 주기적으로 확인
  • reschedule은 확인 후 슬롯을 반환하고, 다음 확인 시점에 다시 스케줄링
[poke 모드]
워커 슬롯 점유: ████████████████████ (파일 존재할 때까지 계속)
→ 다른 태스크가 실행 불가
[reschedule 모드]
워커 슬롯 점유: ██░░░░██░░░░██░░░░██ (확인 후 반환, 주기적 재확인)
→ 사이사이 다른 태스크 실행 가능

Spark가 _SUCCESS를 자동 생성하지 않는 상황이라면, Airflow에서 마커 파일을 직접 생성

# [생산자 측] Airflow 태스크로 Readiness Marker 직접 생성
from airflow.decorators import task
import shutil
@task
def delete_dataset():
shutil.rmtree(dataset_dir, ignore_errors=True)
@task
def generate_dataset():
# 데이터 처리 로직 (생략)
pass
@task
def create_readiness_file():
"""반드시 파이프라인의 마지막 태스크로 실행"""
with open(f"{dataset_dir}/COMPLETED", "w") as f:
f.write("")
# 핵심: readiness marker는 항상 마지막
delete_dataset() >> generate_dataset() >> create_readiness_file()

구현 2: 다음 파티션 Convention 방식#

시간 기반 파티션 구조에서는 별도 마커 파일 없이 규약(convention) 으로 readiness를 판단

# 규약: "다음 파티션이 존재하면 현재 파티션은 완성된 것"
#
# 잡이 매시간 실행:
# 10:00 실행 → partition=10 생성
# 11:00 실행 → partition=11 생성
#
# 소비자가 partition=10을 처리하려면?
# → partition=11이 존재하는지 확인
# → 존재하면 partition=10은 완성된 것으로 간주
from airflow.sensors.filesystem import FileSensor
# partition=11의 존재로 partition=10의 완성을 판단
wait_for_next_partition = FileSensor(
task_id="wait_for_partition_readiness",
filepath="/data/output/partition=11",
mode="reschedule",
)
process_partition_10 = SparkKubernetesOperator(
task_id="process_partition_10",
# ...
)
wait_for_next_partition >> process_partition_10

Late Data 문제#

파티션 기반 readiness에는 치명적 약점이 있습니다. 이벤트 시간 기반 파티션에서 Late Data가 도착하면, 이미 “완성”으로 간주된 파티션에 새 데이터가 추가되는 문제 발생

[시간축]
10:00 파티션 완성 → 소비자가 처리 완료
11:00 파티션 완성
...
12:30 — 10:00 시간대의 늦은 이벤트 도착!
→ 10:00 파티션에 데이터 추가
→ 소비자는 이미 처리 완료 → 누락 발생

두 가지 대응 전략이 있습니다.

# 전략 1: 파티션 불변 원칙
# 한번 닫힌 파티션은 절대 수정하지 않음
# 늦은 데이터는 별도 파티션(예: late_data/)에 적재
# 전략 2: 변경 통지
# 파티션 업데이트 시 소비자에게 알림
# 소비자가 해당 파티션을 재처리할지 결정
  • 어떤 전략을 택하든 소비자와 사전 합의가 필수
  • Readiness Marker는 강제력이 없는 규약이므로, 소비자가 마커를 무시하고 불완전한 데이터를 읽는 것을 기술적으로 막을 수 없음

External Trigger 패턴#

문제: 예측 불가능한 데이터 생성#

데이터 생성 시점 자체가 불규칙할 경우

[스케줄 기반의 문제]
매일 실행 → 월~목 중 1회만 새 데이터 존재
→ 나머지 실행은 "변경 없음" 확인만 하고 종료
→ 컴퓨팅 리소스 낭비 + 불필요한 운영 부담

3단계 구조#

External Trigger는 보통 세 단계로 구성

# 1단계: 알림 채널 구독
# AWS S3 이벤트 → SNS → Lambda 연결 등
# 2단계: 이벤트 핸들러 (필터링 + 컨텍스트 구성)
# 3단계: 파이프라인 트리거
import json
import urllib.parse
import requests
def lambda_handler(event, ctx):
"""AWS Lambda — S3 객체 생성 이벤트 → Airflow DAG 트리거"""
payload = {
"event": json.dumps(event),
"trigger": {
"function_name": ctx.function_name,
"function_version": ctx.function_version,
"lambda_request_id": ctx.aws_request_id,
},
"file_to_load": urllib.parse.unquote_plus(
event["Records"][0]["s3"]["object"]["key"]
),
"dag_run_id": f"External-{ctx.aws_request_id}",
}
# Airflow REST API로 DAG 트리거
response = requests.post(
"http://airflow:8080/api/v1/dags/devices-loader/dagRuns",
data=json.dumps({"conf": payload}),
auth=("admin", "admin"),
)
if response.status_code != 200:
raise Exception(f"Trigger failed: {response.text} for {payload}")
return True
from airflow import DAG
with DAG(
"devices-loader",
schedule_interval=None,
max_active_runs=5,
default_args={"depends_on_past": False},
) as dag:
# conf에서 payload 꺼내서 처리
# {{ dag_run.conf['file_to_load'] }} 로 접근 가능
pass

Push vs Pull Trigger#

[Pull 기반 Trigger]
장기 실행 프로세스가 주기적으로 새 이벤트 확인
→ 대부분의 시간을 "확인만 하고 0건" 상태로 대기
→ 리소스 낭비, 하지만 구현은 단순
[Push 기반 Trigger] — 권장
이벤트 소스가 엔드포인트에 알림을 보냄
→ 이벤트 있을 때만 핸들러 인스턴스 생성
→ 리소스 효율적, 서버리스(Lambda 등)에 적합

Error Management#

  • External Trigger에서 이벤트 유실은 파이프라인이 아예 실행되지 않는다는 것을 의미.
  • 스케줄 기반은 다음 실행에서 만회할 수 있지만, 이벤트 기반은 유실되면 해당 데이터 수집이 영구 누락
# AWS Lambda의 인프라 레벨 에러 관리
# 1) 실패한 이벤트를 Dead Letter Queue(DLQ)로 전송
# 2) 재시도 횟수 설정
# 3) 배치 크기 및 동시성 제어
# Lambda 설정 (개념적 표현)
lambda_config = {
"dead_letter_queue": "arn:aws:sqs:...:trigger-dlq",
"retry_attempts": 2,
"max_concurrency": 10,
}
# DLQ에 쌓인 실패 이벤트를 모니터링하고 수동/자동 재처리
  • 이벤트 핸들러 자체의 실패뿐 아니라, 트리거된 파이프라인의 실패도 고려.
  • Airflow DAG가 실패하면 Lambda는 성공(200 응답)으로 간주하므로, 파이프라인 레벨의 실패 모니터링과 알림이 별도로 필요

Concept

  • Readiness Marker : 데이터 생성 완료를 알리는 신호. 소비자가 불완전한 데이터를 읽는 것을 방지하는 pull 기반 패턴
  • Flag File (_SUCCESS, COMPLETED) : 데이터 생성 완료 후 생성되는 마커 파일. Spark은 raw 포맷 출력 시 _SUCCESS를 자동 생성
  • 다음 파티션 Convention : 별도 마커 없이 “다음 시간 파티션이 존재하면 현재 파티션은 완성”으로 간주하는 규약
  • FileSensor mode (poke vs reschedule) : poke는 워커 슬롯 계속 점유, reschedule은 확인 후 슬롯 반환. 리소스 효율성 차이
  • Late Data : 이벤트 발생 시간보다 늦게 도착하는 데이터. 파티션 기반 readiness에서 이미 완성 판정된 파티션에 추가되는 문제
  • 파티션 불변성 (Partition Immutability) : 한번 닫힌 파티션은 수정하지 않는 원칙. Late Data 처리 전략 중 하나
  • External Trigger : 이벤트 발생 시에만 파이프라인을 실행하는 push 기반 패턴. 불규칙한 데이터 생성에 적합
  • Push vs Pull Trigger : Push는 이벤트 소스가 알림을 보냄(서버리스 적합), Pull은 소비자가 주기적으로 확인(장기 실행 프로세스)
  • 실행 컨텍스트 (Execution Context) : 트리거 시 전달하는 메타데이터(이벤트 내용, 트리거 버전, 처리 시간 등). 모니터링과 디버깅에 필수
  • Dead Letter Queue (DLQ) : 처리 실패한 이벤트를 별도 큐에 저장하여 유실을 방지하는 에러 관리 패턴
  • schedule_interval=None : Airflow에서 스케줄러의 자동 실행을 비활성화하고, 외부 트리거로만 DAG를 실행하는 설정
    • Dagster Asset Materialization : Dagster의 Software-Defined Asset이 Readiness를 선언적으로 관리하는 방식

[DE Design Pattern]02-6. Data Readiness
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-06-data_readiness/
Author
Datamind
Published at
2025-01-31
License
CC BY-NC-SA 4.0