1122 words
6 minutes
[DE Design Pattern]05-1. Static_Joiner

1. Data Enrichment: Static Joiner#

01. Pattern Overview#

  • Static Joiner는 정적(at-rest) 참조 데이터셋을 활용하여 원본 데이터에 컨텍스트를 추가하는 패턴

  • ex) visit 이벤트에 user 프로필 정보를 붙이는 것

  • 구현의 핵심은 두 데이터셋 간의 JOIN 키(예: user_id)를 식별하는 것

  • 참조 데이터가 시간에 따라 변하면 **SCD(Slowly Changing Dimensions)**를 도입해 시점별 정확한 매칭을 보장해야 함

02. SCD Type 2: 단일 테이블에 이력 관리#

  • SCD Type 2는 하나의 테이블에서 start_date, end_date로 각 행의 유효 기간을 관리
  • 현재 값은 end_date가 NULL이거나 먼 미래 값으로 설정
# SCD Type 2 JOIN: 실행 시점 기준으로 유효한 user 매칭
query = """
SELECT v.visit_id, v.event_time, v.page,
u.id AS user_id, u.login
FROM visits v
JOIN users u
ON u.id = v.user_id
AND v.event_time BETWEEN u.start_date AND u.end_date
"""
# 멱등성을 위해 NOW() 대신 오케스트레이터의 execution_time 사용
  • NOW() 대신 Airflow의 logical_date 같은 불변 시간값을 쓰면, 재실행 시에도 동일 결과를 보장하여 멱등성을 확보할 수있음

03. SCD Type 4: 현재값/이력 테이블 분리#

  • SCD Type 4는 현재값 테이블과 이력 테이블을 분리
  • 조인 쿼리는 Type 2와 동일하지만, 현재값만 필요할 때 이력 테이블을 스캔하지 않아도 됨

04. Stream-to-Batch JOIN#

스트리밍 데이터에 정적 참조 데이터를 붙이는 방식도 가능. API는 일반 배치 JOIN과 동일

# 스트리밍 visits + 정적 devices 테이블 JOIN
devices = spark.read.format("delta").load("/data/devices")
visits = spark.readStream.format("kafka").load()
enriched = visits.join(
devices,
[visits.device_type == devices.type,
visits.device_version == devices.version],
"left"
)
  • 핵심은 정적 데이터셋과 스트리밍 잡은 별도 라이프사이클을 가진다는 것.
  • 스트리밍 잡이 정적 데이터 업데이트를 기다리지 않으므로, Delta Lake 같은 ACID 포맷을 사용해야 빈 테이블과 조인되는 문제를 방지할 수 있음

05. API 기반 Enrichment#

외부 API로 enrichment할 때는 bulk 요청으로 네트워크 비용을 최적화합니다.

# 버퍼링 후 bulk API 호출
class KafkaWriterWithEnricher:
BUFFER_THRESHOLD = 100
def process(self, row):
self.buffer.append(row)
if len(self.buffer) >= self.BUFFER_THRESHOLD:
ips = set(r.ip for r in self.buffer)
# 한 번의 bulk 호출로 여러 IP 조회
response = requests.get(
f"http://ip-service/bulk?ips={','.join(ips)}"
)
self.enriched_ips.update(response.json())
self._flush_records()

(1) 처리 중 실시간 API 호출, (2) API 데이터를 먼저 테이블로 물리화(materialize)한 후 JOIN.

06. 주요 트레이드오프#

  • Late data: 스트리밍에서는 참조 데이터 갱신 지연으로 조인 미스 발생 가능 → Dynamic Joiner로 해결
  • 멱등성: 백필 시 enrichment 데이터셋이 시점별로 달라질 수 있음 → SCD + 불변 execution_time으로 해결
  • API enrichment: 실시간 호출은 멱등하지 않음 → 물리화 후 SCD 활용

Concept

  • Static Joiner : 정적 참조 데이터를 JOIN하여 원본 데이터에 컨텍스트를 추가하는 패턴
  • SCD (Slowly Changing Dimensions) : 시간에 따라 변하는 참조 데이터의 이력을 추적하는 데이터 모델링 전략
  • SCD Type 2 : 단일 테이블에서 start_date/end_date로 행의 유효 기간을 관리하는 방식
  • SCD Type 4 : 현재값 테이블과 이력 테이블을 분리하여 관리하는 방식
  • Stream-to-Batch JOIN : 스트리밍 데이터와 정적 배치 데이터를 결합하는 연산. ACID 포맷 사용이 중요
  • Materialized API : API 데이터를 테이블로 물리화한 후 JOIN하는 방식. 멱등성 보장에 유리
  • Bulk API Enrichment : 네트워크 비용 최적화를 위해 레코드를 버퍼링 후 일괄 API 호출하는 기법
  • Broadcast JOIN : 작은 참조 테이블을 각 노드에 복사하여 shuffle 없이 JOIN하는 기법. shuffle이 없다는 것은 key 기준으로 데이터를 다시 partition에 분배할 필요가 없다는 것을 의미

  • 스트리밍 잡에서 Static Joiner를 사용할 때, 참조 데이터를 JSON/CSV 같은 raw 포맷 대신 Delta Lake로 저장해야 하는 이유
  • 기본적으로 스트리밍 잡은 참조데이터 업데이트를 기다리지 않음. raw 포맷은 전체 덮어쓰기 시 일시적으로 빈 상태가 될 수 있어, 그 순간 JOIN하면 빈 결과를 받을 수있음.
  • Delta Lake는 트랜잭션과 원자성을 보장하므로 커밋되지 않은 파일을 읽지 않아 이와같은 문제를 방지함
[DE Design Pattern]05-1. Static_Joiner
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-01-static-joiner/
Author
Datamind
Published at
2025-03-14
License
CC BY-NC-SA 4.0