1150 words
6 minutes
[DE Design Pattern]09-01 Audit-Write-Audit-Publish

1.Adit-Write-Audit-Publish (AWAP)#

Pattern Overview#

  • 입력과 출력 양쪽에 audit(검증) 단계를 삽입
  • 품질이 낮은 데이터가 downstream으로 전파되는 것을 막는 패턴.

Audit 단계들#

  • 첫 번째 Audit (입력 검증) — 변환 전 입력 데이터를 대상 검증을 수행. 파일 포맷 확인, 파일/테이블 크기 체크, 스키마 검증 등
  • 두 번째 Audit (출력 검증) — 변환된 데이터를 대상으로 비즈니스 로직 결과 검증. NULL 검증, 데이터 볼륨 비교, 컬럼 값 분포 확인 등

Audit 실패 시 3가지 전략#

  • Pipeline Failure — 전체 파이프라인을 중단
  • Data Dispatching — 유효한 레코드만 downstream으로 보내고, 유효하지 않은 레코드는 별도 저장소로 분리
  • Nonblocking Audit — 품질 이슈가 있어도 데이터를 publish하되, 이슈 내용을 annotation(데이터 요약 엔트리)으로 남김. 해당 컬럼을 사용하지 않는 consumer는 그대로 쓸 수 있고, 사용하는 consumer는 threshold를 기준으로 판단

Batch implmentation#

# Airflow DAG 구조: Audit → Transform → Audit → Load
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# 1st Audit
def validate_input_file(file_path, min_size=1024, min_lines=10):
import os, json
f_size = os.path.getsize(file_path)
validation_errors = []
if f_size < min_size:
validation_errors.append(f'File too small: {f_size} < {min_size}')
with open(file_path) as f:
lines = f.readlines()
if len(lines) < min_lines:
validation_errors.append(f'Too few lines: {len(lines)} < {min_lines}')
# JSON 포맷 검증 (첫 줄만 샘플링)
try:
json.loads(lines[0])
except json.JSONDecodeError as e:
validation_errors.append(f'Invalid JSON at line 1: {e}')
if validation_errors:
raise Exception(f'Input audit failed:\n' + '\n'.join(validation_errors))
# 2nd Audit
def validate_transformed_data(csv_path):
import pandas as pd
df = pd.read_csv(csv_path)
required_columns = ['visit_id', 'event_time', 'user_id', 'page']
cols_with_nulls = [col for col in required_columns if df[col].isnull().any()]
if cols_with_nulls:
raise Exception(f'NULL found in required columns: {cols_with_nulls}')

이 코드에서 첫 번째 함수는 파일 크기, 라인 수, JSON 유효성 같은 메타데이터 수준 검증을 수행하고, 두 번째 함수는 pandas로 실제 데이터를 읽어 필수 컬럼의 NULL 여부를 검증

Streaming 적용#

  • Window-based — 스트리밍 잡 내부에 processing time window를 만들고, 윈도우가 닫힐 때 audit 로직을 실행. 상태 관리 오버헤드 존재
  • Staging-based — 변환 결과를 staging 테이블에 먼저 쓰고, 별도 audit 잡이 검증 후 최종 출력으로 promote한다. 기존 처리 로직을 수정할 필요가 없다.
# Staging-based 접근: Spark Structured Streaming → Delta Lake staging table
from pyspark.sql import functions as F
# Step 1: 스트리밍 데이터를 staging 테이블에 쓰기
visits = (spark_session.readStream
.format('kafka')
.option('kafka.bootstrap.servers', 'localhost:9092')
.option('subscribe', 'visits')
.load()
.selectExpr('CAST(value AS STRING)')
.select(F.from_json('value', visit_schema).alias('visit'))
.selectExpr('visit.*')
)
write_query = (visits.writeStream
.trigger(processingTime='15 seconds')
.option('checkpointLocation', checkpoint_dir)
.foreachBatch(write_dataset_to_staging_table)
.start()
)
# Step 2: 별도 잡이 staging 테이블을 읽고 row-level 검증 후 최종 출력
visits_to_audit = (spark_session.readStream
.format('delta')
.table('staging_visits')
.withColumn('is_valid',
F.col('visit_id').isNotNull() & F.col('event_time').isNotNull()
)
)
# is_valid == True → final output, False → error output

Consequences#

Compute cost — 메타데이터 기반 검증은 저렴하지만, row-level 검증은 추가 컴퓨팅 비용이 발생

Rules coverage — 데이터셋은 시간이 지나면 변하므로, 오늘 정의한 규칙이 내일의 데이터를 완벽히 커버하지 못할 수 있다. Quality Observation 패턴으로 보완필요

False positive — audit 실패가 실제 문제가 아닐 수 있다. 예를 들어, 소셜 미디어에 인용되어 방문자가 급증한 경우 데이터 볼륨이 예상을 초과하지만 이는 정상이다. 모든 audit 실패를 critical로 취급하지 말고, alert + 조사 트리거로 사용가능


Concept

  • AWAP (Audit-Write-Audit-Publish) : WAP의 확장. 파이프라인 입력/출력 양쪽에 audit 단계를 삽입하여 데이터 품질을 검증하는 패턴
  • WAP (Write-Audit-Publish) : Netflix에서 소개한 원본 패턴. 출력 데이터만 검증 후 publish. AWAP의 전신
  • First Audit (Input Audit) : 변환 전 입력 데이터의 메타데이터 수준 검증 (포맷, 크기, 스키마). 가볍고 빠르게 수행
  • Second Audit (Output Audit) : 변환 후 결과 데이터의 비즈니스 로직 검증 (NULL, 값 분포, 볼륨). unit test의 실데이터 확장판
  • Data Dispatching : audit 실패 시 유효한 데이터만 downstream으로 보내고 무효 데이터를 별도 저장소로 분리하는 전략
  • Nonblocking Audit : 품질 이슈가 있어도 데이터를 publish하되, 이슈 annotation을 남겨 consumer가 판단하게 하는 전략
  • Window-based AWAP (Streaming) : 스트리밍 잡 내 processing time window 단위로 audit을 수행하는 방식
  • Staging-based AWAP (Streaming) : 변환 결과를 staging layer에 먼저 쓰고, 별도 audit 잡이 검증 후 최종 출력으로 promote하는 방식
  • Exhaustiveness Rule : 같은 검증을 양쪽에 넣으면 이중 처리 비용이 발생하므로, 가장 포괄적인 위치(보통 2nd Audit)에 배치하는 원칙

[DE Design Pattern]09-01 Audit-Write-Audit-Publish
https://yjinheon.netlify.app/posts/02de/00-de-design-pattern/09_data_quality/09-01-audit-write-audit-publish/
Author
Datamind
Published at
2026-04-03
License
CC BY-NC-SA 4.0