601 words
3 minutes
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
Overview
01. TaskFlow API에서 XCom 생성 및 DB 저장 흐름
사전개념
- TaskFlow API는 함수의 리턴값이 자동으로 XCom에 저장됨
- 일반 Airflow에서는 명시적
xcom_push()가 필요하지만, TaskFlow API에서는 자동화됨
# 일반 Airflow 접근def traditional_task(**context): value = get_data() context['ti'].xcom_push(key='my_data', value=value)
# TaskFlow API 접근@taskdef taskflow_task(): value = get_data() return value실행절차
1. Decorator처리
@task데코레이터가 함수를 감싸 특수 오퍼레이터로 변환
def task( python_callable=None, multiple_outputs=False, **kwargs): def wrapper(f): # 태스크 함수를 특수 오퍼레이터로 래핑 return _PythonDecoratedOperator( python_callable=f, multiple_outputs=multiple_outputs, **kwargs )
if python_callable: return wrapper(python_callable) else: return wrapper2. Operator 변환
- 데코레이터된 함수는
_PythonDecoratedOperator로 변환됨
# _PythonDecoratedOperator 내부 (간략화)class _PythonDecoratedOperator(PythonOperator): def execute(self, context): # 부모 클래스의 execute 호출 return super().execute(context)3. 함수 실행 및 결과 캡처
PythonOperator의execute()메서드가 함수를 실행하고 결과를 캡처
def execute(self, context): # run collable result = self.execute_callable(context)
# XCom 푸시 설정이 켜져 있으면 결과 반환 (자동 XCom 저장용) if self.do_xcom_push: return result
return None
def execute_callable(self, context): # 사용자 함수 실행 result = self.python_callable(*self.op_args, **self.op_kwargs)
# 결과가 None이 아니면 XCom 푸시 활성화 if result is not None: self.do_xcom_push = True return result return None4. Xcom생성
execute()에서 반환된 값은 태스크 인스턴스의xcom_push()를 통해 XCom으로 저장- Airflow 내부적으로 이 과정이 자동 진행됨
# 내부 동작 (직접 호출할 필요 없음)context['ti'].xcom_push(key='return_value', value=result)5. TaskInstance에서 XCom으로 변환
TaskInstance.xcom_push()는 XCom 모델의set()메서드를 호출airflow/models/taskinstance.py에 정의됨
# TaskInstance 클래스 내부 (간략화)@provide_sessiondef xcom_push( self, key: str, value: Any, session: Session = NEW_SESSION,) -> None: # XCom 모델의 set() 메서드 호출 XCom.set( key=key, value=value, task_id=self.task_id, dag_id=self.dag_id, execution_date=self.execution_date, session=session, )6. 메타데이터 데이터베이스 저장 단계
XCom.set()메서드가 실제로 값을 데이터베이스에 저장airflow/models/xcom.py에 정의됨
# XCom 클래스@classmethod@provide_sessiondef set( cls, key: str, value: Any, task_id: str, dag_id: str, execution_date: datetime, session: Session = NEW_SESSION,) -> None: # XCom 객체 생성 및 병합 session.merge( XCom( key=key, value=value, task_id=task_id, dag_id=dag_id, execution_date=execution_date, ) ) # 변경사항 커밋 session.commit()7 데이터베이스 구조
- XCom 값은 Airflow 메타데이터 데이터베이스의
xcom테이블에 저장 - 일반적인 테이블 구조:
dag_id: DAG 식별자task_id: 태스크 식별자execution_date: 실행 날짜/시간key: XCom 키 (TaskFlow에서는 보통 ‘return_value’)value: 직렬화된 반환값timestamp: 저장 시간
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
https://yjinheon.netlify.app/posts/02de/airflow/de-airflow-04_taskflow_xcom/