1150 words
6 minutes
[DE Design Pattern]09-01 Audit-Write-Audit-Publish
1.Adit-Write-Audit-Publish (AWAP)
Pattern Overview
- 입력과 출력 양쪽에 audit(검증) 단계를 삽입
- 품질이 낮은 데이터가 downstream으로 전파되는 것을 막는 패턴.
Audit 단계들
- 첫 번째 Audit (입력 검증) — 변환 전 입력 데이터를 대상 검증을 수행. 파일 포맷 확인, 파일/테이블 크기 체크, 스키마 검증 등
- 두 번째 Audit (출력 검증) — 변환된 데이터를 대상으로 비즈니스 로직 결과 검증. NULL 검증, 데이터 볼륨 비교, 컬럼 값 분포 확인 등
Audit 실패 시 3가지 전략
- Pipeline Failure — 전체 파이프라인을 중단
- Data Dispatching — 유효한 레코드만 downstream으로 보내고, 유효하지 않은 레코드는 별도 저장소로 분리
- Nonblocking Audit — 품질 이슈가 있어도 데이터를 publish하되, 이슈 내용을 annotation(데이터 요약 엔트리)으로 남김. 해당 컬럼을 사용하지 않는 consumer는 그대로 쓸 수 있고, 사용하는 consumer는 threshold를 기준으로 판단
Batch implmentation
# Airflow DAG 구조: Audit → Transform → Audit → Loadfrom airflow.operators.python import PythonOperatorfrom airflow.providers.postgres.operators.postgres import PostgresOperator
# 1st Auditdef validate_input_file(file_path, min_size=1024, min_lines=10): import os, json f_size = os.path.getsize(file_path) validation_errors = []
if f_size < min_size: validation_errors.append(f'File too small: {f_size} < {min_size}')
with open(file_path) as f: lines = f.readlines() if len(lines) < min_lines: validation_errors.append(f'Too few lines: {len(lines)} < {min_lines}') # JSON 포맷 검증 (첫 줄만 샘플링) try: json.loads(lines[0]) except json.JSONDecodeError as e: validation_errors.append(f'Invalid JSON at line 1: {e}')
if validation_errors: raise Exception(f'Input audit failed:\n' + '\n'.join(validation_errors))
# 2nd Auditdef validate_transformed_data(csv_path): import pandas as pd df = pd.read_csv(csv_path) required_columns = ['visit_id', 'event_time', 'user_id', 'page']
cols_with_nulls = [col for col in required_columns if df[col].isnull().any()]
if cols_with_nulls: raise Exception(f'NULL found in required columns: {cols_with_nulls}')이 코드에서 첫 번째 함수는 파일 크기, 라인 수, JSON 유효성 같은 메타데이터 수준 검증을 수행하고, 두 번째 함수는 pandas로 실제 데이터를 읽어 필수 컬럼의 NULL 여부를 검증
Streaming 적용
- Window-based — 스트리밍 잡 내부에 processing time window를 만들고, 윈도우가 닫힐 때 audit 로직을 실행. 상태 관리 오버헤드 존재
- Staging-based — 변환 결과를 staging 테이블에 먼저 쓰고, 별도 audit 잡이 검증 후 최종 출력으로 promote한다. 기존 처리 로직을 수정할 필요가 없다.
# Staging-based 접근: Spark Structured Streaming → Delta Lake staging tablefrom pyspark.sql import functions as F
# Step 1: 스트리밍 데이터를 staging 테이블에 쓰기visits = (spark_session.readStream .format('kafka') .option('kafka.bootstrap.servers', 'localhost:9092') .option('subscribe', 'visits') .load() .selectExpr('CAST(value AS STRING)') .select(F.from_json('value', visit_schema).alias('visit')) .selectExpr('visit.*'))
write_query = (visits.writeStream .trigger(processingTime='15 seconds') .option('checkpointLocation', checkpoint_dir) .foreachBatch(write_dataset_to_staging_table) .start())
# Step 2: 별도 잡이 staging 테이블을 읽고 row-level 검증 후 최종 출력visits_to_audit = (spark_session.readStream .format('delta') .table('staging_visits') .withColumn('is_valid', F.col('visit_id').isNotNull() & F.col('event_time').isNotNull() ))# is_valid == True → final output, False → error outputConsequences
Compute cost — 메타데이터 기반 검증은 저렴하지만, row-level 검증은 추가 컴퓨팅 비용이 발생
Rules coverage — 데이터셋은 시간이 지나면 변하므로, 오늘 정의한 규칙이 내일의 데이터를 완벽히 커버하지 못할 수 있다. Quality Observation 패턴으로 보완필요
False positive — audit 실패가 실제 문제가 아닐 수 있다. 예를 들어, 소셜 미디어에 인용되어 방문자가 급증한 경우 데이터 볼륨이 예상을 초과하지만 이는 정상이다. 모든 audit 실패를 critical로 취급하지 말고, alert + 조사 트리거로 사용가능
Concept
- AWAP (Audit-Write-Audit-Publish) : WAP의 확장. 파이프라인 입력/출력 양쪽에 audit 단계를 삽입하여 데이터 품질을 검증하는 패턴
- WAP (Write-Audit-Publish) : Netflix에서 소개한 원본 패턴. 출력 데이터만 검증 후 publish. AWAP의 전신
- First Audit (Input Audit) : 변환 전 입력 데이터의 메타데이터 수준 검증 (포맷, 크기, 스키마). 가볍고 빠르게 수행
- Second Audit (Output Audit) : 변환 후 결과 데이터의 비즈니스 로직 검증 (NULL, 값 분포, 볼륨). unit test의 실데이터 확장판
- Data Dispatching : audit 실패 시 유효한 데이터만 downstream으로 보내고 무효 데이터를 별도 저장소로 분리하는 전략
- Nonblocking Audit : 품질 이슈가 있어도 데이터를 publish하되, 이슈 annotation을 남겨 consumer가 판단하게 하는 전략
- Window-based AWAP (Streaming) : 스트리밍 잡 내 processing time window 단위로 audit을 수행하는 방식
- Staging-based AWAP (Streaming) : 변환 결과를 staging layer에 먼저 쓰고, 별도 audit 잡이 검증 후 최종 출력으로 promote하는 방식
- Exhaustiveness Rule : 같은 검증을 양쪽에 넣으면 이중 처리 비용이 발생하므로, 가장 포괄적인 위치(보통 2nd Audit)에 배치하는 원칙
[DE Design Pattern]09-01 Audit-Write-Audit-Publish
https://yjinheon.netlify.app/posts/02de/00-de-design-pattern/09_data_quality/09-01-audit-write-audit-publish/