1274 words
6 minutes
[DE Design Pattern]05-7. Incremental Sessionizer
7. Sessionization: Incremental Sessionizer
01. Pattern Overview
- Incremental Sessionizer는 배치 파이프라인에서 이벤트를 세션으로 묶는 패턴입니다.
- 세션은 동일 엔티티(사용자, 방문 등)의 연속된 활동을 하나의 단위로 집계한 것
핵심 도전: 하나의 세션이 여러 파티션에 걸쳐 존재할 수 있습니다. 예를 들어 시간당 파티션에서 한 사용자의 세션이 3시간 지속되면, 3개 파티션에 레코드가 분산됩니다. 이를 증분 처리로 해결합니다.
02. 3개의 저장 공간
패턴의 구조적 핵심은 세 가지 저장 공간의 분리
- Input Dataset: 원본 이벤트 (시간 파티션별 방문 기록)
- Pending Sessions: 아직 완료되지 않은 진행 중 세션. 비공개 공간 — 내부 로직 전용
- Completed Sessions: 완료된 세션. 다운스트림 소비자에게 공개
03. 세션 라이프사이클: 3단계
Initialization → Accumulation → Finalization(세션 시작) (이벤트 축적) (세션 종료)- Initialization: 새 세션 엔티티 첫 이벤트 도착 시 생성 (예: 홈페이지 방문)
- Accumulation: 후속 이벤트를 세션에 추가 (예: 방문 페이지 목록 누적)
- Finalization: 비활동 기간 초과 또는 특정 이벤트 발생 시 세션 종료
04. 실행 흐름: Airflow 오케스트레이션
# Airflow DAG: 3단계 실행clean_previous_sessions = PostgresOperator(...) # 멱등성: 이전 실행 결과 삭제clean_previous_pending = PostgresOperator(...) # 멱등성: 이전 pending 삭제generate_sessions = PostgresOperator(...) # 세션 생성 로직
([clean_previous_sessions, clean_previous_pending] >> generate_sessions)-- 멱등성 보장: 현재 실행 이후의 결과를 모두 삭제DELETE FROM dedp.sessions WHERE execution_time_id >= '{{ ds }}';DELETE FROM dedp.pending_sessions WHERE execution_time_id >= '{{ ds }}';Airflow의 불변 ds 파라미터를 사용하여 재실행/백필 시에도 동일 결과를 보장
05. 세션 생성 SQL: 핵심 로직
-- 1) 입력 데이터 로딩CREATE TEMPORARY TABLE visits_{{ ds_nodash }} AS ...;COPY visits_{{ ds_nodash }} FROM '/data/visits_{{ ds }}.csv';
-- 2) 새 데이터 + 기존 pending sessions 결합CREATE TEMPORARY TABLE sessions_to_classify ASSELECT COALESCE(p.session_id, n.session_id) AS session_id, LEAST(p.start_time, n.start_time) AS start_time, GREATEST(p.last_visit_time, n.start_time) AS last_visit_time, ARRAY_CAT(p.pages, n.pages) AS pages, -- 페이지 목록 병합 CASE WHEN n.user_id IS NULL -- 새 데이터 없음 THEN p.expiration_batch_id -- 기존 만료 유지 ELSE '{{ macros.ds_add(ds, 2) }}' -- 새 데이터 있으면 만료 연장 END AS expiration_batch_idFROM new_visits nFULL OUTER JOIN dedp.pending_sessions p ON n.session_id = p.session_id;
-- 3) 분류 후 저장-- 아직 만료 안 된 세션 → pendingINSERT INTO dedp.pending_sessions SELECT ... FROM sessions_to_classify WHERE expiration_batch_id != '{{ ds }}';
-- 만료된 세션 → 완료INSERT INTO dedp.sessions SELECT ... FROM sessions_to_classify WHERE expiration_batch_id = '{{ ds }}';FULL OUTER JOIN: 새 데이터만 있는 경우(새 세션), pending만 있는 경우(새 이벤트 없는 기존 세션), 둘 다 있는 경우(기존 세션에 이벤트 추가) 모두 처리COALESCE,LEAST,GREATEST,ARRAY_CAT: 두 소스의 값을 병합하는 유틸리티 함수들expiration_batch_id: 새 데이터가 없으면 기존 만료를 유지, 있으면 2일 연장
주요 트레이드오프
- 비활동 기간(Inactivity Period): 길게 잡으면 late data 포함 가능하지만 리소스/비용 증가. 짧으면 세션이 조기 종료될 수 있음
- Partial Sessions: 미완료 세션도 output에 쓸 수 있지만,
is_completed: false플래그 필수. 소비자가 partial을 final로 오인하면 잘못된 판단 가능 (예: 사기 탐지에서 부분 세션이 “안전”으로 분류되었다가 이후 “위험”으로 변경) - Data Freshness: 배치 기반이므로 인사이트가 늦음. 최소 지연 = 파티션 간격 (시간당 파티션이면 최소 1시간)
- Late Data + Backfilling: 세션은 순방향 의존성을 가짐. 09:00 세션이 10:00에 영향, 10:00이 11:00에 영향. 따라서 하나의 파티션을 백필하면 이후 모든 파티션도 재실행 필요 → 비용이 급격히 증가
Concept
- Incremental Sessionizer : 배치 파이프라인에서 증분 처리로 세션을 생성하는 패턴. 3개의 저장 공간(Input, Pending, Completed)을 활용
- Session Lifecycle : Initialization(생성) → Accumulation(축적) → Finalization(종료)의 3단계 세션 라이프사이클
- Pending Sessions : 아직 완료되지 않은 진행 중 세션을 저장하는 비공개 공간. 다음 실행에서 다시 로드됨
- Inactivity Period : 세션 종료를 판단하는 비활동 기간 임계값. 이 시간 동안 새 이벤트가 없으면 세션 완료
- Forward Dependency : 이전 파티션의 세션 결과가 다음 파티션에 영향을 주는 의존성. 백필을 어렵게 만드는 원인
- Partial Session : 아직 완료되지 않은 세션을 output에 내보내는 것. is_completed 플래그로 구분 필수
- Expiration Batch ID : 세션의 만료 시점을 배치 실행 ID로 표현하여 세션 종료를 판단하는 메커니즘
- FULL OUTER JOIN : 새 데이터와 기존 pending을 결합할 때, 양쪽 모두에서 누락 없이 처리하기 위한 조인 방식
- Gap Detection in SQL : LAG/LEAD 함수로 이벤트 간 시간 차이를 계산하여 세션 경계를 탐지하는 기법
[DE Design Pattern]05-7. Incremental Sessionizer
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-07-incremental-sessionizer/