1128 words
6 minutes
[DE Design Pattern]02-3. Change Data Capture
03. Change Data Capture
- 데이터베이스의 내부 커밋 로그(WAL)를 직접 읽어서 모든 변경사항을 실시간으로 캡처하는 패턴.
| 구분 | Incremental Loader | CDC |
|---|---|---|
| Latency | 분~시간 (배치 스케줄링 오버헤드) | 초 단위 (커밋 로그 스트리밍) |
| Hard Delete | 감지 불가 (Soft Delete 필요) | 커밋 로그에 DELETE 연산 기록됨 |
| 데이터 접근 방식 | 고수준 SQL 쿼리 | 저수준 커밋 로그 직접 읽기 |
동작 원리
- 데이터베이스는 모든 쓰기 연산(INSERT, UPDATE, DELETE)을 커밋 로그(append-only 구조) 에 먼저 기록
- CDC client에서 이 로그를 스트리밍으로 읽어서 변경분을 외부 시스템으로 전송
[PostgreSQL WAL]LSN 100: INSERT INTO visits (id=1, user='A')LSN 101: UPDATE visits SET user='B' WHERE id=1LSN 102: DELETE FROM visits WHERE id=1
↓ CDC Consumer (Debezium)
[Kafka Topic: dedp.schema.visits]{op: "c", id: 1, after: {user: "A"}} ← create{op: "u", id: 1, before: {user: "A"}, after: {user: "B"}} ← update{op: "d", id: 1, before: {user: "B"}} ← deleteIncremental Loader는 단순히 행의 최신 상태만 가져오지만, CDC는 변경의 전체 이력을 제공함
Debezium 설정
Kafka Connect 기반 declarative config
import requests
debezium_config = { "name": "visits-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", "database.password": "postgres", "database.dbname": "postgres", "database.server.name": "dedp", "schema.include.list": "dedp_schema", "topic.prefix": "dedp", # 아래 설정이 PostgreSQL WAL 연동의 핵심 "plugin.name": "pgoutput", # logical replication 플러그인 }}
# Kafka Connect REST API로 커넥터 등록response = requests.post( "http://kafka-connect:8083/connectors", json=debezium_config, headers={"Content-Type": "application/json"})# 결과: dedp_schema.visits 테이블 → dedp.dedp_schema.visits 토픽으로 CDC 스트리밍Data at Rest → Data in Motion
- 정적 데이터(data at rest)가 동적 데이터(data in motion)로 전환됨
- 정적 table join과 동적 streaming join사이의 차이 발생
from pyspark.sql import functions as F
orders_stream = spark.readStream.format("kafka").load()users_stream = spark.readStream.format("kafka").load()
# 스트리밍 JOIN에서는 watermark로 "얼마나 기다릴지" 정의해야 함orders_with_watermark = orders_stream.withWatermark("event_time", "10 minutes")users_with_watermark = users_stream.withWatermark("event_time", "10 minutes")
joined = orders_with_watermark.join( users_with_watermark, on="user_id", how="inner" # 한쪽 스트림이 늦으면 매칭이 지연될 수 있음)Delta Lake Change Data Feed (CDF)
외부 CDC 도구 없이, Delta Lake 자체적으로 변경 피드를 제공
from pyspark.sql import SparkSessionfrom delta.tables import DeltaTable
spark = (SparkSession.builder .config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true") .getOrCreate())
# 방법 1: 글로벌 설정 (위의 config)# 방법 2: 테이블 레벨 설정spark.sql(""" CREATE TABLE events ( visit_id STRING, event_time TIMESTAMP, user_id STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true)""")
# CDF 스트리밍 읽기cdf_stream = ( spark.readStream.format("delta") .option("readChangeFeed", "true") .option("startingVersion", 0) # 어떤 버전부터 읽을지 .option("maxFilesPerTrigger", 4) # 처리량 제어 .table("events"))
# CDF 출력에는 추가 메타 컬럼이 포함됨:# _change_type: insert | update_preimage | update_postimage | delete# _commit_version: 변경이 발생한 Delta 커밋 버전# _commit_timestamp: 변경 시간
query = cdf_stream.writeStream.format("console").start()| visit_id | _change_type | _commit_version ||----------|-------------------|-----------------|| v001 | update_preimage | 3 | ← 변경 전| v001 | update_postimage | 3 | ← 변경 후Debezium CDC와의 차이는, CDF는 같은 Delta Lake 생태계 안에서 작동하므로 별도 인프라(Kafka Connect 등)가 불필요하다는 점입니다. 다만 소스가 Delta Lake 테이블이어야 한다는 제약이 있습니다.
CDC의 데이터 스코프 주의점
- 초기 스냅샷은 Full Load나 Incremental Load로 가져온 뒤 이후 변경분을 CDC로 처리하는 하이브리드 접근이 필요
- 이는 cdc가 시작 시점이후의 변경분만 처리하기 때문
[시간축]──────────────────────────────────────────── 기존 데이터 CDC 시작 시점 이후 ←── Full Load ──→ ←── CDC 스트리밍 ──→Concept
- Change Data Capture (CDC) : 데이터베이스 커밋 로그를 직접 읽어 모든 변경(INSERT/UPDATE/DELETE)을 실시간 캡처하는 패턴
- Commit Log (WAL) : 데이터베이스가 모든 쓰기 연산을 순차적으로 기록하는 append-only 구조. CDC의 데이터 소스
- Debezium : Kafka Connect 기반의 오픈소스 CDC 프레임워크. 다양한 RDBMS/NoSQL 지원
- Kafka Connect : 외부 시스템과 Kafka 간 데이터를 연결하는 프레임워크. Debezium의 실행 환경
- CDC Payload : 연산 유형(op), 변경 전(before), 변경 후(after) 값을 포함하는 CDC 메시지 구조
- Data at Rest vs Data in Motion : 정적 저장 데이터 vs 스트리밍 동적 데이터. CDC를 통해 전자가 후자로 전환되며, JOIN 등의 연산 의미론이 달라짐
- Delta Lake CDF (Change Data Feed) : Delta Lake 내장 CDC 기능.
_change_type컬럼으로 insert/update_preimage/update_postimage/delete 구분 - update_preimage / update_postimage : CDF에서 UPDATE 발생 시 변경 전/후 상태를 각각 별도 행으로 제공
- Logical Replication (pgoutput) : PostgreSQL의 논리 복제 플러그인. Debezium이 WAL을 읽기 위해 필요
- maxFilesPerTrigger / maxBytesPerTrigger : CDF 스트리밍 읽기의 처리량을 제어하는 옵션
[DE Design Pattern]02-3. Change Data Capture
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-03-cdc/