2693 words
13 minutes
[DE Design Pattern]04-2. DB수준에서의 멱등성 보장

01. Keyed Idempotency: 불변 키 생성 전략#

Pattern Overview#

  • 스트리밍 파이프라인이 방문 이벤트를 사용자 세션으로 그룹핑하여 key-value 스토어에 저장합니다. 태스크 재시도 시 중복 세션이 생기지 않도록 멱등성 보장필요
  • key-value 저장소에서 같은 키로 쓰면 항상 덮어쓰기됨. 따라서 멱등성의 핵심은 키 생성 로직 자체-> 동일한 입력에 대해 동일한 키가 나와야함

불변 속성으로 키 만들기#

# 좋은 키 생성: 불변 속성 사용
def generate_session_key_immutable(user_id: int, first_append_time: int) -> str:
"""append_time은 브로커가 기록한 시간 — 재시도해도 변하지 않음"""
return f"{user_id}_{first_append_time}"
# 첫 실행
key = generate_session_key_immutable(user_id=1, first_append_time=957)
print(key) # "1_957"
# 장애 후 재시도 — 같은 키 생성
key = generate_session_key_immutable(user_id=1, first_append_time=957)
print(key) # "1_957" → 덮어쓰기 → 중복 없음

event_time vs append_time#

# 위험한 키 생성: event_time 사용
def generate_session_key_mutable(user_id: int, events: list) -> str:
"""event_time 기반 — late data가 올 경우 키가 변함"""
min_event_time = min(e['event_time'] for e in events)
return f"{user_id}_{min_event_time}"
# 첫 실행: 09:56, 09:57 이벤트
events_run1 = [
{'event_time': '09:56', 'append_time': '09:57'},
{'event_time': '09:57', 'append_time': '09:58'},
]
key1 = generate_session_key_mutable(1, events_run1)
print(key1) # "1_09:56"
# 장애 후 재시도: late data(09:55)가 추가됨
events_run2 = [
{'event_time': '09:55', 'append_time': '09:59'}, # ← late data!
{'event_time': '09:56', 'append_time': '09:57'},
{'event_time': '09:57', 'append_time': '09:58'},
]
key2 = generate_session_key_mutable(1, events_run2)
print(key2) # "1_09:55" ← 키가 바뀜! 이전 "1_09:56"은 유령 데이터로 남음
  • event_time은 late data에 의해 바뀔 수 있는 가변 속성 이지만. append_time은 브로커가 물리적으로 기록한 시점이라 불변 시점이다.

브로커별 append_time 속성명#

브로커속성명
Apache Kafkatimestamp (append time 모드)
Amazon KinesisapproximateArrivalTimestamp
일반 DBinserted_at, ingestion_time 등 DEFAULT NOW()

파일/파티션에도 동일 원칙 적용#

키 기반 저장소뿐 아니라 파일이나 파티션 이름에도 같은 원칙이 적용됩니다.

from datetime import date
# 배치 잡: 실행 날짜로 파일명 생성 → 재실행해도 같은 파일 덮어쓰기
def get_output_path(execution_date: date) -> str:
return f"s3://output/sessions/{execution_date.isoformat()}.parquet"
# 2024-11-20 실행 → "s3://output/sessions/2024-11-20.parquet"
# 2024-11-20 재실행 → 같은 경로 → 덮어쓰기 → 멱등

SQL 윈도우 함수로 첫 append_time 추출#

-- 각 유저의 첫 번째 ingestion_time을 기준으로 세션 키 생성
SELECT
user_id,
FIRST_VALUE(ingestion_time) OVER (
PARTITION BY user_id
ORDER BY ingestion_time ASC
) AS first_append_time
FROM visits

오름차순 정렬이므로 나중에 추가되는 late data는 첫 번째 값에 영향을 주지 않습니다.

주의점#

DB 종류에 따른 동작 차이 — key-value 스토어(Cassandra, ScyllaDB 등)에서는 같은 키로 쓰면 자동 덮어쓰기됩니다. 반면 RDBMS에서는 같은 PK로 INSERT하면 에러가 발생하므로, INSERT ... ON CONFLICT UPDATE 또는 MERGE가 필요합니다.

Kafka의 특수성 — Kafka는 키를 지원하지만 append-only 로그이므로 같은 키의 레코드가 중복 존재합니다. 비동기 compaction으로 최종적으로 중복이 제거되지만, compaction 전까지 소비자는 중복을 볼 수 있습니다.

Compaction과 키 소실 — compaction이 오래된 레코드를 삭제하도록 설정된 경우, 재시작 시 키 생성에 사용된 첫 번째 레코드가 사라져 다른 키가 생성될 수 있습니다. 데이터 자체가 변했으므로 다른 키를 쓰는 것이 논리적으로 맞지만, 멱등성은 깨집니다.


