1128 words
6 minutes
[DE Design Pattern]02-3. Change Data Capture

03. Change Data Capture#

  • 데이터베이스의 내부 커밋 로그(WAL)를 직접 읽어서 모든 변경사항을 실시간으로 캡처하는 패턴.
구분Incremental LoaderCDC
Latency분~시간 (배치 스케줄링 오버헤드)초 단위 (커밋 로그 스트리밍)
Hard Delete감지 불가 (Soft Delete 필요)커밋 로그에 DELETE 연산 기록됨
데이터 접근 방식고수준 SQL 쿼리저수준 커밋 로그 직접 읽기

동작 원리#

  • 데이터베이스는 모든 쓰기 연산(INSERT, UPDATE, DELETE)을 커밋 로그(append-only 구조) 에 먼저 기록
  • CDC client에서 이 로그를 스트리밍으로 읽어서 변경분을 외부 시스템으로 전송
[PostgreSQL WAL]
LSN 100: INSERT INTO visits (id=1, user='A')
LSN 101: UPDATE visits SET user='B' WHERE id=1
LSN 102: DELETE FROM visits WHERE id=1
↓ CDC Consumer (Debezium)
[Kafka Topic: dedp.schema.visits]
{op: "c", id: 1, after: {user: "A"}} ← create
{op: "u", id: 1, before: {user: "A"}, after: {user: "B"}} ← update
{op: "d", id: 1, before: {user: "B"}} ← delete

Incremental Loader는 단순히 행의 최신 상태만 가져오지만, CDC는 변경의 전체 이력을 제공함

Debezium 설정#

Kafka Connect 기반 declarative config

import requests
debezium_config = {
"name": "visits-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"database.server.name": "dedp",
"schema.include.list": "dedp_schema",
"topic.prefix": "dedp",
# 아래 설정이 PostgreSQL WAL 연동의 핵심
"plugin.name": "pgoutput", # logical replication 플러그인
}
}
# Kafka Connect REST API로 커넥터 등록
response = requests.post(
"http://kafka-connect:8083/connectors",
json=debezium_config,
headers={"Content-Type": "application/json"}
)
# 결과: dedp_schema.visits 테이블 → dedp.dedp_schema.visits 토픽으로 CDC 스트리밍

Data at Rest → Data in Motion#

  • 정적 데이터(data at rest)가 동적 데이터(data in motion)로 전환됨
  • 정적 table join과 동적 streaming join사이의 차이 발생
from pyspark.sql import functions as F
orders_stream = spark.readStream.format("kafka").load()
users_stream = spark.readStream.format("kafka").load()
# 스트리밍 JOIN에서는 watermark로 "얼마나 기다릴지" 정의해야 함
orders_with_watermark = orders_stream.withWatermark("event_time", "10 minutes")
users_with_watermark = users_stream.withWatermark("event_time", "10 minutes")
joined = orders_with_watermark.join(
users_with_watermark,
on="user_id",
how="inner" # 한쪽 스트림이 늦으면 매칭이 지연될 수 있음
)

Delta Lake Change Data Feed (CDF)#

외부 CDC 도구 없이, Delta Lake 자체적으로 변경 피드를 제공

from pyspark.sql import SparkSession
from delta.tables import DeltaTable
spark = (SparkSession.builder
.config("spark.databricks.delta.properties.defaults.enableChangeDataFeed", "true")
.getOrCreate()
)
# 방법 1: 글로벌 설정 (위의 config)
# 방법 2: 테이블 레벨 설정
spark.sql("""
CREATE TABLE events (
visit_id STRING, event_time TIMESTAMP, user_id STRING
)
TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# CDF 스트리밍 읽기
cdf_stream = (
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0) # 어떤 버전부터 읽을지
.option("maxFilesPerTrigger", 4) # 처리량 제어
.table("events")
)
# CDF 출력에는 추가 메타 컬럼이 포함됨:
# _change_type: insert | update_preimage | update_postimage | delete
# _commit_version: 변경이 발생한 Delta 커밋 버전
# _commit_timestamp: 변경 시간
query = cdf_stream.writeStream.format("console").start()
| visit_id | _change_type | _commit_version |
|----------|-------------------|-----------------|
| v001 | update_preimage | 3 | ← 변경 전
| v001 | update_postimage | 3 | ← 변경 후

Debezium CDC와의 차이는, CDF는 같은 Delta Lake 생태계 안에서 작동하므로 별도 인프라(Kafka Connect 등)가 불필요하다는 점입니다. 다만 소스가 Delta Lake 테이블이어야 한다는 제약이 있습니다.

CDC의 데이터 스코프 주의점#

  • 초기 스냅샷은 Full Load나 Incremental Load로 가져온 뒤 이후 변경분을 CDC로 처리하는 하이브리드 접근이 필요
  • 이는 cdc가 시작 시점이후의 변경분만 처리하기 때문
[시간축]
────────────────────────────────────────────
기존 데이터 CDC 시작 시점 이후
←── Full Load ──→ ←── CDC 스트리밍 ──→

Concept

  • Change Data Capture (CDC) : 데이터베이스 커밋 로그를 직접 읽어 모든 변경(INSERT/UPDATE/DELETE)을 실시간 캡처하는 패턴
  • Commit Log (WAL) : 데이터베이스가 모든 쓰기 연산을 순차적으로 기록하는 append-only 구조. CDC의 데이터 소스
  • Debezium : Kafka Connect 기반의 오픈소스 CDC 프레임워크. 다양한 RDBMS/NoSQL 지원
  • Kafka Connect : 외부 시스템과 Kafka 간 데이터를 연결하는 프레임워크. Debezium의 실행 환경
  • CDC Payload : 연산 유형(op), 변경 전(before), 변경 후(after) 값을 포함하는 CDC 메시지 구조
  • Data at Rest vs Data in Motion : 정적 저장 데이터 vs 스트리밍 동적 데이터. CDC를 통해 전자가 후자로 전환되며, JOIN 등의 연산 의미론이 달라짐
  • Delta Lake CDF (Change Data Feed) : Delta Lake 내장 CDC 기능. _change_type 컬럼으로 insert/update_preimage/update_postimage/delete 구분
  • update_preimage / update_postimage : CDF에서 UPDATE 발생 시 변경 전/후 상태를 각각 별도 행으로 제공
  • Logical Replication (pgoutput) : PostgreSQL의 논리 복제 플러그인. Debezium이 WAL을 읽기 위해 필요
  • maxFilesPerTrigger / maxBytesPerTrigger : CDF 스트리밍 읽기의 처리량을 제어하는 옵션

[DE Design Pattern]02-3. Change Data Capture
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-03-cdc/
Author
Datamind
Published at
2025-01-31
License
CC BY-NC-SA 4.0