1274 words
6 minutes
[DE Design Pattern]05-7. Incremental Sessionizer

7. Sessionization: Incremental Sessionizer#

01. Pattern Overview#

  • Incremental Sessionizer는 배치 파이프라인에서 이벤트를 세션으로 묶는 패턴입니다.
  • 세션은 동일 엔티티(사용자, 방문 등)의 연속된 활동을 하나의 단위로 집계한 것

핵심 도전: 하나의 세션이 여러 파티션에 걸쳐 존재할 수 있습니다. 예를 들어 시간당 파티션에서 한 사용자의 세션이 3시간 지속되면, 3개 파티션에 레코드가 분산됩니다. 이를 증분 처리로 해결합니다.

02. 3개의 저장 공간#

패턴의 구조적 핵심은 세 가지 저장 공간의 분리

  1. Input Dataset: 원본 이벤트 (시간 파티션별 방문 기록)
  2. Pending Sessions: 아직 완료되지 않은 진행 중 세션. 비공개 공간 — 내부 로직 전용
  3. Completed Sessions: 완료된 세션. 다운스트림 소비자에게 공개

03. 세션 라이프사이클: 3단계#

Initialization → Accumulation → Finalization
(세션 시작) (이벤트 축적) (세션 종료)
  • Initialization: 새 세션 엔티티 첫 이벤트 도착 시 생성 (예: 홈페이지 방문)
  • Accumulation: 후속 이벤트를 세션에 추가 (예: 방문 페이지 목록 누적)
  • Finalization: 비활동 기간 초과 또는 특정 이벤트 발생 시 세션 종료

04. 실행 흐름: Airflow 오케스트레이션#

# Airflow DAG: 3단계 실행
clean_previous_sessions = PostgresOperator(...) # 멱등성: 이전 실행 결과 삭제
clean_previous_pending = PostgresOperator(...) # 멱등성: 이전 pending 삭제
generate_sessions = PostgresOperator(...) # 세션 생성 로직
([clean_previous_sessions, clean_previous_pending]
>> generate_sessions)
-- 멱등성 보장: 현재 실행 이후의 결과를 모두 삭제
DELETE FROM dedp.sessions WHERE execution_time_id >= '{{ ds }}';
DELETE FROM dedp.pending_sessions WHERE execution_time_id >= '{{ ds }}';

Airflow의 불변 ds 파라미터를 사용하여 재실행/백필 시에도 동일 결과를 보장

05. 세션 생성 SQL: 핵심 로직#

-- 1) 입력 데이터 로딩
CREATE TEMPORARY TABLE visits_{{ ds_nodash }} AS ...;
COPY visits_{{ ds_nodash }} FROM '/data/visits_{{ ds }}.csv';
-- 2) 새 데이터 + 기존 pending sessions 결합
CREATE TEMPORARY TABLE sessions_to_classify AS
SELECT
COALESCE(p.session_id, n.session_id) AS session_id,
LEAST(p.start_time, n.start_time) AS start_time,
GREATEST(p.last_visit_time, n.start_time) AS last_visit_time,
ARRAY_CAT(p.pages, n.pages) AS pages, -- 페이지 목록 병합
CASE
WHEN n.user_id IS NULL -- 새 데이터 없음
THEN p.expiration_batch_id -- 기존 만료 유지
ELSE '{{ macros.ds_add(ds, 2) }}' -- 새 데이터 있으면 만료 연장
END AS expiration_batch_id
FROM new_visits n
FULL OUTER JOIN dedp.pending_sessions p
ON n.session_id = p.session_id;
-- 3) 분류 후 저장
-- 아직 만료 안 된 세션 → pending
INSERT INTO dedp.pending_sessions
SELECT ... FROM sessions_to_classify
WHERE expiration_batch_id != '{{ ds }}';
-- 만료된 세션 → 완료
INSERT INTO dedp.sessions
SELECT ... FROM sessions_to_classify
WHERE expiration_batch_id = '{{ ds }}';
  • FULL OUTER JOIN: 새 데이터만 있는 경우(새 세션), pending만 있는 경우(새 이벤트 없는 기존 세션), 둘 다 있는 경우(기존 세션에 이벤트 추가) 모두 처리
  • COALESCE, LEAST, GREATEST, ARRAY_CAT: 두 소스의 값을 병합하는 유틸리티 함수들
  • expiration_batch_id: 새 데이터가 없으면 기존 만료를 유지, 있으면 2일 연장

주요 트레이드오프#

  • 비활동 기간(Inactivity Period): 길게 잡으면 late data 포함 가능하지만 리소스/비용 증가. 짧으면 세션이 조기 종료될 수 있음
  • Partial Sessions: 미완료 세션도 output에 쓸 수 있지만, is_completed: false 플래그 필수. 소비자가 partial을 final로 오인하면 잘못된 판단 가능 (예: 사기 탐지에서 부분 세션이 “안전”으로 분류되었다가 이후 “위험”으로 변경)
  • Data Freshness: 배치 기반이므로 인사이트가 늦음. 최소 지연 = 파티션 간격 (시간당 파티션이면 최소 1시간)
  • Late Data + Backfilling: 세션은 순방향 의존성을 가짐. 09:00 세션이 10:00에 영향, 10:00이 11:00에 영향. 따라서 하나의 파티션을 백필하면 이후 모든 파티션도 재실행 필요 → 비용이 급격히 증가

Concept

  • Incremental Sessionizer : 배치 파이프라인에서 증분 처리로 세션을 생성하는 패턴. 3개의 저장 공간(Input, Pending, Completed)을 활용
  • Session Lifecycle : Initialization(생성) → Accumulation(축적) → Finalization(종료)의 3단계 세션 라이프사이클
  • Pending Sessions : 아직 완료되지 않은 진행 중 세션을 저장하는 비공개 공간. 다음 실행에서 다시 로드됨
  • Inactivity Period : 세션 종료를 판단하는 비활동 기간 임계값. 이 시간 동안 새 이벤트가 없으면 세션 완료
  • Forward Dependency : 이전 파티션의 세션 결과가 다음 파티션에 영향을 주는 의존성. 백필을 어렵게 만드는 원인
  • Partial Session : 아직 완료되지 않은 세션을 output에 내보내는 것. is_completed 플래그로 구분 필수
  • Expiration Batch ID : 세션의 만료 시점을 배치 실행 ID로 표현하여 세션 종료를 판단하는 메커니즘
  • FULL OUTER JOIN : 새 데이터와 기존 pending을 결합할 때, 양쪽 모두에서 누락 없이 처리하기 위한 조인 방식
  • Gap Detection in SQL : LAG/LEAD 함수로 이벤트 간 시간 차이를 계산하여 세션 경계를 탐지하는 기법

[DE Design Pattern]05-7. Incremental Sessionizer
https://yjinheon.netlify.app/posts/02de/de-design-pattern/05_data_value/05-07-incremental-sessionizer/
Author
Datamind
Published at
2025-03-14
License
CC BY-NC-SA 4.0