Concept

  • Keyed Idempotency : 불변 속성으로 일관된 키를 생성하여, key-value 스토어의 덮어쓰기 특성으로 멱등성을 보장하는 패턴
  • Append Time : 메시지 브로커가 레코드를 물리적으로 기록한 시점. late data에 영향받지 않는 불변 속성. Kafka의 timestamp, Kinesis의 approximateArrivalTimestamp
  • Event Time vs Append Time : event_time은 이벤트 발생 시점(가변), append_time은 저장 시점(불변). 키 생성에는 불변 속성을 사용해야 함
  • Key-Value 덮어쓰기 시맨틱스 : 같은 키에 쓰면 이전 값이 교체되는 NoSQL 특성. RDBMS에서는 MERGE/UPSERT로 동일 효과 구현
  • Kafka Log Compaction : 같은 키의 레코드 중 최신 것만 남기는 비동기 정리 작업. 실행 시점이 비결정적이라 일시적 중복이 존재

02. Transactional Writer#

Pattern Overview#

  • 클라우드의 spot/preemptible 인스턴스로 비용을 60% 절감
  • 노드가 회수될 때마다 실행 중이던 태스크가 실패 후 다른 노드에서 재시도됨
  • 이 과정에서 **이미 쓴 데이터를 다시 쓰면서 중복이 발생
  • 노드 회수 시점에 따라 불완전한 데이터가 소비자에게 노출

-> 트랜잭션의 all-or-nothing 시맨틱스를 활용. 원자성 보장.

트랜잭션의 3단계#

# 트랜잭션 패턴의 기본 구조
def transactional_write(db, data_batches: list):
try:
# STEP 1: 트랜잭션 시작
db.execute("BEGIN")
# STEP 2: 데이터 쓰기 (아직 소비자에게 보이지 않음)
for batch in data_batches:
db.execute(f"INSERT INTO visits SELECT * FROM {batch}")
# STEP 3a: 성공 → commit (이 시점에 모든 데이터가 한 번에 공개)
db.execute("COMMIT")
except Exception:
# STEP 3b: 실패 → rollback (아무것도 남지 않음)
db.execute("ROLLBACK")
raise

단일 프로세스 vs 분산 처리#

# === 단일 프로세스 (DW에서 SQL 실행) ===
# 트랜잭션이 선언적이고 DB가 완전히 관리
def elt_transactional(db):
db.execute("BEGIN")
db.execute("MERGE INTO devices USING staging_1 ON ...")
db.execute("MERGE INTO devices USING staging_2 ON ...")
db.execute("COMMIT")
# staging_2에 오류가 있으면 staging_1의 MERGE도 함께 rollback
# === 분산 처리 (Spark 등) ===
# 두 가지 구현이 존재
# 방법 1: 태스크별 로컬 트랜잭션
# → 각 태스크가 독립 트랜잭션 → 잡 재시도 시 이미 커밋된 태스크 데이터 중복
def local_transaction_problem():
# Task 1: BEGIN → INSERT → COMMIT ✓ (이미 커밋됨)
# Task 2: BEGIN → INSERT → FAIL ✗
# Job retry: Task 1 다시 실행 → 중복 발생!
pass
# 방법 2: 잡 전체가 하나의 트랜잭션
# → 모든 태스크 완료 후 한 번에 커밋 → 더 강한 보장
def job_level_transaction():
# Task 1: 파일 쓰기 (아직 커밋 안 됨)
# Task 2: 파일 쓰기 (아직 커밋 안 됨)
# 모든 태스크 완료 → commit log 생성 → 한 번에 공개
pass
  • 핵심 차이는 멱등성 범위. 로컬 트랜잭션은 개별 태스크만 보호하고, 잡 레벨 트랜잭션은 전체 파이프라인을 보호

Delta Lake에서의 잡 레벨 트랜잭션#

# Delta Lake: 커밋 로그가 생성되어야 데이터가 공개됨
input_data = spark.read.json("s3://input/visits/")
# 여러 태스크가 병렬로 파일을 쓰지만...
(input_data.write
.format("delta")
.mode("overwrite")
.save("s3://output/visits"))
# → 모든 파일 쓰기 완료 후 _delta_log/에 커밋 파일 생성
# → 커밋 파일이 있어야 소비자가 읽을 수 있음
# → 커밋 전 장애 발생 시: 파일은 있지만 커밋이 없으므로 소비자에게 보이지 않음
# Flink에서 Kafka 트랜잭셔널 쓰기 (개념적 Python 표현)
kafka_sink_config = {
"topic": "reduced_visits",
"delivery_guarantee": "EXACTLY_ONCE", # 트랜잭션 활성화
"transaction_timeout_ms": 10 * 60 * 1000, # 10분
}
# 동작 방식:
# 1. 프로듀서가 파티션에 트랜잭션 시작 메시지 전송
# 2. 데이터 레코드 전송 (소비자에게 아직 안 보임 — isolation level에 따라)
# 3. Flink 체크포인트 완료 시 커밋 메시지 전송
# 4. 이 시점부터 소비자가 데이터를 볼 수 있음
# transaction_timeout이 체크포인트보다 짧으면
# 트랜잭션 만료 → 커밋 불가 → 데이터 유실

