762 words
4 minutes
[DE Design Pattern]03-04. Static, Dynamic Late Data Integrator

5. Filter Interceptor 패턴#

Pattern Overview#

  • 데이터 파이프라인에서 진행하는 필터링에서 어떤 조건이 몇 건을 걸러냈는지 조건별 통계를 보기 위한 패턴
  • 각 필터 조건을 카운터로 감싸서, 조건별로 몇 건이 제거되었는지 추적

구현 방식 1: PySpark Accumulator 기반#

# 1단계: 필터 조건 + accumulator 정의
@dataclasses.dataclass
class FilterWithAccumulator:
name: str
filter: Callable[[Any], bool]
accumulator: Accumulator[int]
filters_with_accumulators = {
'type': [
FilterWithAccumulator(
'type is null',
lambda device: device['type'] is not None,
spark_context.accumulator(0)
),
FilterWithAccumulator(
'type is too short (1 char)',
lambda device: len(device['type']) > 1,
spark_context.accumulator(0)
),
],
}
# 2단계: mapInPandas로 필터 평가 + accumulator 증가
def filter_null_type(devices_iterator):
def filter_row_with_accumulator(device_row):
for attribute in device_row.keys():
for f in filters_with_accumulators.get(attribute, []):
if not f.filter(device_row):
f.accumulator.add(1) # 조건 불만족 → 카운터 증가
return False
return True
for devices_df in devices_iterator:
yield devices_df[devices_df.apply(
lambda device: filter_row_with_accumulator(device), axis=1
) == True]
valid_devices = input_dataset.mapInPandas(filter_null_type, schema)
valid_devices.write.mode('append').format('delta').save(output_path)
# 3단계: 필터링 통계 조회
for key, accumulators in filters_with_accumulators.items():
for acc in accumulators:
print(f'{key} // {acc.name}: {acc.accumulator.value}')
# 출력 예시:
# type // type is null: 3
# type // type is too short (1 char): 12
  • 필터 조건마다 별도 accumulator를 두어 조건별 통계 생성

구현 방식 2: SQL - CASE 기반 status_flag#

-- 1단계: 각 필터 조건을 CASE문으로 컬럼화
SELECT * FROM (
SELECT
CASE
WHEN (type IS NOT NULL) IS FALSE THEN 'null_type'
WHEN (LEN(type) > 2) IS FALSE THEN 'short_type'
WHEN (full_name IS NOT NULL) IS FALSE THEN 'null_full_name'
WHEN (version IS NOT NULL) IS FALSE THEN 'null_version'
ELSE NULL
END AS status_flag,
type, full_name, version
FROM input
);
-- 2단계: 조건별 필터링 건수 집계
SELECT COUNT(*), status_flag FROM input_with_flags
WHERE status_flag IS NOT NULL
GROUP BY status_flag;
-- 3단계: 정상 레코드만 추출
SELECT type, full_name, version FROM input_with_flags
WHERE status_flag IS NULL;

핵심: CASE문이 첫 번째로 실패한 조건을 status_flag에 기록

주요 Consequences#

런타임 오버헤드: Accumulator 자체는 가벼운 자료구조이므로 영향이 작음. 다만 SQL 방식은 임시 테이블 생성 + 별도 집계 쿼리가 필요해 상대적으로 비용이 크게 발생 스트리밍 적용의 복잡도: stateless 잡에 accumulator를 추가하면 stateful로 전환 가능 또한 연속 데이터이므로 시간 기반 구간별 통계를 정의해야 “지금 어떤 필터가 문제인지” 파악가능


Concept

  • Filter Interceptor : 각 필터 조건별로 제거된 레코드 수를 추적하여 필터 버그를 감지하는 패턴
  • Accumulator : Spark의 분산 카운터. 각 worker에서 증가시키고 driver에서 합산. 필터별 통계 수집에 활용
  • mapInPandas : PySpark에서 Pandas DataFrame 단위로 커스텀 변환을 적용하는 함수. accumulator와 결합하여 필터 래핑 구현
  • status_flag : SQL 기반 구현에서 CASE문으로 생성하는 컬럼. 레코드가 어떤 필터 조건에 걸렸는지 기록
  • Query Plan Optimization : 프레임워크가 여러 필터를 하나로 합치는 최적화. 개별 조건별 통계를 볼 수 없게 만드는 원인
  • CASE문 순서 의존성 : SQL CASE는 첫 번째 매칭 조건만 반환. 한 레코드가 여러 조건에 실패해도 첫 번째만 기록됨

[DE Design Pattern]03-04. Static, Dynamic Late Data Integrator
https://yjinheon.netlify.app/posts/02de/de-design-pattern/03-dead-letter/03-dl-05-filter_interceptor/
Author
Datamind
Published at
2025-02-24
License
CC BY-NC-SA 4.0