2341 words
12 minutes
[DE Design Pattern]04-3. 증분데이터의 멱등성 보장

01. Merger: MERGE(UPSERT)#

Pattern Overview#

  • 새 데이터(변경분)와 기존 데이터(현재 테이블)를 불변 키(identity)를 기준으로 병합
  • SQL의 MERGE (= UPSERT) 가 사실상 핵심

MERGE가 3가지 분기#

# MERGE의 세 가지 분기를 Python으로 표현
def merge(current_table: dict, incoming_records: list):
for record in incoming_records:
key = (record['type'], record['version']) # 불변 키 조합
if key in current_table and record.get('is_deleted'):
# CASE 1: DELETE — 소프트 삭제 플래그가 있는 매칭 레코드
del current_table[key]
elif key in current_table:
# CASE 2: UPDATE — 키가 존재하면 값 갱신
current_table[key] = record
elif not record.get('is_deleted'):
# CASE 3: INSERT — 키가 없고, 삭제 표시도 아닌 새 레코드
current_table[key] = record
# is_deleted 체크가 없으면 삭제된 레코드도 INSERT됨!
# 예시 실행
table = {}
batch_1 = [
{'type': 'mobile', 'version': '1.0', 'full_name': 'Phone A', 'is_deleted': False},
{'type': 'tablet', 'version': '2.0', 'full_name': 'Tablet B', 'is_deleted': False},
]
merge(table, batch_1)
print(table) # 2건 INSERT
batch_2 = [
{'type': 'mobile', 'version': '1.0', 'full_name': 'Phone A v2', 'is_deleted': False}, # UPDATE
{'type': 'tablet', 'version': '2.0', 'full_name': 'Tablet B', 'is_deleted': True}, # DELETE
{'type': 'laptop', 'version': '3.0', 'full_name': 'Laptop C', 'is_deleted': False}, # INSERT
]
merge(table, batch_2)
print(table) # mobile=v2, laptop=C (tablet 삭제됨)

SQL MERGE#

MERGE INTO devices_output AS target
USING devices_input AS input
ON target.type = input.type AND target.version = input.version
-- 매칭 + 삭제 플래그 → 물리 삭제
WHEN MATCHED AND input.is_deleted = true THEN DELETE
-- 매칭 + 삭제 아님 → 업데이트
WHEN MATCHED AND input.is_deleted = false THEN
UPDATE SET full_name = input.full_name
-- 미매칭 + 삭제 아님 → 신규 삽입
WHEN NOT MATCHED AND input.is_deleted = false THEN
INSERT (full_name, version, type) VALUES (input.full_name, input.version, input.type)

soft delete flag가 체크되지 않을 경우#

  • 첫 번째 실행에서 이 체크가 없으면, 삭제 표시된 레코드도 신규로 INSERT -> 첫번째 실행에서만 유령 record가 남음
  • 이후 MERGE에서는 해당 레코드가 이미 테이블에 존재하므로 MATCHED 분기를 타서 DELETE
# is_deleted 체크 없는 잘못된 구현
def merge_wrong(current_table: dict, incoming_records: list):
for record in incoming_records:
key = (record['type'], record['version'])
if key in current_table and record.get('is_deleted'):
del current_table[key]
elif key in current_table:
current_table[key] = record
else:
current_table[key] = record # ❌ 삭제된 레코드도 INSERT!
# 첫 실행 시 Record 1(deleted)이 테이블에 들어감
table = {}
merge_wrong(table, [
{'type': 'A', 'version': '1', 'full_name': 'X', 'is_deleted': True},
{'type': 'B', 'version': '1', 'full_name': 'Y', 'is_deleted': False},
])
print(table) # A도 포함됨 → 잘못된 상태

backfill 일관성 문제#

  • 증분 데이터셋 백필 시 소비자가 보는 데이터가 일시적으로 달라짐
# 정상 실행 시나리오
timeline = {
"07:00": {"new": ["A"], "table_after": ["A"]},
"08:00": {"new": ["A-updated","B"], "table_after": ["A-updated","B"]},
"09:00": {"new": ["B-deleted","C"], "table_after": ["A-updated","C"]},
"10:00": {"new": ["M","N","O"], "table_after": ["A-updated","C","M","N","O"]},
}
# 08:00부터 백필 시 — 현재 테이블 상태(10:00 기준)에서 MERGE 시작
# 08:00 백필: [A-updated, B] merge → [A-updated, B, C, M, N, O]
# 정상 실행 때 08:00 시점에는 M, N, O가 존재하지 않음
# 소비자는 백필 중 정상 실행과 다른 데이터를 봄
  • 이는 Merger가 기본적으로 stateless(상태를 저장하지 않음)하기 때문
  • 테이블을 이전 상태로 복원할 방법이 없어서, 백필 시 현재 테이블에 그대로 MERGE됨