transaction.timeout.ms는 Flink의 체크포인트 간격보다 충분히 길어야 합니다. 체크포인트가 완료되어야 커밋이 가능한데, 그 전에 타임아웃이 되면 트랜잭션이 만료됩니다.

PostgreSQL에서의 실제 트랜잭션 예시#

# 두 파일의 MERGE를 하나의 트랜잭션으로 묶기
transaction_sql = """
-- 파일 1 로드 + MERGE
CREATE TEMPORARY TABLE changed_devices_file1 (LIKE dedp.devices);
COPY changed_devices_file1 FROM '/data/dataset_file1.csv' CSV DELIMITER ';' HEADER;
MERGE INTO dedp.devices AS d USING changed_devices_file1 AS c
ON d.type = c.type AND d.version = c.version
WHEN MATCHED THEN UPDATE SET full_name = c.full_name
WHEN NOT MATCHED THEN INSERT (type, full_name, version) VALUES (c.type, c.full_name, c.version);
-- 파일 2 로드 + MERGE (컬럼 길이 초과 에러 발생!)
CREATE TEMPORARY TABLE changed_devices_file2 (LIKE dedp.devices);
COPY changed_devices_file2 FROM '/data/dataset_file2.csv' CSV DELIMITER ';' HEADER;
MERGE INTO dedp.devices AS d USING changed_devices_file2 AS c
ON d.type = c.type AND d.version = c.version
WHEN MATCHED THEN UPDATE SET full_name = c.full_name
WHEN NOT MATCHED THEN INSERT (type, full_name, version) VALUES (c.type, c.full_name, c.version);
-- 파일 2에서 에러 → COMMIT에 도달하지 못함 → 파일 1의 MERGE도 rollback
COMMIT;
"""
  • 파일 2에서 에러가 나면 COMMIT에 도달하지 못하고, 파일 1의 성공적인 MERGE까지 함께 rollback
  • 부분성공이라는게 존재하지 않음

트랜잭션 지원 현황#

기술트랜잭션 지원분산 처리 도구 연동
Delta Lake, Iceberg, Hudi커밋 로그 기반Spark, Flink
PostgreSQL, MySQL, Oracle네이티브 ACID단일 프로세스
BigQuery, Redshift, Snowflake네이티브 ACID내장 분산 처리
Apache Kafka트랜잭셔널 프로듀서Flink만 (Spark 미지원)

주의점#

커밋 오버헤드 — JSON, CSV 같은 raw 포맷은 파일 쓰기 즉시 소비자에게 보이지만, Delta Lake은 커밋 로그 생성까지 기다림. 가장 느린 태스크가 완료될 때까지 전체 데이터가 보이지 않음.

분산 프레임워크의 제한적 지원 — Kafka 트랜잭셔널 프로듀서는 Flink에서만 지원되고, Spark에서는 사용불가능.

멱등성 범위는 현재 트랜잭션 — 백필로 파이프라인을 재실행하면 새 트랜잭션이 열리고 같은 데이터가 다시 삽입됩니다. 트랜잭션 자체는 “현재 실행”의 원자성만 보장하며, 재실행 간 멱등성은 별도로 처리

Dirty Read 주의 — 소비자가 READ UNCOMMITTED 격리 수준을 사용하면 커밋되지 않은 데이터를 읽을 수 있음


Concept

  • Transactional Writer : 트랜잭션의 all-or-nothing 시맨틱스로 불완전한 데이터 노출을 방지하는 패턴. BEGIN → WRITE → COMMIT/ROLLBACK
  • 로컬 트랜잭션 vs 잡 레벨 트랜잭션 : 태스크 단위 트랜잭션은 잡 재시도 시 중복 가능. 잡 전체 트랜잭션은 더 강한 보장이지만 구현이 어려움
  • Commit Log (Delta Lake) : 데이터 파일과 별도로 존재하는 메타데이터 파일. 이것이 생성되어야 데이터가 소비자에게 공개됨
  • Dirty Read : READ UNCOMMITTED 격리 수준에서 커밋되지 않은 트랜잭션의 데이터를 읽는 현상
  • 트랜잭션 타임아웃 : Kafka 트랜잭션에서 체크포인트 완료 전 타임아웃이 발생하면 커밋 불가. 체크포인트 간격보다 충분히 길게 설정해야 함

[DE Design Pattern]04-2. DB수준에서의 멱등성 보장
https://yjinheon.netlify.app/posts/02de/de-design-pattern/04-idempotency/04-04-database_pattern/
Author
Datamind
Published at
2025-02-28
License
CC BY-NC-SA 4.0