1367 words
7 minutes
[DE Design Pattern]02-4. Data Replication

04. Data Replication#

데이터를 한 위치에서 다른 위치로 복제하는 패턴

구분Data LoadingData Replication
환경이기종 스토어 간 이동 가능동종 스토어 간 복제가 기본
메타데이터보존 불필요PK, 이벤트 순서, 헤더 등 보존이 원칙
목적분석/처리를 위한 데이터 수집동일 데이터의 환경 간 동기화

Passthrough Replicator#

  • 기본적으로 데이터를 있는 그대로 복제하는 것
  • 보통 production 데이터를 development/staging 환경으로 복제시 사용
from pyspark.sql import SparkSession
def passthrough_replicate_files(spark: SparkSession, source: str, dest: str):
"""
Passthrough Replicator — 가장 단순한 텍스트 API 사용.
JSON I/O API가 아닌 text API로 행을 그대로 복사.
"""
# text API → 파싱 없이 라인 그대로 복사
input_dataset = spark.read.text(source)
input_dataset.write.mode("overwrite").text(dest)

Push vs Pull 전략#

복제 방향관련 tradeoff

[Pull 방식] — 위험
staging 환경이 production DB에 접근하여 데이터를 가져옴
→ staging의 버그가 production에 영향 줄 수 있음.ex) IP 고갈
[Push 방식] — 권장
production 환경이 직접 staging으로 데이터를 복사
→ production이 프로세스를 통제 (빈도, 처리량 제어)
# Push 방식 구현 개념
def push_replicate(spark: SparkSession, prod_path: str, target_envs: list[str]):
"""
Push 방식: production이 다른 환경으로 데이터를 밀어넣음.
production이 빈도와 처리량을 통제.
"""
source_data = spark.read.text(prod_path)
for env_path in target_envs:
source_data.write.mode("overwrite").text(env_path)
# push_replicate(spark, "s3://prod/devices", ["s3://staging/devices", "s3://dev/devices"])

메타데이터 보존#

def replicate_kafka_with_ordering(events_stream, output_topic: str, checkpoint: str):
from pyspark.sql import DataFrame
events_to_replicate = events_stream.selectExpr(
"key", "value", "partition", "headers", "offset"
)
def write_sorted_events(events: DataFrame, batch_id: int):
(events
.sortWithinPartitions("offset", ascending=True) # 순서보존핵심
.drop("offset")
.write.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("topic", output_topic)
.option("includeHeaders", "true") # 헤더
.save())
(events_to_replicate.writeStream
.option("checkpointLocation", checkpoint)
.foreachBatch(write_sorted_events)
.start())
  • sortWithinPartitions가 없으면 분산 처리 과정에서 이벤트 순서가 뒤바뀔 수 있고, includeHeaders가 없으면 메타데이터가 유실됨.
  • Delta Lake의 Parquet 파일만 복제하고 _delta_log/를 빠뜨리는 것도 같은 종류의 실수

인프라 기반 복제#

클라우드 제공자의 복제는 eventual consistency이므로, 시간 민감한 시나리오에는 부적합할 수 있음

s3_replication_config = {
"role": "arn:aws:iam::role/replication",
"source_bucket": "devices-production",
"rules": [{
"id": "devices",
"status": "Enabled",
"destination": {
"bucket": "devices-staging",
"storage_class": "STANDARD"
}
}]
}
# 실제로는 Terraform HCL이나 AWS CLI로 설정

Transformation Replicator#

  • Production 데이터로 테스트해야 하지만, PII(개인식별정보)가 포함되어 있어 staging/dev 환경으로 그대로 옮길 수 없는 상황
  • 합성 데이터 생성기로는 프로듀서의 실제 데이터 품질 이슈를 재현할 수 없으므로, 실 데이터 기반 변환 복제가 필요

방법 1: 컬럼 제거 (Drop)

불필요한 민감 컬럼을 삭제

