01. Keyed Idempotency: 불변 키 생성 전략
Pattern Overview
- 스트리밍 파이프라인이 방문 이벤트를 사용자 세션으로 그룹핑하여 key-value 스토어에 저장합니다. 태스크 재시도 시 중복 세션이 생기지 않도록 멱등성 보장필요
- key-value 저장소에서 같은 키로 쓰면 항상 덮어쓰기됨. 따라서 멱등성의 핵심은 키 생성 로직 자체-> 동일한 입력에 대해 동일한 키가 나와야함
불변 속성으로 키 만들기
# 좋은 키 생성: 불변 속성 사용def generate_session_key_immutable(user_id: int, first_append_time: int) -> str: """append_time은 브로커가 기록한 시간 — 재시도해도 변하지 않음""" return f"{user_id}_{first_append_time}"
# 첫 실행key = generate_session_key_immutable(user_id=1, first_append_time=957)print(key) # "1_957"
# 장애 후 재시도 — 같은 키 생성key = generate_session_key_immutable(user_id=1, first_append_time=957)print(key) # "1_957" → 덮어쓰기 → 중복 없음event_time vs append_time
# 위험한 키 생성: event_time 사용def generate_session_key_mutable(user_id: int, events: list) -> str: """event_time 기반 — late data가 올 경우 키가 변함""" min_event_time = min(e['event_time'] for e in events) return f"{user_id}_{min_event_time}"
# 첫 실행: 09:56, 09:57 이벤트events_run1 = [ {'event_time': '09:56', 'append_time': '09:57'}, {'event_time': '09:57', 'append_time': '09:58'},]key1 = generate_session_key_mutable(1, events_run1)print(key1) # "1_09:56"
# 장애 후 재시도: late data(09:55)가 추가됨events_run2 = [ {'event_time': '09:55', 'append_time': '09:59'}, # ← late data! {'event_time': '09:56', 'append_time': '09:57'}, {'event_time': '09:57', 'append_time': '09:58'},]key2 = generate_session_key_mutable(1, events_run2)print(key2) # "1_09:55" ← 키가 바뀜! 이전 "1_09:56"은 유령 데이터로 남음- event_time은 late data에 의해 바뀔 수 있는 가변 속성 이지만. append_time은 브로커가 물리적으로 기록한 시점이라 불변 시점이다.
브로커별 append_time 속성명
| 브로커 | 속성명 |
|---|---|
| Apache Kafka | timestamp (append time 모드) |
| Amazon Kinesis | approximateArrivalTimestamp |
| 일반 DB | inserted_at, ingestion_time 등 DEFAULT NOW() |
파일/파티션에도 동일 원칙 적용
키 기반 저장소뿐 아니라 파일이나 파티션 이름에도 같은 원칙이 적용됩니다.
from datetime import date
# 배치 잡: 실행 날짜로 파일명 생성 → 재실행해도 같은 파일 덮어쓰기def get_output_path(execution_date: date) -> str: return f"s3://output/sessions/{execution_date.isoformat()}.parquet"
# 2024-11-20 실행 → "s3://output/sessions/2024-11-20.parquet"# 2024-11-20 재실행 → 같은 경로 → 덮어쓰기 → 멱등SQL 윈도우 함수로 첫 append_time 추출
-- 각 유저의 첫 번째 ingestion_time을 기준으로 세션 키 생성SELECT user_id, FIRST_VALUE(ingestion_time) OVER ( PARTITION BY user_id ORDER BY ingestion_time ASC ) AS first_append_timeFROM visits오름차순 정렬이므로 나중에 추가되는 late data는 첫 번째 값에 영향을 주지 않습니다.
주의점
DB 종류에 따른 동작 차이 — key-value 스토어(Cassandra, ScyllaDB 등)에서는 같은 키로 쓰면 자동 덮어쓰기됩니다. 반면 RDBMS에서는 같은 PK로 INSERT하면 에러가 발생하므로, INSERT ... ON CONFLICT UPDATE 또는 MERGE가 필요합니다.
Kafka의 특수성 — Kafka는 키를 지원하지만 append-only 로그이므로 같은 키의 레코드가 중복 존재합니다. 비동기 compaction으로 최종적으로 중복이 제거되지만, compaction 전까지 소비자는 중복을 볼 수 있습니다.
Compaction과 키 소실 — compaction이 오래된 레코드를 삭제하도록 설정된 경우, 재시작 시 키 생성에 사용된 첫 번째 레코드가 사라져 다른 키가 생성될 수 있습니다. 데이터 자체가 변했으므로 다른 키를 쓰는 것이 논리적으로 맞지만, 멱등성은 깨집니다.
Concept
- Keyed Idempotency : 불변 속성으로 일관된 키를 생성하여, key-value 스토어의 덮어쓰기 특성으로 멱등성을 보장하는 패턴
- Append Time : 메시지 브로커가 레코드를 물리적으로 기록한 시점. late data에 영향받지 않는 불변 속성. Kafka의
timestamp, Kinesis의approximateArrivalTimestamp - Event Time vs Append Time : event_time은 이벤트 발생 시점(가변), append_time은 저장 시점(불변). 키 생성에는 불변 속성을 사용해야 함
- Key-Value 덮어쓰기 시맨틱스 : 같은 키에 쓰면 이전 값이 교체되는 NoSQL 특성. RDBMS에서는 MERGE/UPSERT로 동일 효과 구현
- Kafka Log Compaction : 같은 키의 레코드 중 최신 것만 남기는 비동기 정리 작업. 실행 시점이 비결정적이라 일시적 중복이 존재
02. Transactional Writer
Pattern Overview
- 클라우드의 spot/preemptible 인스턴스로 비용을 60% 절감
- 노드가 회수될 때마다 실행 중이던 태스크가 실패 후 다른 노드에서 재시도됨
- 이 과정에서 **이미 쓴 데이터를 다시 쓰면서 중복이 발생
- 노드 회수 시점에 따라 불완전한 데이터가 소비자에게 노출
-> 트랜잭션의 all-or-nothing 시맨틱스를 활용. 원자성 보장.
트랜잭션의 3단계
# 트랜잭션 패턴의 기본 구조def transactional_write(db, data_batches: list): try: # STEP 1: 트랜잭션 시작 db.execute("BEGIN")
# STEP 2: 데이터 쓰기 (아직 소비자에게 보이지 않음) for batch in data_batches: db.execute(f"INSERT INTO visits SELECT * FROM {batch}")
# STEP 3a: 성공 → commit (이 시점에 모든 데이터가 한 번에 공개) db.execute("COMMIT")
except Exception: # STEP 3b: 실패 → rollback (아무것도 남지 않음) db.execute("ROLLBACK") raise단일 프로세스 vs 분산 처리
# === 단일 프로세스 (DW에서 SQL 실행) ===# 트랜잭션이 선언적이고 DB가 완전히 관리def elt_transactional(db): db.execute("BEGIN") db.execute("MERGE INTO devices USING staging_1 ON ...") db.execute("MERGE INTO devices USING staging_2 ON ...") db.execute("COMMIT") # staging_2에 오류가 있으면 staging_1의 MERGE도 함께 rollback
# === 분산 처리 (Spark 등) ===# 두 가지 구현이 존재
# 방법 1: 태스크별 로컬 트랜잭션# → 각 태스크가 독립 트랜잭션 → 잡 재시도 시 이미 커밋된 태스크 데이터 중복def local_transaction_problem(): # Task 1: BEGIN → INSERT → COMMIT ✓ (이미 커밋됨) # Task 2: BEGIN → INSERT → FAIL ✗ # Job retry: Task 1 다시 실행 → 중복 발생! pass
# 방법 2: 잡 전체가 하나의 트랜잭션# → 모든 태스크 완료 후 한 번에 커밋 → 더 강한 보장def job_level_transaction(): # Task 1: 파일 쓰기 (아직 커밋 안 됨) # Task 2: 파일 쓰기 (아직 커밋 안 됨) # 모든 태스크 완료 → commit log 생성 → 한 번에 공개 pass- 핵심 차이는 멱등성 범위. 로컬 트랜잭션은 개별 태스크만 보호하고, 잡 레벨 트랜잭션은 전체 파이프라인을 보호
Delta Lake에서의 잡 레벨 트랜잭션
# Delta Lake: 커밋 로그가 생성되어야 데이터가 공개됨input_data = spark.read.json("s3://input/visits/")
# 여러 태스크가 병렬로 파일을 쓰지만...(input_data.write .format("delta") .mode("overwrite") .save("s3://output/visits"))# → 모든 파일 쓰기 완료 후 _delta_log/에 커밋 파일 생성# → 커밋 파일이 있어야 소비자가 읽을 수 있음# → 커밋 전 장애 발생 시: 파일은 있지만 커밋이 없으므로 소비자에게 보이지 않음Kafka의 트랜잭셔널 프로듀서 (Apache Flink)
# Flink에서 Kafka 트랜잭셔널 쓰기 (개념적 Python 표현)kafka_sink_config = { "topic": "reduced_visits", "delivery_guarantee": "EXACTLY_ONCE", # 트랜잭션 활성화 "transaction_timeout_ms": 10 * 60 * 1000, # 10분}
# 동작 방식:# 1. 프로듀서가 파티션에 트랜잭션 시작 메시지 전송# 2. 데이터 레코드 전송 (소비자에게 아직 안 보임 — isolation level에 따라)# 3. Flink 체크포인트 완료 시 커밋 메시지 전송# 4. 이 시점부터 소비자가 데이터를 볼 수 있음
# transaction_timeout이 체크포인트보다 짧으면# 트랜잭션 만료 → 커밋 불가 → 데이터 유실transaction.timeout.ms는 Flink의 체크포인트 간격보다 충분히 길어야 합니다. 체크포인트가 완료되어야 커밋이 가능한데, 그 전에 타임아웃이 되면 트랜잭션이 만료됩니다.
PostgreSQL에서의 실제 트랜잭션 예시
# 두 파일의 MERGE를 하나의 트랜잭션으로 묶기transaction_sql = """-- 파일 1 로드 + MERGECREATE TEMPORARY TABLE changed_devices_file1 (LIKE dedp.devices);COPY changed_devices_file1 FROM '/data/dataset_file1.csv' CSV DELIMITER ';' HEADER;MERGE INTO dedp.devices AS d USING changed_devices_file1 AS c ON d.type = c.type AND d.version = c.version WHEN MATCHED THEN UPDATE SET full_name = c.full_name WHEN NOT MATCHED THEN INSERT (type, full_name, version) VALUES (c.type, c.full_name, c.version);
-- 파일 2 로드 + MERGE (컬럼 길이 초과 에러 발생!)CREATE TEMPORARY TABLE changed_devices_file2 (LIKE dedp.devices);COPY changed_devices_file2 FROM '/data/dataset_file2.csv' CSV DELIMITER ';' HEADER;MERGE INTO dedp.devices AS d USING changed_devices_file2 AS c ON d.type = c.type AND d.version = c.version WHEN MATCHED THEN UPDATE SET full_name = c.full_name WHEN NOT MATCHED THEN INSERT (type, full_name, version) VALUES (c.type, c.full_name, c.version);
-- 파일 2에서 에러 → COMMIT에 도달하지 못함 → 파일 1의 MERGE도 rollbackCOMMIT;"""- 파일 2에서 에러가 나면 COMMIT에 도달하지 못하고, 파일 1의 성공적인 MERGE까지 함께 rollback
- 부분성공이라는게 존재하지 않음
트랜잭션 지원 현황
| 기술 | 트랜잭션 지원 | 분산 처리 도구 연동 |
|---|---|---|
| Delta Lake, Iceberg, Hudi | 커밋 로그 기반 | Spark, Flink |
| PostgreSQL, MySQL, Oracle | 네이티브 ACID | 단일 프로세스 |
| BigQuery, Redshift, Snowflake | 네이티브 ACID | 내장 분산 처리 |
| Apache Kafka | 트랜잭셔널 프로듀서 | Flink만 (Spark 미지원) |
주의점
커밋 오버헤드 — JSON, CSV 같은 raw 포맷은 파일 쓰기 즉시 소비자에게 보이지만, Delta Lake은 커밋 로그 생성까지 기다림. 가장 느린 태스크가 완료될 때까지 전체 데이터가 보이지 않음.
분산 프레임워크의 제한적 지원 — Kafka 트랜잭셔널 프로듀서는 Flink에서만 지원되고, Spark에서는 사용불가능.
멱등성 범위는 현재 트랜잭션 — 백필로 파이프라인을 재실행하면 새 트랜잭션이 열리고 같은 데이터가 다시 삽입됩니다. 트랜잭션 자체는 “현재 실행”의 원자성만 보장하며, 재실행 간 멱등성은 별도로 처리
Dirty Read 주의 — 소비자가 READ UNCOMMITTED 격리 수준을 사용하면 커밋되지 않은 데이터를 읽을 수 있음
Concept
- Transactional Writer : 트랜잭션의 all-or-nothing 시맨틱스로 불완전한 데이터 노출을 방지하는 패턴. BEGIN → WRITE → COMMIT/ROLLBACK
- 로컬 트랜잭션 vs 잡 레벨 트랜잭션 : 태스크 단위 트랜잭션은 잡 재시도 시 중복 가능. 잡 전체 트랜잭션은 더 강한 보장이지만 구현이 어려움
- Commit Log (Delta Lake) : 데이터 파일과 별도로 존재하는 메타데이터 파일. 이것이 생성되어야 데이터가 소비자에게 공개됨
- Dirty Read : READ UNCOMMITTED 격리 수준에서 커밋되지 않은 트랜잭션의 데이터를 읽는 현상
- 트랜잭션 타임아웃 : Kafka 트랜잭션에서 체크포인트 완료 전 타임아웃이 발생하면 커밋 불가. 체크포인트 간격보다 충분히 길게 설정해야 함