Data/Airflow

[Airflow] Xcoms란?

운호(Noah) 2024. 3. 8. 14:20

Xcoms란?

  • XComs(Cross-Communications)는 Apache Airflow에서 태스크 간에 메시지나 데이터를 교환하기 위한 메커니즘입니다.
  • Xcoms를 통해 한 태스크가 생성한 데이터를 다른 태스크가 사용할 수 있으며, 이는 태스크 간의 의존성 관리와 데이터 공유에 매우 유용합니다.
  • XComs는 Key-Value 쌍으로 데이터를 저장하며, Airflow의 메타데이터 데이터베이스에 저장됩니다.

예제 코드

  • 태스크에서 데이터를 XCom에 push하는 가장 간단한 방법은 태스크의 실행 함수에서 값을 반환하는 것입니다. Airflow는 반환된 값을 자동으로 XCom에 push합니다.

    
      @task
      def push_task():
          # XCom으로 메시지를 push
          return 'Hello from the push side!'
    
      @task
      def pull_task(**kwargs):
          # XCom에서 메시지를 pull
          message = kwargs['ti'].xcom_pull(task_ids="push_task")
          print(f"Received message: {message}")

  • 보다 세밀한 데이터 관리가 필요할 땐, 키-값 쌍을 지정해서 XCom에 Push 할 수도 있습니다.

      @task
      def push_task(**kwargs):
          # 키를 지정하여 XCom으로 메시지를 push
          message = 'Hello from the push side!'
          kwargs['ti'].xcom_push(key='message', value=message)
    
      @task
      def pull_task(**kwargs):
          # 지정된 키로 XCom에서 메시지를 pull
          message = kwargs['ti'].xcom_pull(task_ids="push_task", key='message')
          print(f"Received message: {message}")