주의점#

불변 키 필수 — 데이터 프로바이더가 row를 고유하게 식별할 수 있는 불변 속성을 제공해야함. I/O 비용 — Fast Metadata Cleaner와 달리 데이터 블록 수준에서 동작. 현대 DB와 OTF는 데이터를 활용해 관련 파일만 스캔하도록 최적화가능


Concept

  • Merger (UPSERT) : 불변 키를 기준으로 새 데이터와 기존 데이터를 병합하는 패턴. INSERT, UPDATE, DELETE 세 가지 분기를 처리
  • MERGE 구문 : SQL 표준의 UPSERT 명령. WHEN MATCHED / WHEN NOT MATCHED 분기로 Insert, Update, Delete를 하나의 쿼리로 처리
  • Soft Delete : 물리적 삭제 대신 is_deleted 같은 플래그로 삭제를 표시하는 방식. Merger 패턴에서 Hard Delete를 처리하기 위한 전제 조건
  • 불변 키(Immutable Identity) : MERGE의 ON 절에 사용되는, 변하지 않는 레코드 식별 속성 조합. 키가 변경되면 멱등성이 깨짐
  • Stateless 패턴의 백필 한계 : 상태를 저장하지 않는 Merger는 백필 시 테이블을 이전 버전으로 복원할 수 없어 소비자에게 일시적 불일치를 노출

  • Delta Lake MERGE 최적화 : Delta Lake가 내부적으로 통계(min/max)를 활용해 MERGE 대상 파일만 스캔하는 메커니즘

02. Stateful Merger#

Pattern Overview#

  • Merger 패턴에 **상태 테이블(state table)**을 추가
  • state table은 “어떤 실행 시점에 어떤 테이블 버전이 생성되었는지”를 기록
  • 백필 시 해당 버전으로 복원한 뒤 MERGE재실행

Stateful Merger Workflow#

# Stateful Merger의 세 단계
def stateful_merger_pipeline(db, spark, execution_time, data):
# STEP 1: 복원 (백필일 때만 동작)
restore_if_needed(db, spark, execution_time)
# STEP 2: MERGE (기존 Merger 패턴과 동일)
run_merge(spark, data)
# STEP 3: 상태 테이블 업데이트
update_state_table(db, spark, execution_time)

State Table Architecture#

# 상태 테이블: 실행 시점 → 테이블 버전 매핑
state_table = {
"2024-10-05": 1, # 10/05 실행 → 버전 1 생성
"2024-10-06": 2, # 10/06 실행 → 버전 2 생성
"2024-10-07": 3, # 10/07 실행 → 버전 3 생성
"2024-10-08": 4, # 10/08 실행 → 버전 4 생성 (최신)
}

백필 감지 로직#

핵심은 “이전 실행이 만든 버전”과 “테이블의 현재 최신 버전”을 비교하는 것입니다.

def restore_if_needed(state_table: dict, current_table_version: int,
execution_time: str, previous_execution_time: str):
previous_version = state_table.get(previous_execution_time)
# CASE 1: 이전 버전이 없음 → 최초 실행 또는 첫 실행 백필
if previous_version is None:
print("TRUNCATE TABLE — 처음부터 시작")
return "truncate"
# CASE 2: 이전 버전 == 현재 최신 버전 → 정상 실행
if previous_version == current_table_version:
print("정상 실행 — 복원 불필요")
return "no_restore"
# CASE 3: 이전 버전 != 현재 최신 버전 → 백필 감지!
print(f"백필 감지! 버전 {previous_version}으로 복원")
return f"restore_to_{previous_version}"
# 시나리오 1: 정상적인 다음 실행 (10/09)
result = restore_if_needed(
state_table={"2024-10-08": 4},
current_table_version=4, # 최신 = 4
execution_time="2024-10-09",
previous_execution_time="2024-10-08" # 이전 실행 버전도 4
)
# → "정상 실행 — 복원 불필요"
# 시나리오 2: 10/07 백필
result = restore_if_needed(
state_table={"2024-10-06": 2, "2024-10-07": 3, "2024-10-08": 4},
current_table_version=4, # 최신 = 4
execution_time="2024-10-07",
previous_execution_time="2024-10-06" # 이전 실행 버전 = 2, 최신 = 4 → 불일치!
)
# → "백필 감지! 버전 2로 복원"

백필 후 상태 테이블 변화#

