2510 words
13 minutes
[Design Pattern] 02. Full Load Pattern

01. Full Load#

개념#

Full Load Pattern은 매번 소스시스템의 전체 데이터를 읽어서 타깃에 적재하는 패턴.

  • 데이터 볼륨이 작을때
  • 소스에서 변경 추적(Delta Column)을 알 수 없을 때
  • 데이터 정합성이 매우 중요한 시나리오

사용 시나리오#

  • 초기 데이터 마이그레이션
  • DW 구축 시 초기 데이터 적재
  • DQ 문제 발생 시 복구
  • 데이터 모델 변경 시 재구성 (스키마 변경)
  • 소스 데이터 크기가 작고 빈번한 변경이 없는 경우 ex) 코드 테이블)

Concept

  • Full Loader Pattern : 소스 시스템의 전체 데이터를 읽어서 타겟에 적재하는 패턴.
  • Truncate and Load : 기존 데이터를 단순 Truncate하고 새 데이터를 적재하는 가장 단순한 Full Load 구성. 적재 중 데이터 공백 발생 가능
  • Swap Table Pattern : 임시테이블에 데이터 적재 후 원자적으로 테이블 교체 하는 안전한 방식. 특성상 스토리지가 2배 필요
  • Partition Swap : 날짜별 Partition으로 데이터를 관리하여 대용량 데이터 Full Load를 효율화하는 패턴 오래된 파티션만 삭제해 Storage관리
  • Change Rate(변경비율) : 전체 데이터 중 실제로 변경되는 비율. 변경 비율이 낮을수록 Full Load는 비효율적이고 incremental load가 유리
  • Snapshot : 특정 시점의 전체 데이터 상태. Full Load는 매번 새로운 Snapshot을 생성하지만 변경 이력은 추적 불가능
  • History Tracking Problem: Full Load의 주요 한계점. 어떤 데이터가 언제 어떻게 변경되었는지 알 수 없어 감사(Audit) 및 분석에 제약.

02. Isolation Level#

Isolation Level#

1) Read Uncommitted

import psycopg2
from datetime import datetime
def demo_read_uncommitted():
"""
read uncommitted 컨셉 구현
보통 DB 레벨에서 이런 트랜잭션은 차단됨
"""
# Connection 1: Writer (Full Load 실행 중)
conn1 = psycopg2.connect("dbname=demo user=etl")
cursor1 = conn1.cursor()
print("=" * 60)
print("Read Uncommitted Demo")
print("=" * 60)
# Writer: 트랜잭션 시작 커밋안함
cursor1.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
cursor1.execute("BEGIN")
cursor1.execute("DELETE FROM products")
cursor1.execute("INSERT INTO products VALUES (1, 'New Product', 1000)")
print("\n[Writer] 새 데이터 삽입 (아직 커밋 안함)")
# Reader: 동시에 데이터 읽기
conn2 = psycopg2.connect("dbname=warehouse user=analyst")
cursor2 = conn2.cursor()
cursor2.execute("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED")
cursor2.execute("SELECT COUNT(*) FROM products")
count = cursor2.fetchone()[0]
print(f"[Reader] 현재 조회되는 레코드 수: {count}건")
print("아직 커밋 안된 데이터가 보임! (Dirty Read)")
# Writer: 롤백
cursor1.execute("ROLLBACK")
print("\n[Writer] 트랜잭션 롤백!")
# Reader: 다시 읽기
cursor2.execute("SELECT COUNT(*) FROM products")
count = cursor2.fetchone()[0]
print(f"[Reader] 롤백 후 레코드 수: {count}건")
conn1.close()
conn2.close()

2) Read Committed

  • 기본적으로 커밋된 데이터만 읽음
  • Dirty Read 방지됨
  • Full Load 중 소스 데이터가 계속 변경될 경우 일관성 없는 스냅샷을 읽을 수 있음
