1122 words
6 minutes
[DE Design Pattern]05-1. Static_Joiner
1. Data Enrichment: Static Joiner
01. Pattern Overview
-
Static Joiner는 정적(at-rest) 참조 데이터셋을 활용하여 원본 데이터에 컨텍스트를 추가하는 패턴
-
ex) visit 이벤트에 user 프로필 정보를 붙이는 것
-
구현의 핵심은 두 데이터셋 간의 JOIN 키(예:
user_id)를 식별하는 것 -
참조 데이터가 시간에 따라 변하면 **SCD(Slowly Changing Dimensions)**를 도입해 시점별 정확한 매칭을 보장해야 함
02. SCD Type 2: 단일 테이블에 이력 관리
- SCD Type 2는 하나의 테이블에서
start_date,end_date로 각 행의 유효 기간을 관리 - 현재 값은
end_date가 NULL이거나 먼 미래 값으로 설정
# SCD Type 2 JOIN: 실행 시점 기준으로 유효한 user 매칭query = """SELECT v.visit_id, v.event_time, v.page, u.id AS user_id, u.loginFROM visits vJOIN users u ON u.id = v.user_id AND v.event_time BETWEEN u.start_date AND u.end_date"""# 멱등성을 위해 NOW() 대신 오케스트레이터의 execution_time 사용NOW()대신 Airflow의logical_date같은 불변 시간값을 쓰면, 재실행 시에도 동일 결과를 보장하여 멱등성을 확보할 수있음
03. SCD Type 4: 현재값/이력 테이블 분리
- SCD Type 4는 현재값 테이블과 이력 테이블을 분리
- 조인 쿼리는 Type 2와 동일하지만, 현재값만 필요할 때 이력 테이블을 스캔하지 않아도 됨
04. Stream-to-Batch JOIN
스트리밍 데이터에 정적 참조 데이터를 붙이는 방식도 가능. API는 일반 배치 JOIN과 동일
# 스트리밍 visits + 정적 devices 테이블 JOINdevices = spark.read.format("delta").load("/data/devices")visits = spark.readStream.format("kafka").load()
enriched = visits.join( devices, [visits.device_type == devices.type, visits.device_version == devices.version], "left")- 핵심은 정적 데이터셋과 스트리밍 잡은 별도 라이프사이클을 가진다는 것.
- 스트리밍 잡이 정적 데이터 업데이트를 기다리지 않으므로, Delta Lake 같은 ACID 포맷을 사용해야 빈 테이블과 조인되는 문제를 방지할 수 있음
05. API 기반 Enrichment
외부 API로 enrichment할 때는 bulk 요청으로 네트워크 비용을 최적화합니다.
# 버퍼링 후 bulk API 호출class KafkaWriterWithEnricher: BUFFER_THRESHOLD = 100
def process(self, row): self.buffer.append(row) if len(self.buffer) >= self.BUFFER_THRESHOLD: ips = set(r.ip for r in self.buffer) # 한 번의 bulk 호출로 여러 IP 조회 response = requests.get( f"http://ip-service/bulk?ips={','.join(ips)}" ) self.enriched_ips.update(response.json()) self._flush_records()(1) 처리 중 실시간 API 호출, (2) API 데이터를 먼저 테이블로 물리화(materialize)한 후 JOIN.
06. 주요 트레이드오프
- Late data: 스트리밍에서는 참조 데이터 갱신 지연으로 조인 미스 발생 가능 → Dynamic Joiner로 해결
- 멱등성: 백필 시 enrichment 데이터셋이 시점별로 달라질 수 있음 → SCD + 불변 execution_time으로 해결
- API enrichment: 실시간 호출은 멱등하지 않음 → 물리화 후 SCD 활용
Concept
- Static Joiner : 정적 참조 데이터를 JOIN하여 원본 데이터에 컨텍스트를 추가하는 패턴
- SCD (Slowly Changing Dimensions) : 시간에 따라 변하는 참조 데이터의 이력을 추적하는 데이터 모델링 전략
- SCD Type 2 : 단일 테이블에서 start_date/end_date로 행의 유효 기간을 관리하는 방식
- SCD Type 4 : 현재값 테이블과 이력 테이블을 분리하여 관리하는 방식
- Stream-to-Batch JOIN : 스트리밍 데이터와 정적 배치 데이터를 결합하는 연산. ACID 포맷 사용이 중요
- Materialized API : API 데이터를 테이블로 물리화한 후 JOIN하는 방식. 멱등성 보장에 유리
- Bulk API Enrichment : 네트워크 비용 최적화를 위해 레코드를 버퍼링 후 일괄 API 호출하는 기법
- Broadcast JOIN : 작은 참조 테이블을 각 노드에 복사하여 shuffle 없이 JOIN하는 기법. shuffle이 없다는 것은 key 기준으로 데이터를 다시 partition에 분배할 필요가 없다는 것을 의미
- 스트리밍 잡에서 Static Joiner를 사용할 때, 참조 데이터를 JSON/CSV 같은 raw 포맷 대신 Delta Lake로 저장해야 하는 이유
- 기본적으로 스트리밍 잡은 참조데이터 업데이트를 기다리지 않음. raw 포맷은 전체 덮어쓰기 시 일시적으로 빈 상태가 될 수 있어, 그 순간 JOIN하면 빈 결과를 받을 수있음.
- Delta Lake는 트랜잭션과 원자성을 보장하므로 커밋되지 않은 파일을 읽지 않아 이와같은 문제를 방지함
[DE Design Pattern]05-1. Static_Joiner
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-01-static-joiner/