Data/Airflow
[Airflow] Xcoms란?
운호(Noah)
2024. 3. 8. 14:20
Xcoms란?
- XComs(Cross-Communications)는 Apache Airflow에서 태스크 간에 메시지나 데이터를 교환하기 위한 메커니즘입니다.
- Xcoms를 통해 한 태스크가 생성한 데이터를 다른 태스크가 사용할 수 있으며, 이는 태스크 간의 의존성 관리와 데이터 공유에 매우 유용합니다.
- XComs는 Key-Value 쌍으로 데이터를 저장하며, Airflow의 메타데이터 데이터베이스에 저장됩니다.
예제 코드
태스크에서 데이터를 XCom에 push하는 가장 간단한 방법은 태스크의 실행 함수에서 값을 반환하는 것입니다. Airflow는 반환된 값을 자동으로 XCom에 push합니다.
@task def push_task(): # XCom으로 메시지를 push return 'Hello from the push side!' @task def pull_task(**kwargs): # XCom에서 메시지를 pull message = kwargs['ti'].xcom_pull(task_ids="push_task") print(f"Received message: {message}")
보다 세밀한 데이터 관리가 필요할 땐, 키-값 쌍을 지정해서 XCom에 Push 할 수도 있습니다.
@task def push_task(**kwargs): # 키를 지정하여 XCom으로 메시지를 push message = 'Hello from the push side!' kwargs['ti'].xcom_push(key='message', value=message) @task def pull_task(**kwargs): # 지정된 키로 XCom에서 메시지를 pull message = kwargs['ti'].xcom_pull(task_ids="push_task", key='message') print(f"Received message: {message}")