762 words
4 minutes
[DE Design Pattern]03-04. Static, Dynamic Late Data Integrator
5. Filter Interceptor 패턴
Pattern Overview
- 데이터 파이프라인에서 진행하는 필터링에서 어떤 조건이 몇 건을 걸러냈는지 조건별 통계를 보기 위한 패턴
- 각 필터 조건을 카운터로 감싸서, 조건별로 몇 건이 제거되었는지 추적
구현 방식 1: PySpark Accumulator 기반
# 1단계: 필터 조건 + accumulator 정의@dataclasses.dataclassclass FilterWithAccumulator: name: str filter: Callable[[Any], bool] accumulator: Accumulator[int]
filters_with_accumulators = { 'type': [ FilterWithAccumulator( 'type is null', lambda device: device['type'] is not None, spark_context.accumulator(0) ), FilterWithAccumulator( 'type is too short (1 char)', lambda device: len(device['type']) > 1, spark_context.accumulator(0) ), ],}# 2단계: mapInPandas로 필터 평가 + accumulator 증가def filter_null_type(devices_iterator): def filter_row_with_accumulator(device_row): for attribute in device_row.keys(): for f in filters_with_accumulators.get(attribute, []): if not f.filter(device_row): f.accumulator.add(1) # 조건 불만족 → 카운터 증가 return False return True
for devices_df in devices_iterator: yield devices_df[devices_df.apply( lambda device: filter_row_with_accumulator(device), axis=1 ) == True]
valid_devices = input_dataset.mapInPandas(filter_null_type, schema)valid_devices.write.mode('append').format('delta').save(output_path)# 3단계: 필터링 통계 조회for key, accumulators in filters_with_accumulators.items(): for acc in accumulators: print(f'{key} // {acc.name}: {acc.accumulator.value}')
# 출력 예시:# type // type is null: 3# type // type is too short (1 char): 12- 필터 조건마다 별도 accumulator를 두어 조건별 통계 생성
구현 방식 2: SQL - CASE 기반 status_flag
-- 1단계: 각 필터 조건을 CASE문으로 컬럼화SELECT * FROM ( SELECT CASE WHEN (type IS NOT NULL) IS FALSE THEN 'null_type' WHEN (LEN(type) > 2) IS FALSE THEN 'short_type' WHEN (full_name IS NOT NULL) IS FALSE THEN 'null_full_name' WHEN (version IS NOT NULL) IS FALSE THEN 'null_version' ELSE NULL END AS status_flag, type, full_name, version FROM input);
-- 2단계: 조건별 필터링 건수 집계SELECT COUNT(*), status_flag FROM input_with_flagsWHERE status_flag IS NOT NULLGROUP BY status_flag;
-- 3단계: 정상 레코드만 추출SELECT type, full_name, version FROM input_with_flagsWHERE status_flag IS NULL;핵심: CASE문이 첫 번째로 실패한 조건을 status_flag에 기록
주요 Consequences
런타임 오버헤드: Accumulator 자체는 가벼운 자료구조이므로 영향이 작음. 다만 SQL 방식은 임시 테이블 생성 + 별도 집계 쿼리가 필요해 상대적으로 비용이 크게 발생 스트리밍 적용의 복잡도: stateless 잡에 accumulator를 추가하면 stateful로 전환 가능 또한 연속 데이터이므로 시간 기반 구간별 통계를 정의해야 “지금 어떤 필터가 문제인지” 파악가능
Concept
- Filter Interceptor : 각 필터 조건별로 제거된 레코드 수를 추적하여 필터 버그를 감지하는 패턴
- Accumulator : Spark의 분산 카운터. 각 worker에서 증가시키고 driver에서 합산. 필터별 통계 수집에 활용
- mapInPandas : PySpark에서 Pandas DataFrame 단위로 커스텀 변환을 적용하는 함수. accumulator와 결합하여 필터 래핑 구현
- status_flag : SQL 기반 구현에서 CASE문으로 생성하는 컬럼. 레코드가 어떤 필터 조건에 걸렸는지 기록
- Query Plan Optimization : 프레임워크가 여러 필터를 하나로 합치는 최적화. 개별 조건별 통계를 볼 수 없게 만드는 원인
- CASE문 순서 의존성 : SQL CASE는 첫 번째 매칭 조건만 반환. 한 레코드가 여러 조건에 실패해도 첫 번째만 기록됨
[DE Design Pattern]03-04. Static, Dynamic Late Data Integrator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/03-dead-letter/03-dl-05-filter_interceptor/