2341 words
12 minutes
[DE Design Pattern]04-3. 증분데이터의 멱등성 보장
01. Merger: MERGE(UPSERT)
Pattern Overview
- 새 데이터(변경분)와 기존 데이터(현재 테이블)를 불변 키(identity)를 기준으로 병합
- SQL의
MERGE(= UPSERT) 가 사실상 핵심
MERGE가 3가지 분기
# MERGE의 세 가지 분기를 Python으로 표현def merge(current_table: dict, incoming_records: list): for record in incoming_records: key = (record['type'], record['version']) # 불변 키 조합
if key in current_table and record.get('is_deleted'): # CASE 1: DELETE — 소프트 삭제 플래그가 있는 매칭 레코드 del current_table[key]
elif key in current_table: # CASE 2: UPDATE — 키가 존재하면 값 갱신 current_table[key] = record
elif not record.get('is_deleted'): # CASE 3: INSERT — 키가 없고, 삭제 표시도 아닌 새 레코드 current_table[key] = record # is_deleted 체크가 없으면 삭제된 레코드도 INSERT됨!
# 예시 실행table = {}batch_1 = [ {'type': 'mobile', 'version': '1.0', 'full_name': 'Phone A', 'is_deleted': False}, {'type': 'tablet', 'version': '2.0', 'full_name': 'Tablet B', 'is_deleted': False},]merge(table, batch_1)print(table) # 2건 INSERT
batch_2 = [ {'type': 'mobile', 'version': '1.0', 'full_name': 'Phone A v2', 'is_deleted': False}, # UPDATE {'type': 'tablet', 'version': '2.0', 'full_name': 'Tablet B', 'is_deleted': True}, # DELETE {'type': 'laptop', 'version': '3.0', 'full_name': 'Laptop C', 'is_deleted': False}, # INSERT]merge(table, batch_2)print(table) # mobile=v2, laptop=C (tablet 삭제됨)SQL MERGE
MERGE INTO devices_output AS targetUSING devices_input AS inputON target.type = input.type AND target.version = input.version
-- 매칭 + 삭제 플래그 → 물리 삭제WHEN MATCHED AND input.is_deleted = true THEN DELETE
-- 매칭 + 삭제 아님 → 업데이트WHEN MATCHED AND input.is_deleted = false THEN UPDATE SET full_name = input.full_name
-- 미매칭 + 삭제 아님 → 신규 삽입WHEN NOT MATCHED AND input.is_deleted = false THEN INSERT (full_name, version, type) VALUES (input.full_name, input.version, input.type)soft delete flag가 체크되지 않을 경우
- 첫 번째 실행에서 이 체크가 없으면, 삭제 표시된 레코드도 신규로 INSERT -> 첫번째 실행에서만 유령 record가 남음
- 이후 MERGE에서는 해당 레코드가 이미 테이블에 존재하므로 MATCHED 분기를 타서 DELETE
# is_deleted 체크 없는 잘못된 구현def merge_wrong(current_table: dict, incoming_records: list): for record in incoming_records: key = (record['type'], record['version']) if key in current_table and record.get('is_deleted'): del current_table[key] elif key in current_table: current_table[key] = record else: current_table[key] = record # ❌ 삭제된 레코드도 INSERT!
# 첫 실행 시 Record 1(deleted)이 테이블에 들어감table = {}merge_wrong(table, [ {'type': 'A', 'version': '1', 'full_name': 'X', 'is_deleted': True}, {'type': 'B', 'version': '1', 'full_name': 'Y', 'is_deleted': False},])print(table) # A도 포함됨 → 잘못된 상태backfill 일관성 문제
- 증분 데이터셋 백필 시 소비자가 보는 데이터가 일시적으로 달라짐
# 정상 실행 시나리오timeline = { "07:00": {"new": ["A"], "table_after": ["A"]}, "08:00": {"new": ["A-updated","B"], "table_after": ["A-updated","B"]}, "09:00": {"new": ["B-deleted","C"], "table_after": ["A-updated","C"]}, "10:00": {"new": ["M","N","O"], "table_after": ["A-updated","C","M","N","O"]},}
# 08:00부터 백필 시 — 현재 테이블 상태(10:00 기준)에서 MERGE 시작# 08:00 백필: [A-updated, B] merge → [A-updated, B, C, M, N, O]# 정상 실행 때 08:00 시점에는 M, N, O가 존재하지 않음# 소비자는 백필 중 정상 실행과 다른 데이터를 봄- 이는 Merger가 기본적으로 stateless(상태를 저장하지 않음)하기 때문
- 테이블을 이전 상태로 복원할 방법이 없어서, 백필 시 현재 테이블에 그대로 MERGE됨
주의점
불변 키 필수 — 데이터 프로바이더가 row를 고유하게 식별할 수 있는 불변 속성을 제공해야함. I/O 비용 — Fast Metadata Cleaner와 달리 데이터 블록 수준에서 동작. 현대 DB와 OTF는 데이터를 활용해 관련 파일만 스캔하도록 최적화가능
Concept
- Merger (UPSERT) : 불변 키를 기준으로 새 데이터와 기존 데이터를 병합하는 패턴. INSERT, UPDATE, DELETE 세 가지 분기를 처리
- MERGE 구문 : SQL 표준의 UPSERT 명령. WHEN MATCHED / WHEN NOT MATCHED 분기로 Insert, Update, Delete를 하나의 쿼리로 처리
- Soft Delete : 물리적 삭제 대신 is_deleted 같은 플래그로 삭제를 표시하는 방식. Merger 패턴에서 Hard Delete를 처리하기 위한 전제 조건
- 불변 키(Immutable Identity) : MERGE의 ON 절에 사용되는, 변하지 않는 레코드 식별 속성 조합. 키가 변경되면 멱등성이 깨짐
- Stateless 패턴의 백필 한계 : 상태를 저장하지 않는 Merger는 백필 시 테이블을 이전 버전으로 복원할 수 없어 소비자에게 일시적 불일치를 노출
- Delta Lake MERGE 최적화 : Delta Lake가 내부적으로 통계(min/max)를 활용해 MERGE 대상 파일만 스캔하는 메커니즘
02. Stateful Merger
Pattern Overview
- Merger 패턴에 **상태 테이블(state table)**을 추가
- state table은 “어떤 실행 시점에 어떤 테이블 버전이 생성되었는지”를 기록
- 백필 시 해당 버전으로 복원한 뒤 MERGE재실행
Stateful Merger Workflow
# Stateful Merger의 세 단계def stateful_merger_pipeline(db, spark, execution_time, data): # STEP 1: 복원 (백필일 때만 동작) restore_if_needed(db, spark, execution_time)
# STEP 2: MERGE (기존 Merger 패턴과 동일) run_merge(spark, data)
# STEP 3: 상태 테이블 업데이트 update_state_table(db, spark, execution_time)State Table Architecture
# 상태 테이블: 실행 시점 → 테이블 버전 매핑state_table = { "2024-10-05": 1, # 10/05 실행 → 버전 1 생성 "2024-10-06": 2, # 10/06 실행 → 버전 2 생성 "2024-10-07": 3, # 10/07 실행 → 버전 3 생성 "2024-10-08": 4, # 10/08 실행 → 버전 4 생성 (최신)}백필 감지 로직
핵심은 “이전 실행이 만든 버전”과 “테이블의 현재 최신 버전”을 비교하는 것입니다.
def restore_if_needed(state_table: dict, current_table_version: int, execution_time: str, previous_execution_time: str): previous_version = state_table.get(previous_execution_time)
# CASE 1: 이전 버전이 없음 → 최초 실행 또는 첫 실행 백필 if previous_version is None: print("TRUNCATE TABLE — 처음부터 시작") return "truncate"
# CASE 2: 이전 버전 == 현재 최신 버전 → 정상 실행 if previous_version == current_table_version: print("정상 실행 — 복원 불필요") return "no_restore"
# CASE 3: 이전 버전 != 현재 최신 버전 → 백필 감지! print(f"백필 감지! 버전 {previous_version}으로 복원") return f"restore_to_{previous_version}"
# 시나리오 1: 정상적인 다음 실행 (10/09)result = restore_if_needed( state_table={"2024-10-08": 4}, current_table_version=4, # 최신 = 4 execution_time="2024-10-09", previous_execution_time="2024-10-08" # 이전 실행 버전도 4)# → "정상 실행 — 복원 불필요"
# 시나리오 2: 10/07 백필result = restore_if_needed( state_table={"2024-10-06": 2, "2024-10-07": 3, "2024-10-08": 4}, current_table_version=4, # 최신 = 4 execution_time="2024-10-07", previous_execution_time="2024-10-06" # 이전 실행 버전 = 2, 최신 = 4 → 불일치!)# → "백필 감지! 버전 2로 복원"백필 후 상태 테이블 변화
# 10/07 백필 전state = {"2024-10-05": 1, "2024-10-06": 2, "2024-10-07": 3, "2024-10-08": 4}
# 10/07 백필 실행 → 버전 2로 복원 → MERGE → 새 버전 5 생성state = {"2024-10-05": 1, "2024-10-06": 2, "2024-10-07": 5, "2024-10-08": 4}# ↑ 3→5로 업데이트
# 이후 10/08도 자동 백필됨# 이전 버전(10/07) = 5, 최신 버전 = 5 → 일치 → 정상 실행 모드로 진입- 10/07을 백필하면 오케스트레이터가 10/08도 연쇄적으로 백필.
- 그런데 이 시점에서 이전 실행(10/07)의 버전과 최신 버전이 동일하므로, 복원 없이 정상 실행 모드로 MERGE됩니다.
Compaction이 끼어들 때의 문제
- Delta Lake에서 compaction은 데이터를 변경하지 않지만 새 버전을 생성.
- 이전 실행의 버전으로 복원하면 compaction 결과를 놓칠 수 있음
# 상태 테이블 + 테이블 히스토리state = {"2024-10-05": 5, "2024-10-06": 7, "2024-10-07": 9, "2024-10-08": 10}history = {5: "MERGE", 6: "COMPACTION", 7: "MERGE", 8: "COMPACTION", 9: "MERGE", 10: "MERGE"}
# 10/07 백필 시 — 잘못된 방식: 이전 실행(10/06)의 버전 7로 복원# → 버전 8(COMPACTION)을 놓침
# 올바른 방식: 현재 실행 버전 - 1 로 복원def get_version_to_restore(state_table, execution_time): current_version = state_table[execution_time] # 9 return current_version - 1 # 8 (COMPACTION 포함)version_to_restore = version_for_current_execution - 1 공식을 쓰면, 현재 실행 직전의 버전(compaction 포함)으로 정확히 복원됨
버전 관리가 없는 DB에서의 대안
Delta Lake처럼 버전 관리를 지원하지 않는 DB(PostgreSQL 등)에서는 raw data 히스토리 테이블로 대체
# PostgreSQL 대안: raw data 히스토리 테이블 활용def stateful_merger_no_versioning(db, execution_time, data): # 백필 감지: 미래 execution_time의 데이터가 있는지 확인 is_backfill = db.query(f""" SELECT COUNT(*) > 0 FROM devices_history WHERE execution_time > '{execution_time}' """)
if is_backfill: # 1. 미래 데이터 삭제 db.execute(f"DELETE FROM devices_history WHERE execution_time >= '{execution_time}'") # 2. 남은 히스토리로 메인 테이블 재구성 (Windowed Deduplicator) db.execute("REBUILD devices FROM devices_history")
# 3. 새 데이터를 히스토리에 적재 db.execute(f"INSERT INTO devices_history (execution_time, ...) VALUES ('{execution_time}', ...)")
# 4. MERGE 실행 db.execute("MERGE INTO devices USING devices_history ...")주의점
VACUUM과 복원 한계 — Delta Lake/Iceberg의 retention 기간이 지나면 이전 버전 파일이 삭제됨. retention과 storage 비용의 trageoff 추가적인 운영 복잡성 — Merger 대비 상태 테이블 관리, 복원 로직, compaction 대응 등 운영 부담이 늘어남. 백필 일관성이 중요하지 않다면 기본 Merger로 충분
Concept
- Stateful Merger : Merger 패턴에 상태 테이블을 추가하여 백필 시 테이블을 이전 버전으로 복원한 후 MERGE를 수행하는 패턴
- 상태 테이블(State Table) : 파이프라인 실행 시점과 해당 실행이 생성한 테이블 버전을 매핑하는 메타데이터 테이블
- 백필 감지 로직 : 이전 실행의 테이블 버전과 현재 최신 버전을 비교하여, 불일치하면 백필 모드로 진입하는 판단 로직
- Compaction과 버전 간극 : compaction이 데이터 변경 없이 새 버전을 생성하므로, 복원 시
current_version - 1공식으로 정확한 버전을 찾아야 함 - Raw Data History 테이블 : 버전 관리를 지원하지 않는 DB에서 execution_time 컬럼으로 히스토리를 쌓아 Stateful Merger를 구현하는 대안
[DE Design Pattern]04-3. 증분데이터의 멱등성 보장
https://yjinheon.netlify.app/posts/02de/de-design-pattern/04-idempotency/04-03-update_pattern/