def demo_read_committed():
"""
Dirty Read 방지
"""
print("=" * 60)
print("Read Committed demo")
print("=" * 60)
conn1 = psycopg2.connect("dbname=warehouse user=etl")
cursor1 = conn1.cursor()
conn2 = psycopg2.connect("dbname=warehouse user=analyst")
cursor2 = cursor2.cursor()
# init
cursor1.execute("DELETE FROM products")
cursor1.execute("INSERT INTO products VALUES (1, 'Product A', 1000)")
conn1.commit()
print("[초기] Product A가 있음")
# Reader: 첫 번째 읽기
cursor2.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
cursor2.execute("BEGIN")
cursor2.execute("SELECT price FROM products WHERE id = 1")
price1 = cursor2.fetchone()[0]
print(f"\n[Reader] 첫 번째 읽기: 가격 = {price1}원")
# Writer: 가격 변경하고 커밋
cursor1.execute("UPDATE products SET price = 2000 WHERE id = 1")
conn1.commit()
print("[Writer] 가격을 2000원으로 변경하고 커밋")
# Reader: 같은 트랜잭션 내에서 다시 읽기
cursor2.execute("SELECT price FROM products WHERE id = 1")
price2 = cursor2.fetchone()[0]
print(f"[Reader] 두 번째 읽기: 가격 = {price2}원")
print(f" 같은 트랜잭션인데 값이 {price1}원 → {price2}원으로 변경됨!")
print(" (Non-Repeatable Read)")
cursor2.execute("COMMIT")
conn1.close()
conn2.close()
demo_read_committed()

3) Repeatable Read

Repeatable Read는 트랜잭션 시작 시점의 일관된 스냅샷을 읽는 격리 수준이다. mysql, postgresql 기본값

from pyspark.sql import SparkSession
def full_load_from_postgres_with_isolation():
"""
PySpark로 PostgreSQL에서 Full Load 시 Isolation Level 설정
"""
spark = SparkSession.builder \
.appName("FullLoadWithIsolation") \
.config("spark.jars", "/path/to/postgresql-jdbc.jar") \
.getOrCreate()
# Repeatable Read로 설정하여 일관된 스냅샷 읽기
jdbc_url = "jdbc:postgresql://source-db:5432/production"
# 옵션 1: JDBC 연결 레벨에서 설정
df = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", "products") \
.option("user", "etl") \
.option("password", "password") \
.option("isolationLevel", "REPEATABLE_READ") \
.load()
print(f"{df.count():,}건 읽기 완료 (Repeatable Read)")
# 쿼리로 명시적 설정
custom_query = """
(
SELECT set_config('transaction_isolation', 'repeatable read', true);
SELECT * FROM products
) as subq
"""
df2 = spark.read \
.format("jdbc") \
.option("url", jdbc_url) \
.option("dbtable", custom_query) \
.option("user", "etl_user") \
.option("password", "password") \
.load()
df.write \
.mode("overwrite") \
.parquet("s3://datalake/products/")
spark.stop()

4) Serializable

Serializable은 모든 트랜잭션이 순차적으로(직렬적으로) 실행된 것처럼 보이게 만드는 격리 수준이다. 여러 트랜잭션이 동시에 실행되더라도 결과는 마치 하나씩 차례로 실행된 것과 동일하게 보여야 한다.

트랜잭션 격리 수준 설정하기

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET TRANSACTION ISOLATION LEVEL REPEATABLE_READ
SET TRANSACTION ISOLATION LEVEL READ COMMITTED
-- READ UNCOMMITTED는 postges의 경우 DB 수준에서 막기 때문에 READ COMMITTED로 대체됨
SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED

Concept

  • Isolation Level : 동시에 실행되는 트랜잭션들이 서로 얼마나 격리되었는지 정의하는 수준. 높을수록 일관성 증가하지만 동시성 저하. 기본적으로 일관성과 동시성의 트레이드오프. Read Uncommited -> Read Commited -> Repeated Read-> Serializable 순으로 격리 수준이 올라감
  • Read Uncommited : 커밋되지 않는 데이터도 읽을 수 있는 가장 낮은 격리 수준. Dirty Read 발생가능. 보통 DB 수준에서 막음
  • Read Commited : 커밋된 데이터만 읽는 격리 수준 대부분의 DB의 기본 값이지만 None Repeatable Read 발생 가능
  • Repeatable Read : 트랜잭션 시작 시점의 일관된 스냅샷을 읽는 격리 수준. Full Load에 가장 적합함.
  • Serializable : 가장 엄격한 격리 수준. Transaction이 순차적으로 실행하는 것처럼 동작. Shared-Lock, Exclusive-Lock에 더해 Range Lock(검색조건에 해당하는 범위를 Lock) 을 사용하는 Lock 기반 방식과 Snapshot 기반으로 실행 후 커밋 시점에 직렬과 가능성을 검증하는 MVCC기반 방식이 있음
  • MVCC : Multi Version Concurrency Control. 데이터의 여러 버전을 유지하여 Reader랑 Writer가 서로 블록하지 않고 동시 실행가능하게 하는 기법. Repeatable Read의 핵심 메커니즘
  • Anomalies : 기본적으로 트랜잭션 간 동시실행으로 데이터의 일관성이 깨지는 현상을 이상 현상이라 한다
  • Dirty Read : 한 트랜잭션이 아직 커밋되지 않는 다른 트랜잭션의 변경 내용을 읽는 현상
  • Non-Repeatable Read : 한 트랜잭션이 같은 데이터를 두 번 읽을 때 그 사이에 다른 트랜잭션이 값을 변경하여 결과가 달라지는 현상. 기본적으로 긴 트랜잭션 중에 다른 트랜잭션이 데이터를 바꿀 수있는 구조일 경우 발생가능
  • Phantom Read : 트랜잭션 중 같은 조건으로 다시 조회할 때 다른 트랜잭션이 행을 추가/삭제해서 결과집합이 달라지는 것
  • Phantom Read와 Non-Repeatable Read 차이 : Non-Repeatable read는 같은 레코드에 대해 값이 바뀌는 것이고 Phantom Read는 행이 바뀌는 것(추가/삭제)
  • Snapshot Isolation : 트랜잭션이 시작될 때 그 시점의 데이터 스냅샷을 읽고 이후 다른 트랜잭션이 데이터를 바꿔도 자신은 그 스냅샷을 계속읽는 방식
  • Orphan Rows : 참조 무결성이 깨진 데이터. 참조 무결성은 기본 키와 참조 키의 관계가 항상 유지되는 것을 보장하는 것 ex) order_items는 있는데 order가 없는 경우

