1727 words
9 minutes
[DE Design Pattern]05-9. Data Ordering
9. Data Ordering: Bin Pack Orderer & FIFO Orderer
01. Pattern Overview
ORDER BY만으로 충분해 보이지만, 분산 환경에서 순서를 보장하며 전달하는 것은 다른 종류의 문제- 특히 bulk API를 사용하는 데이터 스토어에서 partial commit(부분 커밋)이 발생하면 순서가 깨질 수 있음.
- Partial commit: 분산시스템에서 발생하는 순서역전 시스템의 일종. 3개 레코드(10:00, 10:10, 10:20)를 bulk 전송했는데 10:20만 성공하고 나머지는 실패. 재시도 시 10:00, 10:10이 뒤늦게 기록되어 순서가 역전
02. Pattern: Bin Pack Orderer
문제
- Partial commit 시맨틱을 가진 스토어(Kinesis PutRecords, DynamoDB BatchWriteItem, Elasticsearch bulk)에 순서를 보장하며 bulk 전송해야 함.
- 개별 전송은 순서가 보장되지만 네트워크 비용이 과다.
- 전송순서를 깨지 않으며서 batch효율을 높여야함
binning
-> 순서를 깨지 않는 범위에서 레코드를 batch로 묶어서 처리하기
Step 1: 그룹핑 키 + 시간으로 정렬
정렬 전: 정렬 후:Key=1 Time=10:00 Key=1 Time=09:04Key=2 Time=09:04 Key=1 Time=10:00Key=1 Time=09:04 Key=1 Time=10:04Key=1 Time=10:04 Key=2 Time=09:04Key=3 Time=10:15 Key=2 Time=10:00Key=2 Time=10:00 Key=3 Time=10:15Step 2: 같은 그룹핑 키가 하나의 bin에 최대 1개만 포함되도록 bin 생성
Bin 1: [Key=1 09:04, Key=2 09:04, Key=3 10:15] ← 각 키의 첫 번째Bin 2: [Key=1 10:00, Key=2 10:00] ← 각 키의 두 번째Bin 3: [Key=1 10:04] ← 각 키의 세 번째Step 3: bin을 순서대로 전송. 현재 bin이 완전히 성공할 때까지 다음 bin 전송 안 함
- bin 내에서 같은 키가 1개뿐이므로, partial commit으로 재시도해도 같은 키의 순서가 역전되지 않음.
구현: PySpark + Kinesis
from pyspark.sql import functions as F
# Step 1: 파티션 내 로컬 정렬 (shuffle 없음)sorted_events = events.sortWithinPartitions( [F.col("visit_id"), F.col("event_time")])
# Step 2+3: 파티션별로 bin 생성 후 순차 전송sorted_events.foreachPartition( lambda rows: write_records_to_kinesis("output-stream", rows))# Bin 생성 로직def write_records_to_kinesis(output_stream, visits_rows): producer = boto3.client("kinesis") delivery_bins = [] bin_index = 0 last_visit_id = None
for visit in visits_rows: if visit.visit_id != last_visit_id: # 새 키 → bin index 리셋 last_visit_id = visit.visit_id bin_index = 0
# bin이 부족하면 새로 생성 if len(delivery_bins) <= bin_index: delivery_bins.append([])
delivery_bins[bin_index].append(visit) bin_index += 1
# 순차 전송: bin 1 완료 → bin 2 → bin 3 for bin_records in delivery_bins: send_bulk_with_retry(producer, output_stream, bin_records)0 핵심: visit_id가 바뀔 때마다 bin_index를 0으로 리셋.
- 결과적으로 같은 visit_id의 n번째 레코드는 항상 bin n에 들어갑니다.
트레이드오프
- 재시도 범위: 파이프라인 전체가 실패할 경우 이미 전송된 bin의 레코드가 재전송되어 전체 순서가 깨질 수 있음. bin 내부 순서만 보장
- 복잡도: 단순 sort 대비 커스텀 bin 생성 로직이 필요
03. Pattern: FIFO Orderer
문제
레코드를 가능한 빨리 개별 전달해야 하며, 버퍼링이 허용되지 않음
핵심 아이디어
First-In-First-Out: 가장 오래된 레코드부터 하나씩 전송하고, 각 전송의 acknowledgment를 받은 후 다음 레코드를 전송
구현 1: 개별 전송 (가장 단순)
# Kafka: produce 후 즉시 flush → ack 대기producer.produce(topic, key=key, value=value)producer.flush() # 브로커 응답까지 블로킹안전하지만 매 레코드마다 네트워크 왕복이 발생하여 처리량이 낮음
구현 2: Bulk + 동시성 제한
# Kafka: 버퍼링 허용하되 동시 요청을 1개로 제한producer = Producer({ "max.in.flight.requests.per.connection": 1, "queue.buffering.max.ms": 1000 # 1초간 버퍼링})producer.produce(topic, key=key, value=value)# flush 없음 — 프로듀서가 자동으로 bulk 전송1초간 모은 레코드를 한 번에 보내되, 동시 요청이 1개이므로 순서가 보장됨
구현 3: Idempotent Producer (최적)
# Kafka: 멱등 프로듀서 — 동시 요청 5개까지 순서 보장producer = Producer({ "max.in.flight.requests.per.connection": 5, "enable.idempotence": True, "queue.buffering.max.ms": 2000})producer.produce(topic, key=key, value=value)- Kafka의 idempotent producer는 최대 5개 동시 요청을 보내면서도 브로커 측에서 시퀀스 넘버로 순서를 보장.
- 처리량과 순서 보장을 모두 달성하는 방법입니다.
Kinesis: SequenceNumberForOrdering
Kinesis는 partial commit 시맨틱이므로 bulk API로 순서 보장 불가. 개별 PutRecord + 시퀀스 번호를 사용합니다.
previous_seq = Nonefor record in records_to_deliver: result = client.put_record( StreamName="output-stream", Data=record.data, PartitionKey=record.key, SequenceNumberForOrdering=previous_seq # 이전 시퀀스 참조 ) previous_seq = result["SequenceNumber"]FIFO ≠ Exactly-once
for message in messages: producer.send(message) # 성공 consumer.ack(message) # ← 여기서 실패하면?# 재시작 시 이미 전송된 message를 다시 send → 중복FIFO는 순서만 보장 하기 때문에 멱등성 패턴과 결합해야 Exactly-once가 가능
04. Bin Pack Orderer vs FIFO Orderer 비교
| 구분 | Bin Pack Orderer | FIFO Orderer |
|---|---|---|
| 대상 스토어 | Partial commit (Kinesis, ES) | Full commit (Kafka) 또는 개별 API |
| 처리량 | Bulk 전송으로 높음 | 개별 전송 시 낮음, idempotent producer로 개선 |
| 복잡도 | Bin 생성 로직 필요 | 단순 (produce + flush) |
| 순서 보장 범위 | 단일 실행 내 | 단일 실행 내 |
| 적합 시나리오 | 대량 데이터 + partial commit 스토어 | 저지연 개별 전달 또는 Kafka |
05. 추가적으로 읽어볼만한 주제들
- Kafka Idempotent Producer 내부 동작 : PID, epoch, sequence number로 중복/순서를 관리하는 메커니즘 (KIP-98)
- Total Order vs Partial Order : 전체 순서 보장 vs 키 단위 순서 보장의 차이와 트레이드오프
- Event Sourcing : 이벤트 순서가 핵심인 아키텍처 패턴
Concept
- Bin Pack Orderer : partial commit 환경에서 같은 키가 bin당 1개만 포함되도록 분류하여 순서를 보장하는 패턴
- FIFO Orderer : 가장 오래된 레코드부터 하나씩(또는 동시성 제한된 bulk로) 순차 전달하는 패턴
- Partial Commit : bulk 요청에서 일부만 성공하는 시맨틱. Kinesis PutRecords, DynamoDB BatchWriteItem, Elasticsearch bulk 등
- Full Commit : bulk 요청이 전부 성공하거나 전부 실패하는 시맨틱. Kafka가 대표적
- Delivery Bin : 같은 그룹핑 키가 최대 1개만 포함된 전송 단위. 순차 전송으로 순서 보장
- In-flight Requests : 응답을 기다리지 않고 동시에 보내는 요청. 처리량은 높지만 순서 역전 위험
- Idempotent Producer : Kafka에서 시퀀스 넘버 기반으로 최대 5개 동시 요청에서도 순서와 중복 제거를 보장하는 기능
- SequenceNumberForOrdering : Kinesis에서 이전 레코드의 시퀀스를 참조하여 순서를 강제하는 매개변수
- sortWithinPartitions : Spark에서 shuffle 없이 파티션 내부만 정렬하는 연산. Bin Pack의 전처리 단계
06. 패턴별 가져갈만한 지점들
| 카테고리 | 패턴 | 핵심 키워드 |
|---|---|---|
| Data Enrichment | Static Joiner | SCD, 정적 참조 JOIN |
| Dynamic Joiner | 스트림 간 JOIN, GC Watermark, 버퍼링 | |
| Data Decoration | Wrapper | 원본/계산값 분리, 4가지 구현 |
| Metadata Decorator | 기술 메타데이터 은닉 | |
| Data Aggregation | Distributed Aggregator | Shuffle, Salting, Data Skew |
| Local Aggregator | Shuffle 회피, Co-partitioning | |
| Sessionization | Incremental Sessionizer | 배치, 3개 저장 공간, Forward Dependency |
| Stateful Sessionizer | 스트리밍, State Store, Session Window | |
| Data Ordering | Bin Pack Orderer | Partial commit, Delivery Bin |
| FIFO Orderer | 순차 전달, Idempotent Producer |
[DE Design Pattern]05-9. Data Ordering
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-09-data-ordering/