def replicate_with_column_drop(spark: SparkSession, source: str, dest: str):
dataset = spark.read.format("delta").load(source)
safe_dataset = dataset.drop("ip", "latitude", "longitude")
safe_dataset.write.mode("overwrite").format("delta").save(dest)

방법 2: 컬럼 변환 (Anonymize)

컬럼을 제거할 수 없고 구조는 유지해야 하지만, 값을 난독화해야 할 때 사용

from pyspark.sql import functions as F
def replicate_with_anonymization(spark: SparkSession, source: str, dest: str):
"""PII 컬럼을 변환(난독화)하고 복제"""
dataset = spark.read.format("delta").load(source)
anonymized = dataset.withColumn(
"full_name",
F.expr("SUBSTRING(full_name, 2, LENGTH(full_name))") # 첫 글자 제거
)
anonymized.write.mode("overwrite").format("delta").save(dest)

방법 3: 접근 제어 (GRANT)

grant_sql = """
GRANT SELECT (visit_id, event_time, user_id)
ON TABLE visits
TO user_a
"""
# user_a는 ip, latitude, longitude에 접근 불가
# 복제 잡이 user_a 권한으로 실행되면 자동으로 PII 제외

Transformation Replicator 주의점#

  • JSON/CSV를 파싱할 때 datetime 포맷이 프레임워크 기본값과 다르면 null로 변환될 수 있음
  • timestamp 컬럼을 그냥 string으로 처리하는 것이 보다 안전
# 위험: timestamp 파싱 시 포맷 불일치로 null 변환
schema_risky = "id STRING, event_time TIMESTAMP, name STRING"
# 안전: timestamp을 string으로 유지
schema_safe = "id STRING, event_time STRING, name STRING"
  • 비동기화(Desynchronization) — PII 필드는 시간이 지나면 변함
  • 데이터 카탈로그나 데이터 계약(Data Contract) 에서 민감 필드를 태깅하고 변환 로직을 자동화
[수동 관리] — 위험
변환 로직에 ip, latitude, longitude 하드코딩
→ 새 PII 컬럼(phone_number) 추가 시 누락 가능
[자동화] — 권장
Data Catalog에서 PII 태그된 컬럼 목록을 동적으로 조회
→ 변환 로직이 항상 최신 PII 정의를 반영

Concept

  • Data Replication : 동종 스토어 간 데이터를 메타데이터 포함하여 복제하는 패턴. Loading보다 원본 보존에 엄격
  • Passthrough Replicator : 변환 없이 데이터를 있는 그대로 복제. 가장 단순한 API(text API 등)를 사용하는 것이 핵심
  • Transformation Replicator : PII 제거/변환을 포함하는 복제. Drop, Anonymize, GRANT 세 가지 구현 방식
  • Push vs Pull 복제 : Push(소유자가 밀어넣기)가 Pull(소비자가 가져가기)보다 안전. 소유자가 빈도와 처리량을 통제
  • Keep It Simple 원칙 : 복제 시 가능한 가장 단순한 API 사용. JSON API 대신 text API로 의도치 않은 타입 변환 방지
  • 메타데이터 보존 : Kafka 헤더/파티션 순서, Delta Lake의 _delta_log/ 등 데이터 값 외의 구조적 정보도 함께 복제
  • 컬럼 레벨 접근 제어 (GRANT) : 데이터를 변환하지 않고 사용자 권한으로 PII 접근을 차단하는 방식
  • 텍스트 포맷 변환 위험 : JSON/CSV 파싱 시 datetime 등의 포맷 불일치로 인한 silent null 변환 문제
  • Desynchronization : PII 필드 정의가 시간에 따라 변하면서 변환 로직과 불일치가 발생하는 문제
  • Data Contract / Data Catalog : 민감 필드를 태깅하여 변환 로직을 자동화하는 거버넌스 도구

[DE Design Pattern]02-4. Data Replication
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-04-data-replication/
Author
Datamind
Published at
2025-01-31
License
CC BY-NC-SA 4.0