1367 words
7 minutes
[DE Design Pattern]02-4. Data Replication
04. Data Replication
데이터를 한 위치에서 다른 위치로 복제하는 패턴
| 구분 | Data Loading | Data Replication |
|---|---|---|
| 환경 | 이기종 스토어 간 이동 가능 | 동종 스토어 간 복제가 기본 |
| 메타데이터 | 보존 불필요 | PK, 이벤트 순서, 헤더 등 보존이 원칙 |
| 목적 | 분석/처리를 위한 데이터 수집 | 동일 데이터의 환경 간 동기화 |
Passthrough Replicator
- 기본적으로 데이터를 있는 그대로 복제하는 것
- 보통 production 데이터를 development/staging 환경으로 복제시 사용
from pyspark.sql import SparkSession
def passthrough_replicate_files(spark: SparkSession, source: str, dest: str): """ Passthrough Replicator — 가장 단순한 텍스트 API 사용. JSON I/O API가 아닌 text API로 행을 그대로 복사. """ # text API → 파싱 없이 라인 그대로 복사 input_dataset = spark.read.text(source) input_dataset.write.mode("overwrite").text(dest)Push vs Pull 전략
복제 방향관련 tradeoff
[Pull 방식] — 위험staging 환경이 production DB에 접근하여 데이터를 가져옴→ staging의 버그가 production에 영향 줄 수 있음.ex) IP 고갈
[Push 방식] — 권장production 환경이 직접 staging으로 데이터를 복사→ production이 프로세스를 통제 (빈도, 처리량 제어)# Push 방식 구현 개념def push_replicate(spark: SparkSession, prod_path: str, target_envs: list[str]): """ Push 방식: production이 다른 환경으로 데이터를 밀어넣음. production이 빈도와 처리량을 통제. """ source_data = spark.read.text(prod_path)
for env_path in target_envs: source_data.write.mode("overwrite").text(env_path)
# push_replicate(spark, "s3://prod/devices", ["s3://staging/devices", "s3://dev/devices"])메타데이터 보존
def replicate_kafka_with_ordering(events_stream, output_topic: str, checkpoint: str): from pyspark.sql import DataFrame
events_to_replicate = events_stream.selectExpr( "key", "value", "partition", "headers", "offset" )
def write_sorted_events(events: DataFrame, batch_id: int): (events .sortWithinPartitions("offset", ascending=True) # 순서보존핵심 .drop("offset") .write.format("kafka") .option("kafka.bootstrap.servers", "broker:9092") .option("topic", output_topic) .option("includeHeaders", "true") # 헤더 .save())
(events_to_replicate.writeStream .option("checkpointLocation", checkpoint) .foreachBatch(write_sorted_events) .start())sortWithinPartitions가 없으면 분산 처리 과정에서 이벤트 순서가 뒤바뀔 수 있고,includeHeaders가 없으면 메타데이터가 유실됨.- Delta Lake의 Parquet 파일만 복제하고
_delta_log/를 빠뜨리는 것도 같은 종류의 실수
인프라 기반 복제
클라우드 제공자의 복제는 eventual consistency이므로, 시간 민감한 시나리오에는 부적합할 수 있음
s3_replication_config = { "role": "arn:aws:iam::role/replication", "source_bucket": "devices-production", "rules": [{ "id": "devices", "status": "Enabled", "destination": { "bucket": "devices-staging", "storage_class": "STANDARD" } }]}# 실제로는 Terraform HCL이나 AWS CLI로 설정Transformation Replicator
- Production 데이터로 테스트해야 하지만, PII(개인식별정보)가 포함되어 있어 staging/dev 환경으로 그대로 옮길 수 없는 상황
- 합성 데이터 생성기로는 프로듀서의 실제 데이터 품질 이슈를 재현할 수 없으므로, 실 데이터 기반 변환 복제가 필요
방법 1: 컬럼 제거 (Drop)
불필요한 민감 컬럼을 삭제
def replicate_with_column_drop(spark: SparkSession, source: str, dest: str): dataset = spark.read.format("delta").load(source)
safe_dataset = dataset.drop("ip", "latitude", "longitude") safe_dataset.write.mode("overwrite").format("delta").save(dest)방법 2: 컬럼 변환 (Anonymize)
컬럼을 제거할 수 없고 구조는 유지해야 하지만, 값을 난독화해야 할 때 사용
from pyspark.sql import functions as F
def replicate_with_anonymization(spark: SparkSession, source: str, dest: str): """PII 컬럼을 변환(난독화)하고 복제""" dataset = spark.read.format("delta").load(source)
anonymized = dataset.withColumn( "full_name", F.expr("SUBSTRING(full_name, 2, LENGTH(full_name))") # 첫 글자 제거 )
anonymized.write.mode("overwrite").format("delta").save(dest)방법 3: 접근 제어 (GRANT)
grant_sql = """ GRANT SELECT (visit_id, event_time, user_id) ON TABLE visits TO user_a"""# user_a는 ip, latitude, longitude에 접근 불가# 복제 잡이 user_a 권한으로 실행되면 자동으로 PII 제외Transformation Replicator 주의점
- JSON/CSV를 파싱할 때 datetime 포맷이 프레임워크 기본값과 다르면 null로 변환될 수 있음
- timestamp 컬럼을 그냥 string으로 처리하는 것이 보다 안전
# 위험: timestamp 파싱 시 포맷 불일치로 null 변환schema_risky = "id STRING, event_time TIMESTAMP, name STRING"
# 안전: timestamp을 string으로 유지schema_safe = "id STRING, event_time STRING, name STRING"- 비동기화(Desynchronization) — PII 필드는 시간이 지나면 변함
- 데이터 카탈로그나 데이터 계약(Data Contract) 에서 민감 필드를 태깅하고 변환 로직을 자동화
[수동 관리] — 위험변환 로직에 ip, latitude, longitude 하드코딩→ 새 PII 컬럼(phone_number) 추가 시 누락 가능
[자동화] — 권장Data Catalog에서 PII 태그된 컬럼 목록을 동적으로 조회→ 변환 로직이 항상 최신 PII 정의를 반영Concept
- Data Replication : 동종 스토어 간 데이터를 메타데이터 포함하여 복제하는 패턴. Loading보다 원본 보존에 엄격
- Passthrough Replicator : 변환 없이 데이터를 있는 그대로 복제. 가장 단순한 API(text API 등)를 사용하는 것이 핵심
- Transformation Replicator : PII 제거/변환을 포함하는 복제. Drop, Anonymize, GRANT 세 가지 구현 방식
- Push vs Pull 복제 : Push(소유자가 밀어넣기)가 Pull(소비자가 가져가기)보다 안전. 소유자가 빈도와 처리량을 통제
- Keep It Simple 원칙 : 복제 시 가능한 가장 단순한 API 사용. JSON API 대신 text API로 의도치 않은 타입 변환 방지
- 메타데이터 보존 : Kafka 헤더/파티션 순서, Delta Lake의
_delta_log/등 데이터 값 외의 구조적 정보도 함께 복제 - 컬럼 레벨 접근 제어 (GRANT) : 데이터를 변환하지 않고 사용자 권한으로 PII 접근을 차단하는 방식
- 텍스트 포맷 변환 위험 : JSON/CSV 파싱 시 datetime 등의 포맷 불일치로 인한 silent null 변환 문제
- Desynchronization : PII 필드 정의가 시간에 따라 변하면서 변환 로직과 불일치가 발생하는 문제
- Data Contract / Data Catalog : 민감 필드를 태깅하여 변환 로직을 자동화하는 거버넌스 도구
[DE Design Pattern]02-4. Data Replication
https://yjinheon.netlify.app/posts/02de/de-design-pattern/02-data-ingestion/02-di-04-data-replication/