# 10/07 백필 전
state = {"2024-10-05": 1, "2024-10-06": 2, "2024-10-07": 3, "2024-10-08": 4}
# 10/07 백필 실행 → 버전 2로 복원 → MERGE → 새 버전 5 생성
state = {"2024-10-05": 1, "2024-10-06": 2, "2024-10-07": 5, "2024-10-08": 4}
# ↑ 3→5로 업데이트
# 이후 10/08도 자동 백필됨
# 이전 버전(10/07) = 5, 최신 버전 = 5 → 일치 → 정상 실행 모드로 진입
  • 10/07을 백필하면 오케스트레이터가 10/08도 연쇄적으로 백필.
  • 그런데 이 시점에서 이전 실행(10/07)의 버전과 최신 버전이 동일하므로, 복원 없이 정상 실행 모드로 MERGE됩니다.

Compaction이 끼어들 때의 문제#

  • Delta Lake에서 compaction은 데이터를 변경하지 않지만 새 버전을 생성.
  • 이전 실행의 버전으로 복원하면 compaction 결과를 놓칠 수 있음
# 상태 테이블 + 테이블 히스토리
state = {"2024-10-05": 5, "2024-10-06": 7, "2024-10-07": 9, "2024-10-08": 10}
history = {5: "MERGE", 6: "COMPACTION", 7: "MERGE", 8: "COMPACTION", 9: "MERGE", 10: "MERGE"}
# 10/07 백필 시 — 잘못된 방식: 이전 실행(10/06)의 버전 7로 복원
# → 버전 8(COMPACTION)을 놓침
# 올바른 방식: 현재 실행 버전 - 1 로 복원
def get_version_to_restore(state_table, execution_time):
current_version = state_table[execution_time] # 9
return current_version - 1 # 8 (COMPACTION 포함)

version_to_restore = version_for_current_execution - 1 공식을 쓰면, 현재 실행 직전의 버전(compaction 포함)으로 정확히 복원됨

버전 관리가 없는 DB에서의 대안#

Delta Lake처럼 버전 관리를 지원하지 않는 DB(PostgreSQL 등)에서는 raw data 히스토리 테이블로 대체

# PostgreSQL 대안: raw data 히스토리 테이블 활용
def stateful_merger_no_versioning(db, execution_time, data):
# 백필 감지: 미래 execution_time의 데이터가 있는지 확인
is_backfill = db.query(f"""
SELECT COUNT(*) > 0 FROM devices_history
WHERE execution_time > '{execution_time}'
""")
if is_backfill:
# 1. 미래 데이터 삭제
db.execute(f"DELETE FROM devices_history WHERE execution_time >= '{execution_time}'")
# 2. 남은 히스토리로 메인 테이블 재구성 (Windowed Deduplicator)
db.execute("REBUILD devices FROM devices_history")
# 3. 새 데이터를 히스토리에 적재
db.execute(f"INSERT INTO devices_history (execution_time, ...) VALUES ('{execution_time}', ...)")
# 4. MERGE 실행
db.execute("MERGE INTO devices USING devices_history ...")

주의점#

VACUUM과 복원 한계 — Delta Lake/Iceberg의 retention 기간이 지나면 이전 버전 파일이 삭제됨. retention과 storage 비용의 trageoff 추가적인 운영 복잡성 — Merger 대비 상태 테이블 관리, 복원 로직, compaction 대응 등 운영 부담이 늘어남. 백필 일관성이 중요하지 않다면 기본 Merger로 충분


Concept

  • Stateful Merger : Merger 패턴에 상태 테이블을 추가하여 백필 시 테이블을 이전 버전으로 복원한 후 MERGE를 수행하는 패턴
  • 상태 테이블(State Table) : 파이프라인 실행 시점과 해당 실행이 생성한 테이블 버전을 매핑하는 메타데이터 테이블
  • 백필 감지 로직 : 이전 실행의 테이블 버전과 현재 최신 버전을 비교하여, 불일치하면 백필 모드로 진입하는 판단 로직
  • Compaction과 버전 간극 : compaction이 데이터 변경 없이 새 버전을 생성하므로, 복원 시 current_version - 1 공식으로 정확한 버전을 찾아야 함
  • Raw Data History 테이블 : 버전 관리를 지원하지 않는 DB에서 execution_time 컬럼으로 히스토리를 쌓아 Stateful Merger를 구현하는 대안

[DE Design Pattern]04-3. 증분데이터의 멱등성 보장
https://yjinheon.netlify.app/posts/02de/de-design-pattern/04-idempotency/04-03-update_pattern/
Author
Datamind
Published at
2025-02-28
License
CC BY-NC-SA 4.0