우노
[Helm] Helm Chart를 사용해 GKE에 Airflow 설치하기 본문
들어가기 앞서,
- 데이터 엔지니어가 파이프라인에 집중하기 위해서는 인프라 업무 및 시스템 업무에서 분리되는 것이 가장 좋습니다.
- 이런 환경을 제공해 주는 것이 Google Cloud의 Cloud Composer입니다.
- Cloud Composer는 Apache Airflow를 기반으로 하는 완전 관리형 워크플로 조정 서비스입니다.
- 이러한 완전 관리형 서비스는 운영 부담이 적지만 비용이 많이 발생한다는 특징이 있습니다.
- 만약, 비용 효율적인 운영을 목표로 하고 있다면 Airflow 직접 운영과 Cloud Composer 사용을 비교함으로써 적절한 방식을 채택하는 게 좋을 수 있습니다.
- 따라서, 해당 포스팅에선 Airflow 직접 운영을 위해,
- Airflow Helm Chart를 사용해 Google Kubernetes Engine(GKE)에 Airflow를 설치하고
- Google Cloud Load Balancer(GCLB)를 통해 Airflow Dashboard에 접근하는 방법에 대해서 다뤄보겠습니다.
- Google Service Account, GCLB 관련 설정은 Terraform을 통해 진행하며,
- Airflow 설치를 위한 GKE 생성 및 연결은 사전 구성되어 있다고 가정하겠습니다.
진행 순서
- Terraform
- Google Service Account 생성
- Kubernetes Service Account 생성 및 Google Service Account와 매핑
- Airflow Helm Chart
- Airflow Helm Chart 저장소 추가 및 Value 파일 다운로드
- Value - Config 설정
- Kubernetes ServiceAccount
- Airflow Config
- Airflow DAGs
- Value - Database 설정
- PgBouncer, Postgres
- Value - Component 설정
- Airflow Scheduler
- Airflow Workers
- Triggerer, Flower
- Airflow Webserver
- Airflow Helm Chart Install
- Terraform
- Google Cloud Load Balancer(GCLB) 백엔드 서비스 및 라우팅 설정
Terraform - Google Service Account 생성
Airflow가 Google Service Account를 기반으로 GCP Service에 접근할 수 있도록 Google Service Account 및 IAM을 생성합니다.
resource "google_service_account" "airflow_sa" { account_id = "사용자지정" display_name = "사용자지정" description = "Airflow에서 사용되는 Service Account입니다." } resource "google_service_account_key" "airflow_sa_key" { service_account_id = google_service_account.airflow_sa.name } resource "google_project_iam_member" "project_iam_member" { project = "프로젝트ID" role = "사용자지정역할" member = "serviceAccount:${google_service_account.airflow_sa.email}" }
Terraform - Kubernetes Service Account 생성 및 Google Service Account와 매핑
Airflow가 Google Service Account를 기반으로 GCP Service에 접근할 수 있도록, GKE 내부에 Kubernetes Service Account를 생성하고 이를 Google Service Account와 매핑합니다.
# Google Service Account에 부여된 권한을 이용할 수 있는 Kubernetes Service Account 생성 resource "kubernetes_service_account" "airflow_ksa" { metadata { name = "airflow-ksa" namespace = "네임스페이스명" annotations = { "iam.gke.io/gcp-service-account" = google_service_account.airflow_sa.email } } } # Google Service Account와 Kubernetes Service Account를 매핑 resource "google_service_account_iam_member" "airflow_ksa_mapping" { member = "serviceAccount:${프로젝트ID}.svc.id.goog[${네임스페이스명}/${kubernetes_service_account.airflow_ksa.metadata[0].name}]" role = "roles/iam.workloadIdentityUser" service_account_id = google_service_account.airflow_sa.id }
Helm Chart - Airflow Helm Chart 저장소 추가 및 Value 파일 다운로드
Airflow 설치를 위한 GKE 생성 및 연결은 사전 구성되어 있다고 가정했으므로,
이제 GKE에 Airflow Helm Chart를 설치하기 위해 Airflow Helm Chart 저장소를 추가하고 Value 파일을 다운로드합니다.
Airflow Helm Chart 저장소 추가
helm repo add airflow-stable https://airflow-helm.github.io/charts
Airflow Helm Chart Value 파일 다운로드
helm show values airflow-stable/airflow > airflow-stable-values.yaml
Airflow Helm Chart Skeleton Code
해당 포스팅에선, Airflow Helm Chart Value 파일의 overwrite 부분만 다뤄보겠습니다.
Helm Chart - Kubernetes Service Account 설정
Airflow가 앞서 생성한 Kubernetes Service Account(Google Service Account와 매핑된)를 사용해 GCP Service에 접근할 수 있도록 설정합니다.
## CONFIG | Kubernetes ServiceAccount serviceAccount: create : false name: "airflow-ksa" # Terraform 통해 사전 생성된 KSA를 사용
Helm Chart - Airflow Config 설정
Airflow의 image, config, connections, pools, extraPipPackages를 설정합니다.
Airflow의 config 설정은 아래 링크를 참고하시면 됩니다.
각 항목에 대한 추가적인 세부 내용은 아래 예제 코드의 주석으로 설명되어 있습니다.
암호화가 필요한 값은 helm install 시, --set-string으로 값을 할당받도록 구성했습니다.
## CONFIG | Airflow Configs airflow: ## configs for the airflow container image image: # airflow worker에서 gcloud 실행이 필요해, airflow image에 gcloud를 설치해서 사용하는 방식으로 진행 (iamge는 GCP Artifact Registry에 저장) repository : "GCP Artifact Registry Repo 경로" tag : latest pullPolicy: Always pullSecret: "" ## the fernet encryption key (sets `AIRFLOW__CORE__FERNET_KEY`) # 데이터 암호화 및 복호화에 사용되는 보안 키 fernetKey: "사용자지정" # helm install 시, --set-string 으로 값 할당 ## the secret_key for flask (sets `AIRFLOW__WEBSERVER__SECRET_KEY`) # 세션 관리에 사용되는 보안 키 webserverSecretKey: "사용자지정" # helm install 시, --set-string 으로 값 할당 ## environment variables for airflow configs # Airflow 구성 요소별 Config 설정 예제입니다. # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html# config: # webserver # 슬랙을 통해 DAG 실행 에러를 받을 경우, 해당 BASE_URL을 기반으로 곧바로 Webserver Dashboard로 접근할 수 있도록 설정 AIRFLOW__WEBSERVER__BASE_URL: "Airflow Dashboard 주소" # helm install 시, --set-string 으로 값 할당 # logging # DAG 실행 로그를 Worker 내부에 쌓을 경우, Worker Pod가 재실행되는경우엔 기존 로그들이 삭제 되어 Airflow Dashboard에서 이전 로그들을 확인할 수 없는 문제가 있음 # 따라서, Airflow 로그를 cloud storage에 저장하도록 변경 AIRFLOW__LOGGING__REMOTE_LOGGING : 'true' AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID : "google_cloud_default" # 하단에 생성한 GCP connection 명 AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER : "cloud storage 경로" # helm install 시, --set-string 으로 값 할당 # ETC (모든 파드 컨테이너에 환경변수로 삽입할 경우) KEYCLOAK_CLIENT_AIRFLOW_SECRET : "사용자지정" # helm install 시, --set-string 으로 값 할당 KEYCLOAK_REALM : "사용자지정" # helm install 시, --set-string 으로 값 할당 # GCP connections 연결 예제입니다. connections: - id: google_cloud_default type: google_cloud_platform description : "connection 설명" extra: | { "extra__google_cloud_platform__project": "프로젝트ID", "extra__google_cloud_platform__num_retries": 5 } # pool 생성 예제입니다. pools: - name: "pool" description: "pool 설명" slots: 24 ## extra pip packages to install in airflow Pods # pip package 설치 예제입니다. # 아래와 같은 방식으로 원하는 PIP 패키지를 Worker 파드를 포함한 모든 파드에 설치할 수 있습니다. extraPipPackages: - "sqlalchemy-repr==0.1.0"
위 구성에서, Airflow의 Image는 커스텀 이미지를 사용했습니다.
- Airflow Worker는 gcloud가 설치된 환경이어야 했기 때문에, Airflow Image에 gcloud를 설치한 뒤 Image를 GCP Artifact Registry에서 가져오는 방식으로 구성했습니다.
Airflow Image (Dockerfile)
ARG AIRFLOW_VERSION=2.4.3 ARG PYTHON_VERSION=3.8 FROM apache/airflow:${AIRFLOW_VERSION}-python${PYTHON_VERSION} # gcloud 사용을 위해 google-cloud-sdk 설치 RUN curl https://sdk.cloud.google.com > install.sh && \ bash install.sh --disable-prompts --install-dir="/home/airflow"&& \ echo "source /home/airflow/google-cloud-sdk/completion.bash.inc" >> /home/airflow/.bashrc && \ echo "source /home/airflow/google-cloud-sdk/path.bash.inc" >> /home/airflow/.bashrc && \ source /home/airflow/.bashrc # subprocess 모듈은 /usr/bin에서 gcloud를 실행하고 있음 # 따라서, /usr/bin/gcloud가 /home/airflow/google-cloud-sdk/bin/gcloud를 기반으로 실행되도록 symbolic link 선언 USER root RUN apt-get update && \ apt-get install vim -y && \ ln -s /home/airflow/google-cloud-sdk/bin/gcloud /usr/bin/gcloud
Helm Chart - Airflow DAGs 설정
Airflow Helm Chart는 gitsync 설정을 통해 github DAG repo에서 DAG를 주기적으로 가져오도록 설정할 수 있습니다.
- Kubernetes의 Sidecar Pattern으로, dags-git-sync용 컨테이너가 따로 생겨서 계속 github에 있는 dag파일들을 당겨오는 방식입니다.
gitsync는 http 인증 또는 ssh 인증 방법을 사용해 설정할 수 있습니다.
http 인증 방법
http 인증 방식을 사용하기 위해선, DAG repo 접근 권한을 가진 user의 access token을 발급받은 뒤, Helm Chart에서 해당 토큰을 사용해 접근해야합니다.
access token을 발급받는 절차는 아래와 같습니다.
- github → user profile → settings → Developer settings
- Personal access tokens → Tokens (classic) → Generate new token (classic)
- 토큰명, 토큰 만기 기한, 권한 범위 설정(repo 권한만 있어도 됨) → Generate token → token 값 복사
이후, username과 access token 값을 base64로 인코딩한 뒤, 해당 값을 Kubernetes Secret에 저장하고, Airflow Helm Chart에서 해당 값을 가져오는 방식으로 사용할 수 있습니다.
Kubernetes Extra Manifests 설정 (Kubernetes Secret)
## CONFIG | Kubernetes Extra Manifests extraManifests: - apiVersion: v1 kind: Secret metadata: name: git-credentials data: GIT_SYNC_USERNAME : "사용자지정" # helm install 시, --set-string 으로 값 할당 GIT_SYNC_PASSWORD : "사용자지정" # helm install 시, --set-string 으로 값 할당
Airflow DAGs 설정
## CONFIG | Airflow DAGs dags: gitSync: enabled: true repo: "DAG 레포의 HTTPS 주소" branch : "타겟 브랜치명" # helm install 시, --set-string 으로 값 할당 revision: HEAD ## the name of a pre-created Secret with git http credentials httpSecret : "git-credentials" # 상단에 생성한 Kubernetes Secret 명 ## the key in `dags.gitSync.httpSecret` with your git username httpSecretUsernameKey: GIT_SYNC_USERNAME ## the key in `dags.gitSync.httpSecret` with your git password/token httpSecretPasswordKey: GIT_SYNC_PASSWORD ## the number of seconds between syncs syncWait: 60 ## the max number of seconds allowed for a complete sync syncTimeout: 600
ssh 인증 방법
SSH 인증 방식을 사용하기 위해선, SSH 키를 발급받은 뒤, DAG Repo에 SSH 공개키를 등록하고, Helm Chart에선 SSH 비밀키를 사용해 접근해야합니다.
먼저, SSH 키를 발급 받습니다.
ssh-keygen -t rsa -b 4096 -C "your_email@example.com"
SSH 공개키를 DAG Repo → Settings → Deploy keys에 등록합니다.
- SSH 공개키 위치 : $HOME/.ssh/[your_key_name].pub
SSH 비밀키를 base64로 인코딩합니다.
# SSH 비밀키 위치 : $HOME/*.ssh/[your_key_name]* cat ~/.ssh/id_rsa | base64
base64로 인코딩된 SSH 비밀키 값을 Kubernetes Secret에 저장하고, Airflow Helm Chart에서 해당 값을 가져오는 방식으로 사용할 수 있습니다.
Kubernetes Extra Manifests 설정 (Kubernetes Secret)
## CONFIG | Kubernetes Extra Manifests extraManifests: - apiVersion: v1 kind: Secret metadata: name: airflow-ssh-secret data: base64_encoded_id_rsa : "사용자지정" # helm install 시, --set-string 으로 값 할당
Airflow DAGs 설정
## CONFIG | Airflow DAGs dags: gitSync: enabled: true repo: "DAG 레포의 SSH 주소" branch : "타겟 브랜치명" # helm install 시, --set-string 으로 값 할당 revision: HEAD ## the name of a pre-created Secret with git ssh credentials sshSecret: "airflow-ssh-secret" ## the key in `dags.gitSync.sshSecret` with your ssh-key file sshSecretKey: "base64_encoded_id_rsa" ## the number of seconds between syncs syncWait: 60 ## the max number of seconds allowed for a complete sync syncTimeout: 600
Helm Chart - PgBouncer, Postgres 설정
사용하지 않는 데이터베이스는 enabled:false 처리합니다.
## DATABASE | PgBouncer pgbouncer: enabled: false ## DATABASE | Embedded Postgres postgresql: enabled: true
Helm Chart - Airflow Scheduler 설정
scheduler의 livenessProbe를 설정합니다.
## COMPONENT | Airflow Scheduler scheduler: ## configs for the scheduler Pods' liveness probe livenessProbe: enabled: true initialDelaySeconds: 2100 periodSeconds: 30 timeoutSeconds: 60 failureThreshold: 5
Helm Chart - Airflow Workers 설정
Airflow Worker의 수를 3개로 지정하고, podDisruptionBudget를 사용해 Airflow Workers의 파드가 클러스터의 노드에서 안정적으로 유지되도록 설정합니다. (파드가 임의로 재배치되거나 종료되는 것을 방지합니다.)
## COMPONENT | Airflow Workers workers: ## if the airflow workers StatefulSet should be deployed enabled: true ## the number of worker Pods to run ## - if you set this >1 we recommend defining a `workers.podDisruptionBudget` ## - this is the minimum when `workers.autoscaling.enabled` is true replicas: 3 ## configs for the PodDisruptionBudget of the worker StatefulSet podDisruptionBudget: ## if a PodDisruptionBudget resource is created for the worker StatefulSet enabled: true
Helm Chart - Triggerer, Flower 설정
Triggerer의 livenessProbe를 설정하며, 사용하지 않는 컴포넌트는 enabled:false 처리합니다.
## COMPONENT | Triggerer triggerer: ## configs for the triggerer Pods' liveness probe livenessProbe: enabled: true initialDelaySeconds: 2100 periodSeconds: 30 timeoutSeconds: 60 failureThreshold: 5 ## COMPONENT | Flower flower: enabled: false
Helm Chart - Airflow Webserver 설정
Airflow Webserver에선 3가지 설정을 진행합니다.
- Airflow Webserver에 Keycloak(Single Sign-on 솔루션) 적용
- Airflow Webserver는 Flask App Builder(FAB) 기반으로 동작합니다.
- FAB 관련 설정을 변경(webserver_config.py 수정)하여 OAuth 설정을 진행할 수 있습니다.
- 따라서, 사전 생성되어 있는 Keycloak을 Airflow Webserver에 적용하는 단계가 필요합니다.
- Airflow Service에 networkEndpointGroups Annotation 할당
- Airflow Webserver는, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 순서로 접근하는 것을 목표로 하고 있습니다.
- 이때, Backend Service가 networkEndpointGroups(neg)를 인식하기 위해선, Service의 Annotation에 neg명을 선언해 주는 단계가 필요합니다.
- readinessProbe, livenessProbe 설정
- Airflow Webserver에 Keycloak(Single Sign-on 솔루션) 적용
세부 설정은 주석을 통해 확인할 수 있습니다.
## COMPONENT | Airflow Webserver web: webserverConfig: ## if the `webserver_config.py` file is mounted ## - set to false if you wish to mount your own `webserver_config.py` file enabled: true ## the full content of the `webserver_config.py` file (as a string) stringOverride: | import os import logging import jwt from flask import redirect, session from flask_appbuilder import expose from flask_appbuilder.security.manager import AUTH_OAUTH from flask_appbuilder.security.views import AuthOAuthView from airflow.www.security import AirflowSecurityManager basedir = os.path.abspath(os.path.dirname(__file__)) # 현재 파일의 절대 경로 log = logging.getLogger(__name__) # 로깅을 위한 로거 객체 생성 AUTH_TYPE = AUTH_OAUTH # 인증 유형 설정 AUTH_USER_REGISTRATION = True # 사용자 등록 허용 여부 설정 AUTH_ROLES_SYNC_AT_LOGIN = True # 로그인 시 Keycloak 역할 변경 사항 동기화 여부 설정 PERMANENT_SESSION_LIFETIME = 86400 # 세션 지속 시간(초) # Airflow 역할과 OAuth 제공자 역할 간의 매핑 정의 AUTH_ROLES_MAPPING = { "viewer": [ "Admin" ] } # OAuth 제공자 정보 설정 PROVIDER_NAME = 'keycloak' CLIENT_ID = 'airflow' CLIENT_SECRET = os.environ.get('KEYCLOAK_CLIENT_AIRFLOW_SECRET') # helm install 시, --set-string 으로 값 할당 KEYCLOAK_REALM = os.environ.get('KEYCLOAK_REALM') # helm install 시, --set-string 으로 값 할당 OIDC_ISSUER = f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}" OAUTH_PROVIDERS = [ { 'name':PROVIDER_NAME, 'icon':'fa-address-card', # Icon for the provider 'token_key':'access_token', # Name of the token in the response of access_token_url 'remote_app': { 'client_id': CLIENT_ID, # Client Id (Identify Airflow application) 'client_secret': CLIENT_SECRET, # Secret for this Client Id (Identify Airflow application) 'api_base_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect", 'client_kwargs': { 'scope': 'openid profile email' # ENL 에서는 실제로 OpenID Connect 를 사용 중. }, 'access_token_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token", 'authorize_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/auth", 'request_token_url': None, 'jwks_uri':f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs" } } ] # 로그아웃 시 액세스 토큰을 삭제하는 메서드 class CustomAuthRemoteUserView(AuthOAuthView): @expose("/logout/") def logout(self): """로그아웃 전 액세스 토큰 삭제""" return super().logout() # OAuth에서 사용자 정보를 가져오고 Airflow 역할로 매핑하는 메서드 class CustomSecurityManager(AirflowSecurityManager): authoauthview = CustomAuthRemoteUserView def oauth_user_info(self, provider, response): if provider == PROVIDER_NAME: token = response["access_token"] # OAuth 인증 서버로부터 받은 응답에서 액세스 토큰 추출 me = jwt.decode(token, algorithms="RS256", options={"verify_signature": False}) # 추출한 액세스 토큰을 사용하여 JWT 디코딩 groups = me["resource_access"]["airflow"]["roles"] # JWT에서 추출한 정보 중에서 Airflow 역할만 추출 # resource_access 예제 # { # "resource_access": { "airflow": { "roles": ["viewer", "admin"] }} # } # Airflow에 할당된 역할이 존재하며 "viewer" 역할을 가지고 있을 경우 if (len(groups) >= 1 and "viewer" in groups): # JWT에서 추출한 사용자 정보와 Airflow에 할당된 역할 정보를 userinfo 딕셔너리에 저장 userinfo = { "username": me.get("preferred_username"), "first_name": me.get("given_name"), "last_name": me.get("family_name"), "email": me.get("email"), "role_keys": groups, } return userinfo # 사용자 정보 반환 else: return {} # Airflow는 커스텀 보안 관리자를 사용하여 인증 및 권한 부여를 처리 SECURITY_MANAGER_CLASS = CustomSecurityManager ## configs for the Service of the web Pods service: # Webserver 접근 시, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 접근이 가능하도록 Service에 networkEndpointGroups을 지정 annotations: "cloud.google.com/neg": '{"exposed_ports": {"8080": {"name": "Airflow networkEndpointGroups으로 지정할 이름"}}}' sessionAffinity: "None" sessionAffinityConfig: {} type: ClusterIP externalPort: 8080 loadBalancerIP: "" loadBalancerSourceRanges: [] nodePort: http: "" ## configs for the web Pods' readiness probe readinessProbe: enabled: false initialDelaySeconds: 2100 periodSeconds: 10 timeoutSeconds: 5 failureThreshold: 6 ## configs for the web Pods' liveness probe livenessProbe: enabled: true initialDelaySeconds: 2100 periodSeconds: 10 timeoutSeconds: 5 failureThreshold: 6
Airflow Helm Chart Install
Airflow Helm Chart Value 설정이 완료되었다면, 이제 해당 Value를 기반으로 Airflow를 설치합니다.
Airflow Helm Chart 내부의 암호화가 필요한 값은 helm install 시, --set-string으로 값을 할당할 수 있습니다.
# helm install --set-string 예제 코드 helm install airflow airflow-stable/airflow -f airflow-stable-values.yaml \ --set-string 'airflow.fernetKey=사용자지정' \ --set-string 'airflow.webserverSecretKey=사용자지정' \ --set-string 'extraManifests[0].data.base64_encoded_id_rsa=사용자지정' \ --set-string 'airflow.config.AIRFLOW__WEBSERVER__BASE_URL=사용자지정' \ --set-string 'airflow.config.AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=사용자지정' \ --set-string 'airflow.config.KEYCLOAK_CLIENT_AIRFLOW_SECRET=사용자지정' \ --set-string 'airflow.config.KEYCLOAK_REALM=사용자지정' \ --set-string 'dags.gitSync.branch=사용자지정'
Google Cloud Load Balancer(GCLB) 백엔드 서비스 및 라우팅 설정
Airflow가 정상적으로 설치되었다면, Airflow Webserver를 GCLB를 통해 접근할 수 있도록 설정합니다.
- Airflow 도메인 주소 및 GCLB는 사전 생성되어 있다고 가정하겠습니다.
사전 생성된 GCLB는 아래 구성요소로 이루어져있습니다. (Terraform 기준)
- 백엔드
- google_compute_http_health_check
- google_compute_backend_service
- 부하 분산기
- google_compute_url_map
- 프론트엔드
- google_compute_global_address
- google_compute_target_https_proxy
- google_compute_global_forwarding_rule
- 백엔드
따라서, 새로운 도메인 주소에 따른 라우팅을 위해 아래 구성 요소를 수정합니다.
- 백엔드
- google_compute_http_health_check
- google_compute_backend_service
- 부하 분산기
- google_compute_url_map
- 백엔드
Airflow 백엔드 서비스 생성
resource "google_compute_health_check" "airflow_backend_service_health_check" { name = "airflow-health-check" description = "사용자지정" check_interval_sec = 30 timeout_sec = 5 log_config { enable = true } tcp_health_check { # Healthcheck 대상이 NetworkEndpointGroup 일 때, 아래 설정으로 하면 neg port 가 그대로 적용됨. port_specification = "USE_SERVING_PORT" } } resource "google_compute_backend_service" "airflow_backend_service" { provider = google-beta name = "airflow-backend-service" description = "사용자지정" protocol = "HTTP" load_balancing_scheme = "EXTERNAL_MANAGED" health_checks = [ google_compute_health_check.airflow_backend_service_health_check.id ] backend { balancing_mode = "RATE" # Possible values are UTILIZATION, RATE, and CONNECTION. But with standalone NEG, only RATE is allowed. max_rate_per_endpoint = 100 # Airflow Service 생성 시 Annotation으로 지정한 Airflow networkEndpointGroups을 대상으로 지정 group = "https://www.googleapis.com/compute/v1/projects/${프로젝트ID}/zones/${가용영역이름}/networkEndpointGroups/${Airflow networkEndpointGroups 이름}" } # Note that health check is not required for Serverless NEG log_config { enable = true sample_rate = 1 } }
Airflow 도메인 주소에 따라 백엔드 서비스로 라우팅되도록 설정합니다.
resource "google_compute_backend_service" "default" { load_balancing_scheme = "EXTERNAL_MANAGED" name = "default-backend" } resource "google_compute_url_map" "default" { name = "사용자지정" description = "사용자지정" default_service = google_compute_backend_service.default.id host_rule { hosts = [ "Airflow 도메인 주소" ] path_matcher = "airflow" } path_matcher { name = "airflow" # 상단에 생성한 Airflow 백엔드 서비스 이름 명시 default_service = "airflow-backend-service" } }
Airflow Pods 생성 결과
openlens로 확인한 Airflow pods 생성 결과입니다.
Airflow Dashboard 접근 확인
Airflow Helm Chart Value 전체 코드
## CONFIG | Airflow Configs
airflow:
## configs for the airflow container image
image:
# airflow worker에서 gcloud 실행이 필요해, airflow image에 gcloud를 설치해서 사용하는 방식으로 진행 (iamge는 GCP Artifact Registry에 저장)
repository : "GCP Artifact Registry Repo 경로"
tag : latest
pullPolicy: Always
pullSecret: ""
## the fernet encryption key (sets `AIRFLOW__CORE__FERNET_KEY`)
# 데이터 암호화 및 복호화에 사용되는 보안 키
fernetKey: "사용자지정" # helm install 시, --set-string 으로 값 할당
## the secret_key for flask (sets `AIRFLOW__WEBSERVER__SECRET_KEY`)
# 세션 관리에 사용되는 보안 키
webserverSecretKey: "사용자지정" # helm install 시, --set-string 으로 값 할당
## environment variables for airflow configs
# Airflow 구성 요소별 Config 설정 예제입니다.
# https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#
config:
# webserver
# 슬랙을 통해 DAG 실행 에러를 받을 경우, 해당 BASE_URL을 기반으로 곧바로 Webserver Dashboard로 접근할 수 있도록 설정
AIRFLOW__WEBSERVER__BASE_URL: "Airflow Dashboard 주소" # helm install 시, --set-string 으로 값 할당
# logging
# DAG 실행 로그를 Worker 내부에 쌓을 경우, Worker Pod가 재실행되는경우엔 기존 로그들이 삭제 되어 Airflow Dashboard에서 이전 로그들을 확인할 수 없는 문제가 있음
# 따라서, Airflow 로그를 cloud storage에 저장하도록 변경
AIRFLOW__LOGGING__REMOTE_LOGGING : 'true'
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID : "google_cloud_default" # 하단에 생성한 GCP connection 명
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER : "cloud storage 경로" # helm install 시, --set-string 으로 값 할당
# ETC (모든 파드 컨테이너에 환경변수로 삽입할 경우)
KEYCLOAK_CLIENT_AIRFLOW_SECRET : "사용자지정" # helm install 시, --set-string 으로 값 할당
KEYCLOAK_REALM : "사용자지정" # helm install 시, --set-string 으로 값 할당
# GCP connections 연결 예제입니다.
connections:
- id: google_cloud_default
type: google_cloud_platform
description : "connection 설명"
extra: |
{
"extra__google_cloud_platform__project": "프로젝트ID",
"extra__google_cloud_platform__num_retries": 5
}
# pool 생성 예제입니다.
pools:
- name: "pool"
description: "pool 설명"
slots: 24
## extra pip packages to install in airflow Pods
# pip package 설치 예제입니다.
# 아래와 같은 방식으로 원하는 PIP 패키지를 Worker 파드를 포함한 모든 파드에 설치할 수 있습니다.
extraPipPackages:
- "sqlalchemy-repr==0.1.0"
## CONFIG | Kubernetes Extra Manifests
extraManifests:
- apiVersion: v1
kind: Secret
metadata:
name: airflow-ssh-secret
data:
base64_encoded_id_rsa : "사용자지정" # helm install 시, --set-string 으로 값 할당
## CONFIG | Airflow DAGs
dags:
gitSync:
enabled: true
repo: "DAG 레포의 SSH 주소"
branch : "타겟 브랜치명" # helm install 시, --set-string 으로 값 할당
revision: HEAD
## the name of a pre-created Secret with git ssh credentials
sshSecret: "airflow-ssh-secret"
## the key in `dags.gitSync.sshSecret` with your ssh-key file
sshSecretKey: "base64_encoded_id_rsa"
## the number of seconds between syncs
syncWait: 60
## the max number of seconds allowed for a complete sync
syncTimeout: 600
## DATABASE | PgBouncer
pgbouncer:
enabled: false
## DATABASE | Embedded Postgres
postgresql:
enabled: true
## COMPONENT | Airflow Scheduler
scheduler:
## configs for the scheduler Pods' liveness probe
livenessProbe:
enabled: true
initialDelaySeconds: 2100
periodSeconds: 30
timeoutSeconds: 60
failureThreshold: 5
## COMPONENT | Airflow Workers
workers:
## if the airflow workers StatefulSet should be deployed
enabled: true
## the number of worker Pods to run
## - if you set this >1 we recommend defining a `workers.podDisruptionBudget`
## - this is the minimum when `workers.autoscaling.enabled` is true
replicas: 3
## configs for the PodDisruptionBudget of the worker StatefulSet
podDisruptionBudget:
## if a PodDisruptionBudget resource is created for the worker StatefulSet
enabled: true
## COMPONENT | Triggerer
triggerer:
## configs for the triggerer Pods' liveness probe
livenessProbe:
enabled: true
initialDelaySeconds: 2100
periodSeconds: 30
timeoutSeconds: 60
failureThreshold: 5
## COMPONENT | Flower
flower:
enabled: false
## COMPONENT | Airflow Webserver
web:
webserverConfig:
## if the `webserver_config.py` file is mounted
## - set to false if you wish to mount your own `webserver_config.py` file
enabled: true
## the full content of the `webserver_config.py` file (as a string)
stringOverride: |
import os
import logging
import jwt
from flask import redirect, session
from flask_appbuilder import expose
from flask_appbuilder.security.manager import AUTH_OAUTH
from flask_appbuilder.security.views import AuthOAuthView
from airflow.www.security import AirflowSecurityManager
basedir = os.path.abspath(os.path.dirname(__file__)) # 현재 파일의 절대 경로
log = logging.getLogger(__name__) # 로깅을 위한 로거 객체 생성
AUTH_TYPE = AUTH_OAUTH # 인증 유형 설정
AUTH_USER_REGISTRATION = True # 사용자 등록 허용 여부 설정
AUTH_ROLES_SYNC_AT_LOGIN = True # 로그인 시 Keycloak 역할 변경 사항 동기화 여부 설정
PERMANENT_SESSION_LIFETIME = 86400 # 세션 지속 시간(초)
# Airflow 역할과 OAuth 제공자 역할 간의 매핑 정의
AUTH_ROLES_MAPPING = {
"viewer": [ "Admin" ]
}
# OAuth 제공자 정보 설정
PROVIDER_NAME = 'keycloak'
CLIENT_ID = 'airflow'
CLIENT_SECRET = os.environ.get('KEYCLOAK_CLIENT_AIRFLOW_SECRET') # helm install 시, --set-string 으로 값 할당
KEYCLOAK_REALM = os.environ.get('KEYCLOAK_REALM') # helm install 시, --set-string 으로 값 할당
OIDC_ISSUER = f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}"
OAUTH_PROVIDERS = [
{
'name':PROVIDER_NAME,
'icon':'fa-address-card', # Icon for the provider
'token_key':'access_token', # Name of the token in the response of access_token_url
'remote_app': {
'client_id': CLIENT_ID, # Client Id (Identify Airflow application)
'client_secret': CLIENT_SECRET, # Secret for this Client Id (Identify Airflow application)
'api_base_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect",
'client_kwargs': {
'scope': 'openid profile email' # ENL 에서는 실제로 OpenID Connect 를 사용 중.
},
'access_token_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token",
'authorize_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/auth",
'request_token_url': None,
'jwks_uri':f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs"
}
}
]
# 로그아웃 시 액세스 토큰을 삭제하는 메서드
class CustomAuthRemoteUserView(AuthOAuthView):
@expose("/logout/")
def logout(self):
"""로그아웃 전 액세스 토큰 삭제"""
return super().logout()
# OAuth에서 사용자 정보를 가져오고 Airflow 역할로 매핑하는 메서드
class CustomSecurityManager(AirflowSecurityManager):
authoauthview = CustomAuthRemoteUserView
def oauth_user_info(self, provider, response):
if provider == PROVIDER_NAME:
token = response["access_token"] # OAuth 인증 서버로부터 받은 응답에서 액세스 토큰 추출
me = jwt.decode(token, algorithms="RS256", options={"verify_signature": False}) # 추출한 액세스 토큰을 사용하여 JWT 디코딩
groups = me["resource_access"]["airflow"]["roles"] # JWT에서 추출한 정보 중에서 Airflow 역할만 추출
# resource_access 예제
# {
# "resource_access": { "airflow": { "roles": ["viewer", "admin"] }}
# }
# Airflow에 할당된 역할이 존재하며 "viewer" 역할을 가지고 있을 경우
if (len(groups) >= 1 and "viewer" in groups):
# JWT에서 추출한 사용자 정보와 Airflow에 할당된 역할 정보를 userinfo 딕셔너리에 저장
userinfo = {
"username": me.get("preferred_username"),
"first_name": me.get("given_name"),
"last_name": me.get("family_name"),
"email": me.get("email"),
"role_keys": groups,
}
return userinfo # 사용자 정보 반환
else:
return {}
# Airflow는 커스텀 보안 관리자를 사용하여 인증 및 권한 부여를 처리
SECURITY_MANAGER_CLASS = CustomSecurityManager
## configs for the Service of the web Pods
service:
# Webserver 접근 시, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 접근이 가능하도록 Service에 networkEndpointGroups을 지정
annotations:
"cloud.google.com/neg": '{"exposed_ports": {"8080": {"name": "Airflow networkEndpointGroups으로 지정할 이름"}}}'
sessionAffinity: "None"
sessionAffinityConfig: {}
type: ClusterIP
externalPort: 8080
loadBalancerIP: ""
loadBalancerSourceRanges: []
nodePort:
http: ""
## configs for the web Pods' readiness probe
readinessProbe:
enabled: false
initialDelaySeconds: 2100
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 6
## configs for the web Pods' liveness probe
livenessProbe:
enabled: true
initialDelaySeconds: 2100
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 6
'DevOps > Helm' 카테고리의 다른 글
[Helm] Helm Chart --set 사용 방법 (0) | 2023.06.16 |
---|---|
[Helm] Helm, Helm Chart란? (1) | 2023.05.24 |