601 words
3 minutes
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름

Overview#

01. TaskFlow API에서 XCom 생성 및 DB 저장 흐름#

사전개념#

  • TaskFlow API는 함수의 리턴값이 자동으로 XCom에 저장됨
  • 일반 Airflow에서는 명시적 xcom_push()가 필요하지만, TaskFlow API에서는 자동화됨
# 일반 Airflow 접근 def traditional_task(**context): value = get_data() context['ti'].xcom_push(key='my_data', value=value) # TaskFlow API 접근 @task def taskflow_task(): value = get_data() return value

실행절차#

1. Decorator처리

  • @task 데코레이터가 함수를 감싸 특수 오퍼레이터로 변환
def task( python_callable=None, multiple_outputs=False, **kwargs ): def wrapper(f): # 태스크 함수를 특수 오퍼레이터로 래핑 return _PythonDecoratedOperator( python_callable=f, multiple_outputs=multiple_outputs, **kwargs ) if python_callable: return wrapper(python_callable) else: return wrapper

2. Operator 변환

  • 데코레이터된 함수는 _PythonDecoratedOperator로 변환됨
# _PythonDecoratedOperator 내부 (간략화) class _PythonDecoratedOperator(PythonOperator): def execute(self, context): # 부모 클래스의 execute 호출 return super().execute(context)

3. 함수 실행 및 결과 캡처

  • PythonOperatorexecute() 메서드가 함수를 실행하고 결과를 캡처
def execute(self, context): # run collable result = self.execute_callable(context) # XCom 푸시 설정이 켜져 있으면 결과 반환 (자동 XCom 저장용) if self.do_xcom_push: return result return None def execute_callable(self, context): # 사용자 함수 실행 result = self.python_callable(*self.op_args, **self.op_kwargs) # 결과가 None이 아니면 XCom 푸시 활성화 if result is not None: self.do_xcom_push = True return result return None

4. Xcom생성

  • execute()에서 반환된 값은 태스크 인스턴스의 xcom_push()를 통해 XCom으로 저장
  • Airflow 내부적으로 이 과정이 자동 진행됨
# 내부 동작 (직접 호출할 필요 없음) context['ti'].xcom_push(key='return_value', value=result)

5. TaskInstance에서 XCom으로 변환

  • TaskInstance.xcom_push()는 XCom 모델의 set() 메서드를 호출
  • airflow/models/taskinstance.py에 정의됨
# TaskInstance 클래스 내부 (간략화) @provide_session def xcom_push( self, key: str, value: Any, session: Session = NEW_SESSION, ) -> None: # XCom 모델의 set() 메서드 호출 XCom.set( key=key, value=value, task_id=self.task_id, dag_id=self.dag_id, execution_date=self.execution_date, session=session, )

6. 메타데이터 데이터베이스 저장 단계

  • XCom.set() 메서드가 실제로 값을 데이터베이스에 저장
  • airflow/models/xcom.py에 정의됨
# XCom 클래스 @classmethod @provide_session def set( cls, key: str, value: Any, task_id: str, dag_id: str, execution_date: datetime, session: Session = NEW_SESSION, ) -> None: # XCom 객체 생성 및 병합 session.merge( XCom( key=key, value=value, task_id=task_id, dag_id=dag_id, execution_date=execution_date, ) ) # 변경사항 커밋 session.commit()

7 데이터베이스 구조

  • XCom 값은 Airflow 메타데이터 데이터베이스의 xcom 테이블에 저장
  • 일반적인 테이블 구조:
    • dag_id: DAG 식별자
    • task_id: 태스크 식별자
    • execution_date: 실행 날짜/시간
    • key: XCom 키 (TaskFlow에서는 보통 ‘return_value’)
    • value: 직렬화된 반환값
    • timestamp: 저장 시간
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
https://yjinheon.netlify.app/posts/02de/airflow/de-airflow-04_taskflow_xcom/
Author
Datamind
Published at
2025-04-03