Categories
Tags
AI airflow alias book build clang closure collection commandline config container DB decorator docker draft format functional generic git gradle intellij java JPA k3s k8s kafka kotlin linux loki monitoring msa neovim network nix poetry pointer python reflection shortcut Spring sql system-design testing web zero-copy
601 words
3 minutes
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
Overview
01. TaskFlow API에서 XCom 생성 및 DB 저장 흐름
사전개념
- TaskFlow API는 함수의 리턴값이 자동으로 XCom에 저장됨
- 일반 Airflow에서는 명시적
xcom_push()
가 필요하지만, TaskFlow API에서는 자동화됨
# 일반 Airflow 접근 def traditional_task(**context): value = get_data() context['ti'].xcom_push(key='my_data', value=value) # TaskFlow API 접근 @task def taskflow_task(): value = get_data() return value
실행절차
1. Decorator처리
@task
데코레이터가 함수를 감싸 특수 오퍼레이터로 변환
def task( python_callable=None, multiple_outputs=False, **kwargs ): def wrapper(f): # 태스크 함수를 특수 오퍼레이터로 래핑 return _PythonDecoratedOperator( python_callable=f, multiple_outputs=multiple_outputs, **kwargs ) if python_callable: return wrapper(python_callable) else: return wrapper
2. Operator 변환
- 데코레이터된 함수는
_PythonDecoratedOperator
로 변환됨
# _PythonDecoratedOperator 내부 (간략화) class _PythonDecoratedOperator(PythonOperator): def execute(self, context): # 부모 클래스의 execute 호출 return super().execute(context)
3. 함수 실행 및 결과 캡처
PythonOperator
의execute()
메서드가 함수를 실행하고 결과를 캡처
def execute(self, context): # run collable result = self.execute_callable(context) # XCom 푸시 설정이 켜져 있으면 결과 반환 (자동 XCom 저장용) if self.do_xcom_push: return result return None def execute_callable(self, context): # 사용자 함수 실행 result = self.python_callable(*self.op_args, **self.op_kwargs) # 결과가 None이 아니면 XCom 푸시 활성화 if result is not None: self.do_xcom_push = True return result return None
4. Xcom생성
execute()
에서 반환된 값은 태스크 인스턴스의xcom_push()
를 통해 XCom으로 저장- Airflow 내부적으로 이 과정이 자동 진행됨
# 내부 동작 (직접 호출할 필요 없음) context['ti'].xcom_push(key='return_value', value=result)
5. TaskInstance에서 XCom으로 변환
TaskInstance.xcom_push()
는 XCom 모델의set()
메서드를 호출airflow/models/taskinstance.py
에 정의됨
# TaskInstance 클래스 내부 (간략화) @provide_session def xcom_push( self, key: str, value: Any, session: Session = NEW_SESSION, ) -> None: # XCom 모델의 set() 메서드 호출 XCom.set( key=key, value=value, task_id=self.task_id, dag_id=self.dag_id, execution_date=self.execution_date, session=session, )
6. 메타데이터 데이터베이스 저장 단계
XCom.set()
메서드가 실제로 값을 데이터베이스에 저장airflow/models/xcom.py
에 정의됨
# XCom 클래스 @classmethod @provide_session def set( cls, key: str, value: Any, task_id: str, dag_id: str, execution_date: datetime, session: Session = NEW_SESSION, ) -> None: # XCom 객체 생성 및 병합 session.merge( XCom( key=key, value=value, task_id=task_id, dag_id=dag_id, execution_date=execution_date, ) ) # 변경사항 커밋 session.commit()
7 데이터베이스 구조
- XCom 값은 Airflow 메타데이터 데이터베이스의
xcom
테이블에 저장 - 일반적인 테이블 구조:
dag_id
: DAG 식별자task_id
: 태스크 식별자execution_date
: 실행 날짜/시간key
: XCom 키 (TaskFlow에서는 보통 ‘return_value’)value
: 직렬화된 반환값timestamp
: 저장 시간
[Airflow]TaskFlow API의 XCom 생성 및 DB 저장 흐름
https://yjinheon.netlify.app/posts/02de/airflow/de-airflow-04_taskflow_xcom/