03. Lock#

현재 Lock 상태 →
요청 Lock ↓ | No Lock | Shared Lock | Exclusive Lock
─────────────────────┼─────────┼─────────────┼───────────────
Shared Lock (읽기) |||
Exclusive Lock (쓰기)|||
1) No Lock: 아무도 안쓰고 있음 → 모두 가능
2) Shared Lock (읽기 중):
- 다른 읽기 가능 ✅ (여러 명이 동시에 읽기 가능)
- 쓰기 불가 ❌ (읽는 중에는 수정 못함)
3) Exclusive Lock (쓰기 중):
- 읽기 불가 ❌ (쓰는 중에는 읽기도 못함)
- 쓰기 불가 ❌ (쓰는 중에는 다른 쓰기도 못함)

Concepts

  • Lock: 여러 트랜잭션이 동시에 같은 데이터를 접근할 때 데이터 무결성을 보장하기 위한 메커니즘. 창고 자물쇠와 유사한 개념.
  • Shared Lock (S-Lock, 공유 잠금): 읽기 작업에 사용되는 Lock. 여러 트랜잭션이 동시에 획득 가능하지만, Exclusive Lock과는 양립 불가.
  • Exclusive Lock (X-Lock, 배타 잠금): 쓰기 작업에 사용되는 Lock. 한 트랜잭션만 획득 가능하며, 다른 모든 Lock과 양립 불가. 읽기도 차단됨.
  • Lock Granularity (Lock 세밀도): Lock의 범위 크기. Database > Table > Page > Row 순으로 세밀해짐. 세밀할수록 동시성 증가하지만 오버헤드도 증가.
  • Table Lock: 테이블 전체에 거는 Lock. Full Load에서 자주 사용되지만 동시성이 낮음. Swap Table Pattern으로 회피 권장.
  • Row Lock: 개별 행에 거는 Lock. OLTP에 적합하며 높은 동시성 제공하지만, 대량 처리 시 오버헤드 큼.
  • Deadlock (교착 상태): 두 개 이상의 트랜잭션이 서로가 보유한 Lock을 기다리며 무한 대기하는 상태. DB가 감지하여 하나를 강제 롤백.
  • Lock Timeout: Lock을 획득하기 위해 대기하는 최대 시간. 설정 시간 초과 시 트랜잭션 실패. 무한 대기 방지용.
  • NOWAIT: Lock을 즉시 획득하지 못하면 대기하지 않고 바로 에러를 발생시키는 옵션. Deadlock 방지에 유용.
  • Lock Ordering: 모든 트랜잭션이 동일한 순서로 Lock을 획득하도록 하는 규칙. Deadlock 방지의 핵심 전략. 보통 알파벳순 사용.
  • Optimistic Concurrency Control: Lock 대신 충돌 발생 시 재시도하는 방식. DuckDB가 사용. 충돌이 적은 환경에서 효율적.
  • Write-Ahead Logging (WAL): 트랜잭션 변경사항을 먼저 로그에 기록하는 방식. MVCC와 함께 동시성 제어에 사용.

References#

  • Data Engineering Design Pattern