1902 words
10 minutes
[DE Design Pattern]02-6. Data Readiness
06. Data Readiness & External Trigger
기본적으로 데이터를 언제 수집을 시작할 지에 대한 패턴
Readiness Marker 패턴
데이터 완성 시점을 어떻게 알릴 것인가의 문제
Flag File 방식
데이터 생성이 완료되면 마커 파일을 생성하고 consumer는 이 파일의 존재 여부로 처리 시작을 판단
# [producer] Apache Spark는 _SUCCESS 파일을 자동 생성dataset = spark.read.schema("...").json(input_path)dataset.write.mode("overwrite").format("parquet").save(output_path)# → output_path/_SUCCESS 파일이 자동 생성됨
# [consumer] Airflow FileSensor로 _SUCCESS 대기from airflow.sensors.filesystem import FileSensor
wait_for_data = FileSensor( task_id="wait_for_data", filepath=f"{input_data_path}/_SUCCESS", mode="reschedule", # 핵심: 워커 슬롯을 점유하지 않음)- 기본값
poke는 센서가 워커 슬롯을 계속 점유하면서 주기적으로 확인 reschedule은 확인 후 슬롯을 반환하고, 다음 확인 시점에 다시 스케줄링
[poke 모드]워커 슬롯 점유: ████████████████████ (파일 존재할 때까지 계속)→ 다른 태스크가 실행 불가
[reschedule 모드]워커 슬롯 점유: ██░░░░██░░░░██░░░░██ (확인 후 반환, 주기적 재확인)→ 사이사이 다른 태스크 실행 가능Spark가 _SUCCESS를 자동 생성하지 않는 상황이라면, Airflow에서 마커 파일을 직접 생성
# [생산자 측] Airflow 태스크로 Readiness Marker 직접 생성from airflow.decorators import taskimport shutil
@taskdef delete_dataset(): shutil.rmtree(dataset_dir, ignore_errors=True)
@taskdef generate_dataset(): # 데이터 처리 로직 (생략) pass
@taskdef create_readiness_file(): """반드시 파이프라인의 마지막 태스크로 실행""" with open(f"{dataset_dir}/COMPLETED", "w") as f: f.write("")
# 핵심: readiness marker는 항상 마지막delete_dataset() >> generate_dataset() >> create_readiness_file()구현 2: 다음 파티션 Convention 방식
시간 기반 파티션 구조에서는 별도 마커 파일 없이 규약(convention) 으로 readiness를 판단
# 규약: "다음 파티션이 존재하면 현재 파티션은 완성된 것"## 잡이 매시간 실행:# 10:00 실행 → partition=10 생성# 11:00 실행 → partition=11 생성## 소비자가 partition=10을 처리하려면?# → partition=11이 존재하는지 확인# → 존재하면 partition=10은 완성된 것으로 간주
from airflow.sensors.filesystem import FileSensor
# partition=11의 존재로 partition=10의 완성을 판단wait_for_next_partition = FileSensor( task_id="wait_for_partition_readiness", filepath="/data/output/partition=11", mode="reschedule",)
process_partition_10 = SparkKubernetesOperator( task_id="process_partition_10", # ...)
wait_for_next_partition >> process_partition_10Late Data 문제
파티션 기반 readiness에는 치명적 약점이 있습니다. 이벤트 시간 기반 파티션에서 Late Data가 도착하면, 이미 “완성”으로 간주된 파티션에 새 데이터가 추가되는 문제 발생
[시간축]10:00 파티션 완성 → 소비자가 처리 완료11:00 파티션 완성 ...12:30 — 10:00 시간대의 늦은 이벤트 도착! → 10:00 파티션에 데이터 추가 → 소비자는 이미 처리 완료 → 누락 발생두 가지 대응 전략이 있습니다.
# 전략 1: 파티션 불변 원칙# 한번 닫힌 파티션은 절대 수정하지 않음# 늦은 데이터는 별도 파티션(예: late_data/)에 적재
# 전략 2: 변경 통지# 파티션 업데이트 시 소비자에게 알림# 소비자가 해당 파티션을 재처리할지 결정- 어떤 전략을 택하든 소비자와 사전 합의가 필수
- Readiness Marker는 강제력이 없는 규약이므로, 소비자가 마커를 무시하고 불완전한 데이터를 읽는 것을 기술적으로 막을 수 없음
External Trigger 패턴
문제: 예측 불가능한 데이터 생성
데이터 생성 시점 자체가 불규칙할 경우
[스케줄 기반의 문제]매일 실행 → 월~목 중 1회만 새 데이터 존재→ 나머지 실행은 "변경 없음" 확인만 하고 종료→ 컴퓨팅 리소스 낭비 + 불필요한 운영 부담3단계 구조
External Trigger는 보통 세 단계로 구성
# 1단계: 알림 채널 구독# AWS S3 이벤트 → SNS → Lambda 연결 등
# 2단계: 이벤트 핸들러 (필터링 + 컨텍스트 구성)# 3단계: 파이프라인 트리거
import jsonimport urllib.parseimport requests
def lambda_handler(event, ctx): """AWS Lambda — S3 객체 생성 이벤트 → Airflow DAG 트리거"""
payload = { "event": json.dumps(event), "trigger": { "function_name": ctx.function_name, "function_version": ctx.function_version, "lambda_request_id": ctx.aws_request_id, }, "file_to_load": urllib.parse.unquote_plus( event["Records"][0]["s3"]["object"]["key"] ), "dag_run_id": f"External-{ctx.aws_request_id}", }
# Airflow REST API로 DAG 트리거 response = requests.post( "http://airflow:8080/api/v1/dags/devices-loader/dagRuns", data=json.dumps({"conf": payload}), auth=("admin", "admin"), )
if response.status_code != 200: raise Exception(f"Trigger failed: {response.text} for {payload}") return Truefrom airflow import DAG
with DAG( "devices-loader", schedule_interval=None, max_active_runs=5, default_args={"depends_on_past": False},) as dag: # conf에서 payload 꺼내서 처리 # {{ dag_run.conf['file_to_load'] }} 로 접근 가능 passPush vs Pull Trigger
[Pull 기반 Trigger]장기 실행 프로세스가 주기적으로 새 이벤트 확인→ 대부분의 시간을 "확인만 하고 0건" 상태로 대기→ 리소스 낭비, 하지만 구현은 단순
[Push 기반 Trigger] — 권장이벤트 소스가 엔드포인트에 알림을 보냄→ 이벤트 있을 때만 핸들러 인스턴스 생성→ 리소스 효율적, 서버리스(Lambda 등)에 적합Error Management
- External Trigger에서 이벤트 유실은 파이프라인이 아예 실행되지 않는다는 것을 의미.
- 스케줄 기반은 다음 실행에서 만회할 수 있지만, 이벤트 기반은 유실되면 해당 데이터 수집이 영구 누락
# AWS Lambda의 인프라 레벨 에러 관리# 1) 실패한 이벤트를 Dead Letter Queue(DLQ)로 전송# 2) 재시도 횟수 설정# 3) 배치 크기 및 동시성 제어
# Lambda 설정 (개념적 표현)lambda_config = { "dead_letter_queue": "arn:aws:sqs:...:trigger-dlq", "retry_attempts": 2, "max_concurrency": 10,}
# DLQ에 쌓인 실패 이벤트를 모니터링하고 수동/자동 재처리- 이벤트 핸들러 자체의 실패뿐 아니라, 트리거된 파이프라인의 실패도 고려.
- Airflow DAG가 실패하면 Lambda는 성공(200 응답)으로 간주하므로, 파이프라인 레벨의 실패 모니터링과 알림이 별도로 필요
Concept
- Readiness Marker : 데이터 생성 완료를 알리는 신호. 소비자가 불완전한 데이터를 읽는 것을 방지하는 pull 기반 패턴
- Flag File (_SUCCESS, COMPLETED) : 데이터 생성 완료 후 생성되는 마커 파일. Spark은 raw 포맷 출력 시
_SUCCESS를 자동 생성 - 다음 파티션 Convention : 별도 마커 없이 “다음 시간 파티션이 존재하면 현재 파티션은 완성”으로 간주하는 규약
- FileSensor mode (poke vs reschedule) : poke는 워커 슬롯 계속 점유, reschedule은 확인 후 슬롯 반환. 리소스 효율성 차이
- Late Data : 이벤트 발생 시간보다 늦게 도착하는 데이터. 파티션 기반 readiness에서 이미 완성 판정된 파티션에 추가되는 문제
- 파티션 불변성 (Partition Immutability) : 한번 닫힌 파티션은 수정하지 않는 원칙. Late Data 처리 전략 중 하나
- External Trigger : 이벤트 발생 시에만 파이프라인을 실행하는 push 기반 패턴. 불규칙한 데이터 생성에 적합
- Push vs Pull Trigger : Push는 이벤트 소스가 알림을 보냄(서버리스 적합), Pull은 소비자가 주기적으로 확인(장기 실행 프로세스)
- 실행 컨텍스트 (Execution Context) : 트리거 시 전달하는 메타데이터(이벤트 내용, 트리거 버전, 처리 시간 등). 모니터링과 디버깅에 필수
- Dead Letter Queue (DLQ) : 처리 실패한 이벤트를 별도 큐에 저장하여 유실을 방지하는 에러 관리 패턴
- schedule_interval=None : Airflow에서 스케줄러의 자동 실행을 비활성화하고, 외부 트리거로만 DAG를 실행하는 설정
- Dagster Asset Materialization : Dagster의 Software-Defined Asset이 Readiness를 선언적으로 관리하는 방식
[DE Design Pattern]02-6. Data Readiness
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-06-data_readiness/