목록Data (66)
우노
Xcoms란? XComs(Cross-Communications)는 Apache Airflow에서 태스크 간에 메시지나 데이터를 교환하기 위한 메커니즘입니다. Xcoms를 통해 한 태스크가 생성한 데이터를 다른 태스크가 사용할 수 있으며, 이는 태스크 간의 의존성 관리와 데이터 공유에 매우 유용합니다. XComs는 Key-Value 쌍으로 데이터를 저장하며, Airflow의 메타데이터 데이터베이스에 저장됩니다. 예제 코드 태스크에서 데이터를 XCom에 push하는 가장 간단한 방법은 태스크의 실행 함수에서 값을 반환하는 것입니다. Airflow는 반환된 값을 자동으로 XCom에 push합니다. @task def push_task(): # XCom으로 메시지를 push return 'Hello fro..
ShortCircuitOperator란? 지정된 조건이 참(True)일 때만 다운스트림 작업을 실행하는 Operator입니다. 지정된 조건이 거짓(False)이라면, ShortCircuitOperator는 다운스트림 작업의 실행을 건너뜁니다. ignore_downstream_trigger_rules 설정 ShortCircuitOperator가 거짓(False)된 조건으로 인하여 다운스트림 작업을 건너뛸 때, 다운스트림 작업들의 트리거 규칙을 무시할 것인지 결정하는 설정입니다. ignore_downstream_trigger_rules=True를 적용한 경우 (기본 설정) 조건이 참(True)일 때 : ShortCircuitOperator 뒤에 오는 모든 다운스트림 작업을 실행합니다. 조건이 거짓(False..
들어가기 앞서, Airflow 운영에 있어서 Airflow 모니터링은 중요한 요소이며, 모니터링은 크게 두 가지 측면으로 구분할 수 있습니다. 서비스 측면 (Airflow 컨테이너 동작 여부, DAG 정상 실행 여부 등) 시스템 측면 (CPU, Memory 등) 해당 포스팅에선 Airflow의 서비스 측면 모니터링을 위해, StatsD Exporter + Prometheus + Grafana를 구성하는 방법에 대해서 다뤄보겠습니다. StatsD Exporter StatsD는 애플리케이션에서 발생하는 이벤트 및 성능 지표를 수집하고, 이를 모니터링 시스템에 전송하는 데 사용되는 네트워크 데몬 및 프로토콜입니다. Airflow 내부적으로 생성된 Metric은, StatsD를 통해 StatsD Exporter..
들어가기 앞서, Airflow 환경 airflow-stable helm chart를 사용해 GKE에 airflow가 설치된 상황 문제 상황 DAG에서 Airflow REST API를 호출하고 싶음 해결 방법 DAG는 Worker에서 동작하고, Airflow REST API 서버는 Webserver에서 동작하기 때문에 DAG에서 Airflow REST API를 호출하고 싶다면 Worker에서 Webserver로 REST API 내부 호출이 가능해야합니다. 따라서, basic auth를 사용해 Pod간 내부 호출을 할 수 있도록 API 관련 Airflow Configuration을 변경했습니다. 기타 Worker Pod에서 Webserver Pod에 접근하기 위해 Webserver Pod의 Service를 ..
들어가기앞서, Airflow는 Airflow CLI를 사용해, Airflow에 설정된 Configuration을 확인할 수 있습니다. Airflow CLI 전체 Configuration 목록 확인 $ airflow config list 특정 Configuration Option 확인 # airflow config get-value [Section] [Option] $ airflow config get-value core executor SequentialExecutor $ airflow config get-value metrics statsd_host localhost 참고 사이트 https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-var..
Executor 설정이란? Task 실행 방식이라고 이해하시면 될 것 같습니다 ㅎㅎ 적용된 Executor 확인 명령어 airflow config get-value core executor
schedule_interval이란? schedule_interval은 DAG 작업의 실행 주기를 의미합니다. Airflow는 이 정보를 기반으로 작업 스케줄링을 수행하며, 지정된 빈도에 따라 작업을 예약하고 실행합니다. 이 값은 cron 표현식 또는 timedelta 객체를 사용하여 지정할 수 있습니다. 해당 포스트에선, cron 표현식에 대해서 다뤄보겠습니다. CRON 표현식이란? cron 표현식은 유닉스 계열 운영체제에서 일정한 시간 간격으로 작업을 실행하기 위해 사용되는 표현식입니다. cron 표현식은 일반적으로 다음과 같은 형식으로 구성됩니다. * * * * * 각각의 별표(*)는 다음과 같은 의미를 가집니다. 분(Minute) : 0부터 59까지의 값을 가집니다. 시간(Hour) : 0부터 2..
들어가기 앞서, Breeze 가 제공하는 CSCMatrix Multiply 는, 곱셈전에 결과 Matrix 의 NNZ 를 먼저 구합니다. 결과 Matrix 의 NNZ 를 구한 이후에는, NNZ 에 따라 결과 Matrix 를 표현하기 위한 배열 공간을 정적으로 할당합니다. 해당 포스트에서는, 기본적인 Breeze CSCMatrix Multiply 와 달리, 곱셈전에 임의의 배열 공간을 정적으로 할당한 뒤, 곱셈 과정에서 부족한 배열 공간을 동적으로 할당하는 코드를 다뤄보겠습니다. 함수 선언 Multiply 함수 import java.util.Arrays import breeze.linalg.CSCMatrix import java.util.concurrent.TimeUnit.NANOSECONDS def dy..
들어가기 앞서, Breeze 가 제공하는 CSCMatrix Multiply 는, 곱셈전에 결과 Matrix 의 NNZ 를 먼저 구합니다. 결과 Matrix 의 NNZ 를 구한 이후에는, NNZ 에 따라 결과 Matrix 를 표현하기 위한 배열 공간을 정적 할당합니다. 함수 선언 결과 Matrix NNZ 계산 함수 // 결과 Matrix 의 NNZ 를 미리 계산 def computeNnz(a: CSCMatrix[Double], b: CSCMatrix[Double], workIndex: Array[Double]) = { // nnz 를 0 으로 초기화 var nnz = 0 // col = 우측 행렬의 col index 순서대로 진행 for (col
Tensor 란? Tensor 란, d 차원의 배열을 의미한다. d 가 1 이라면, First Order Tensor 이며, 이는 Vector 라고 불린다. d 가 2 라면, Second Order Tensor 이며, 이는 Matrix 라고 불린다. d 가 3 이라면, Third Order Tensor 이며, 이는 Tensor 라고 불린다. Tensor Decomposition 이란? 우리 주위에는 아주 다양한 Tensor 데이터들이 존재한다. 추천 시스템 영화 추천, 친구 추천, 쇼핑 추천 등 예를 들어, x 축은 사람, y 축은 영화, z 축은 시간, value 는 평점으로 이루어진 어떤 사람이 어떤 영화에 어떤 시간에 어떤 평점을 주었는지에 대한 정보를 담고 있는 Tensor 가 있을 수 있다. 데..