1167 words
6 minutes
[DE Design Pattern]05-8. Stateful Sessionizer
8. Sessionization: Stateful Sessionizer
01. Pattern Overview
Stateful Sessionizer는 스트리밍 파이프라인에서 세션을 생성하는 패턴입니다. Incremental Sessionizer의 “pending sessions 저장소” 역할을 State Store가 대신합니다.
일반 스트리밍 파이프라인은 stateless(상태 없음)라서 이전 이벤트를 기억하지 못합니다. Stateful Sessionizer는 state store를 통해 진행 중인 세션을 유지하면서 새 이벤트를 축적합니다.
02. 두 가지 구현 방식
1. Session Window
gap duration(비활동 허용 시간)만 지정하면 자동으로 세션을 관리
# Apache Flink: Session Window (개념 — Python 참고용)sessions = ( visits_stream .key_by(lambda v: v.visit_id) .window(EventTimeSessionWindows.with_gap(Time.minutes(10))) .allowed_lateness(Time.minutes(15)) # 윈도우 닫힌 후 15분간 late data 허용 .process(VisitToSessionConverter()))gap duration이 정적입니다. 모든 세션 키에 동일한 비활동 기간이 적용
2. Arbitrary Stateful Processing
직접 state store를 제어
from pyspark.sql import functions as Ffrom pyspark.sql.types import *
# 1단계: watermark + groupBy 선언grouped_visits = ( visits_from_kafka .withWatermark("event_time", "1 minute") .groupBy(F.col("visit_id")))
# 2단계: stateful mapping 정의sessions = grouped_visits.applyInPandasWithState( func=map_visits_to_session, # 세션 로직 함수 outputStructType=StructType([ # 완료된 세션 스키마 StructField("visit_id", StringType()), StructField("user_id", StringType()), StructField("start_time", TimestampType()), StructField("end_time", TimestampType()), StructField("visited_pages", ArrayType(StringType())), StructField("duration_ms", LongType()), ]), stateStructType=StructType([ # pending 세션(state store) 스키마 StructField("visits", ArrayType(StringType())), StructField("user_id", StringType()), ]), outputMode="update", timeoutConf="EventTimeTimeout", # 이벤트 시간 기반 만료)세션 로직 함수: 3개 분기
def map_visits_to_session(visit_id_tuple, input_rows, current_state): session_expiration_ms = 10 * 60 * 1000 # 10분 visit_id = visit_id_tuple[0]
# 분기 1: 타임아웃 → 세션 완료 if current_state.hasTimedOut: visits, user_id = current_state.get session = build_final_session(visit_id, visits, user_id) current_state.remove() # state store에서 제거 yield pandas.DataFrame(session) return
# 분기 2: 만료 기준 시간 결정 if current_state.getCurrentWatermarkMs() == 0: # 첫 iteration → event time 사용 base_watermark = max_event_time_from(input_rows) else: base_watermark = current_state.getCurrentWatermarkMs()
# 분기 3: 이벤트 축적 + state 갱신 previous_visits = current_state.get[0] if current_state.exists else [] new_visits = extract_visits(input_rows) all_visits = previous_visits + new_visits
current_state.update((all_visits, user_id))
# 만료 타이머 설정 timeout_ts = base_watermark + session_expiration_ms current_state.setTimeoutTimestamp(timeout_ts)hasTimedOut→ 비활동 기간 초과, 세션을 최종 output으로 내보내고 state 삭제- 타임아웃 아님 → 새 이벤트를 기존 state에 축적하고 만료 시간 갱신
Watermark vs Event Time 만료 전략
| 상태 키 | Event Time | Watermark 기반 만료 | Event-time 기반 만료 | 새 Watermark |
|---|---|---|---|---|
| A | 10:00 | 10:10 | 10:10 | 09:59 |
| A | 10:01 | 10:09 | 10:11 | 10:00 |
| A | 10:08 | 10:10 | 10:18 | 10:07 |
| B | 10:15 | 10:17 | 10:25 | 10:14 |
Watermark 기반 만료가 잡의 실제 진행 상황을 반영하기에 보다 정확한만료전략
주요 트레이드오프
- At-least-once: 체크포인팅이 매 상태 변경마다 발생하지 않으므로, 장애 시 마지막 체크포인트부터 재처리 → 중복 가능. 세션 키 생성에 실시간(processing time) 사용 금지 (재시작마다 값이 달라짐)
- Scaling 비용: 노드 추가/제거 시 state rebalancing 필요. 특정 키의 state가 새 워커에 할당될 때까지 처리 중단
- 비활동 기간 길이: 길면 하드웨어 부담 + output freshness 저하. 짧으면 세션이 조기 종료
- Processing Time 기반 만료의 위험: 쓰기 재시도 등 예상치 못한 지연이 발생하면 세션이 너무 일찍 만료될 수 있음. 이벤트 시간 기반 추론이 항상 더 안전
Concept
- Stateful Sessionizer : 스트리밍 파이프라인에서 state store를 활용하여 실시간 세션을 생성하는 패턴
- State Store : 진행 중인 세션을 유지하는 저장소. In-memory(빠른 접근) + 영구 스토리지(장애 복구) 이중 구조
- Session Window : gap duration만 지정하면 프레임워크가 자동 관리하는 선언적 세션 윈도우
- Gap Duration : 같은 세션 키의 두 이벤트 사이 허용되는 최대 비활동 시간. 초과 시 새 세션 생성
- Arbitrary Stateful Processing : state store를 직접 제어하는 방식. 키별 다른 만료 로직 등 유연한 구현 가능
- Checkpointing : state store를 영구 스토리지에 주기적으로 동기화. 장애 복구의 기반이지만 at-least-once 원인
- Watermark 기반 만료 : 잡의 진행 상황(watermark)을 기준으로 세션 만료를 판단하는 전략. Event time 기반보다 정확
- State Rebalancing : 스케일링 시 state 키를 새 워커에 재분배하는 과정. 처리 지연을 유발
- allowed_lateness : 윈도우가 닫힌 후에도 추가로 late data를 받아들이는 유예 기간
추가적으로 공부하면 좋은 컨셉, 주제들:
- RocksDB State Backend : Flink/Kafka Streams에서 대용량 state를 디스크 기반으로 관리하는 메커니즘
- Incremental Checkpointing : 전체가 아닌 변경된 state만 체크포인팅하여 오버헤드를 줄이는 최적화
- Exactly-once in Streaming : Kafka Transactions + 체크포인팅으로 정확히 한 번 처리를 달성하는 방법
- Spark flatMapGroupsWithState : applyInPandasWithState의 Scala 버전. 더 세밀한 state 제어 가능
- Flink ProcessFunction : Flink의 저수준 stateful 처리 API
[DE Design Pattern]05-8. Stateful Sessionizer
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-08-stateful-sessionizer/