601 words
3 minutes
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
2025-04-03
2026-01-09

Overview#

01. TaskFlow API에서 XCom 생성 및 DB 저장 흐름#

사전개념#

  • TaskFlow API는 함수의 리턴값이 자동으로 XCom에 저장됨
  • 일반 Airflow에서는 명시적 xcom_push()가 필요하지만, TaskFlow API에서는 자동화됨
# 일반 Airflow 접근
def traditional_task(**context):
value = get_data()
context['ti'].xcom_push(key='my_data', value=value)
# TaskFlow API 접근
@task
def taskflow_task():
value = get_data()
return value

실행절차#

1. Decorator처리

  • @task 데코레이터가 함수를 감싸 특수 오퍼레이터로 변환
def task(
python_callable=None,
multiple_outputs=False,
**kwargs
):
def wrapper(f):
# 태스크 함수를 특수 오퍼레이터로 래핑
return _PythonDecoratedOperator(
python_callable=f,
multiple_outputs=multiple_outputs,
**kwargs
)
if python_callable:
return wrapper(python_callable)
else:
return wrapper

2. Operator 변환

  • 데코레이터된 함수는 _PythonDecoratedOperator로 변환됨
# _PythonDecoratedOperator 내부 (간략화)
class _PythonDecoratedOperator(PythonOperator):
def execute(self, context):
# 부모 클래스의 execute 호출
return super().execute(context)

3. 함수 실행 및 결과 캡처

  • PythonOperatorexecute() 메서드가 함수를 실행하고 결과를 캡처
def execute(self, context):
# run collable
result = self.execute_callable(context)
# XCom 푸시 설정이 켜져 있으면 결과 반환 (자동 XCom 저장용)
if self.do_xcom_push:
return result
return None
def execute_callable(self, context):
# 사용자 함수 실행
result = self.python_callable(*self.op_args, **self.op_kwargs)
# 결과가 None이 아니면 XCom 푸시 활성화
if result is not None:
self.do_xcom_push = True
return result
return None

4. Xcom생성

  • execute()에서 반환된 값은 태스크 인스턴스의 xcom_push()를 통해 XCom으로 저장
  • Airflow 내부적으로 이 과정이 자동 진행됨
# 내부 동작 (직접 호출할 필요 없음)
context['ti'].xcom_push(key='return_value', value=result)

5. TaskInstance에서 XCom으로 변환

  • TaskInstance.xcom_push()는 XCom 모델의 set() 메서드를 호출
  • airflow/models/taskinstance.py에 정의됨
# TaskInstance 클래스 내부 (간략화)
@provide_session
def xcom_push(
self,
key: str,
value: Any,
session: Session = NEW_SESSION,
) -> None:
# XCom 모델의 set() 메서드 호출
XCom.set(
key=key,
value=value,
task_id=self.task_id,
dag_id=self.dag_id,
execution_date=self.execution_date,
session=session,
)

6. 메타데이터 데이터베이스 저장 단계

  • XCom.set() 메서드가 실제로 값을 데이터베이스에 저장
  • airflow/models/xcom.py에 정의됨
# XCom 클래스
@classmethod
@provide_session
def set(
cls,
key: str,
value: Any,
task_id: str,
dag_id: str,
execution_date: datetime,
session: Session = NEW_SESSION,
) -> None:
# XCom 객체 생성 및 병합
session.merge(
XCom(
key=key,
value=value,
task_id=task_id,
dag_id=dag_id,
execution_date=execution_date,
)
)
# 변경사항 커밋
session.commit()

7 데이터베이스 구조

  • XCom 값은 Airflow 메타데이터 데이터베이스의 xcom 테이블에 저장
  • 일반적인 테이블 구조:
    • dag_id: DAG 식별자
    • task_id: 태스크 식별자
    • execution_date: 실행 날짜/시간
    • key: XCom 키 (TaskFlow에서는 보통 ‘return_value’)
    • value: 직렬화된 반환값
    • timestamp: 저장 시간
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
https://yjinheon.netlify.app/posts/02de/airflow/de-airflow-04_taskflow_xcom/
Author
Datamind
Published at
2025-04-03
License
CC BY-NC-SA 4.0