오늘의 인기 글
최근 글
최근 댓글
Today
Total
05-09 00:00
관리 메뉴

우노

[Helm] Helm Chart를 사용해 GKE에 Airflow 설치하기 본문

DevOps/Helm

[Helm] Helm Chart를 사용해 GKE에 Airflow 설치하기

운호(Noah) 2023. 7. 21. 17:54

들어가기 앞서,

  • 데이터 엔지니어가 파이프라인에 집중하기 위해서는 인프라 업무 및 시스템 업무에서 분리되는 것이 가장 좋습니다.
  • 이런 환경을 제공해 주는 것이 Google Cloud의 Cloud Composer입니다.
  • Cloud Composer는 Apache Airflow를 기반으로 하는 완전 관리형 워크플로 조정 서비스입니다.
  • 이러한 완전 관리형 서비스는 운영 부담이 적지만 비용이 많이 발생한다는 특징이 있습니다.
  • 만약, 비용 효율적인 운영을 목표로 하고 있다면 Airflow 직접 운영과 Cloud Composer 사용을 비교함으로써 적절한 방식을 채택하는 게 좋을 수 있습니다.
  • 따라서, 해당 포스팅에선 Airflow 직접 운영을 위해,
  • Airflow Helm Chart를 사용해 Google Kubernetes Engine(GKE)에 Airflow를 설치하고
  • Google Cloud Load Balancer(GCLB)를 통해 Airflow Dashboard에 접근하는 방법에 대해서 다뤄보겠습니다.
  • Google Service Account, GCLB 관련 설정은 Terraform을 통해 진행하며,
  • Airflow 설치를 위한 GKE 생성 및 연결은 사전 구성되어 있다고 가정하겠습니다.

진행 순서

  • Terraform
    • Google Service Account 생성
    • Kubernetes Service Account 생성 및 Google Service Account와 매핑
  • Airflow Helm Chart
    • Airflow Helm Chart 저장소 추가 및 Value 파일 다운로드
    • Value - Config 설정
      • Kubernetes ServiceAccount
      • Airflow Config
      • Airflow DAGs
    • Value - Database 설정
      • PgBouncer, Postgres
    • Value - Component 설정
      • Airflow Scheduler
      • Airflow Workers
      • Triggerer, Flower
      • Airflow Webserver
    • Airflow Helm Chart Install
  • Terraform
    • Google Cloud Load Balancer(GCLB) 백엔드 서비스 및 라우팅 설정

Terraform - Google Service Account 생성

  • Airflow가 Google Service Account를 기반으로 GCP Service에 접근할 수 있도록 Google Service Account 및 IAM을 생성합니다.

      resource "google_service_account" "airflow_sa" {
        account_id = "사용자지정"
        display_name = "사용자지정"
        description = "Airflow에서 사용되는 Service Account입니다."
      }
    
      resource "google_service_account_key" "airflow_sa_key" {
        service_account_id = google_service_account.airflow_sa.name
      }
    
      resource "google_project_iam_member" "project_iam_member" {
        project = "프로젝트ID"
          role = "사용자지정역할"
        member  = "serviceAccount:${google_service_account.airflow_sa.email}"
      }

Terraform - Kubernetes Service Account 생성 및 Google Service Account와 매핑

  • Airflow가 Google Service Account를 기반으로 GCP Service에 접근할 수 있도록, GKE 내부에 Kubernetes Service Account를 생성하고 이를 Google Service Account와 매핑합니다.

      # Google Service Account에 부여된 권한을 이용할 수 있는 Kubernetes Service Account 생성
      resource "kubernetes_service_account" "airflow_ksa" {
        metadata {
          name = "airflow-ksa"
          namespace = "네임스페이스명"
          annotations = {
            "iam.gke.io/gcp-service-account" = google_service_account.airflow_sa.email
          }
        }
      }
    
      # Google Service Account와 Kubernetes Service Account를 매핑
      resource "google_service_account_iam_member" "airflow_ksa_mapping" {
        member             = "serviceAccount:${프로젝트ID}.svc.id.goog[${네임스페이스명}/${kubernetes_service_account.airflow_ksa.metadata[0].name}]"
        role               = "roles/iam.workloadIdentityUser"
        service_account_id = google_service_account.airflow_sa.id
      }

Helm Chart - Airflow Helm Chart 저장소 추가 및 Value 파일 다운로드

  • Airflow 설치를 위한 GKE 생성 및 연결은 사전 구성되어 있다고 가정했으므로,

  • 이제 GKE에 Airflow Helm Chart를 설치하기 위해 Airflow Helm Chart 저장소를 추가하고 Value 파일을 다운로드합니다.

  • Airflow Helm Chart 저장소 추가

      helm repo add airflow-stable https://airflow-helm.github.io/charts
  • Airflow Helm Chart Value 파일 다운로드

      helm show values airflow-stable/airflow > airflow-stable-values.yaml
  • Airflow Helm Chart Skeleton Code

  • 해당 포스팅에선, Airflow Helm Chart Value 파일의 overwrite 부분만 다뤄보겠습니다.

Helm Chart - Kubernetes Service Account 설정

  • Airflow가 앞서 생성한 Kubernetes Service Account(Google Service Account와 매핑된)를 사용해 GCP Service에 접근할 수 있도록 설정합니다.

      ## CONFIG | Kubernetes ServiceAccount
      serviceAccount:
        create : false
        name: "airflow-ksa" # Terraform 통해 사전 생성된 KSA를 사용

Helm Chart - Airflow Config 설정

  • Airflow의 image, config, connections, pools, extraPipPackages를 설정합니다.

  • Airflow의 config 설정은 아래 링크를 참고하시면 됩니다.

  • 각 항목에 대한 추가적인 세부 내용은 아래 예제 코드의 주석으로 설명되어 있습니다.

  • 암호화가 필요한 값은 helm install 시, --set-string으로 값을 할당받도록 구성했습니다.

      ## CONFIG | Airflow Configs
      airflow:
    
        ## configs for the airflow container image
        image:
          # airflow worker에서 gcloud 실행이 필요해, airflow image에 gcloud를 설치해서 사용하는 방식으로 진행 (iamge는 GCP Artifact Registry에 저장)
          repository : "GCP Artifact Registry Repo 경로"
          tag : latest
          pullPolicy: Always
          pullSecret: ""
    
        ## the fernet encryption key (sets `AIRFLOW__CORE__FERNET_KEY`)
        # 데이터 암호화 및 복호화에 사용되는 보안 키
        fernetKey: "사용자지정"                                  # helm install 시, --set-string 으로 값 할당
    
        ## the secret_key for flask (sets `AIRFLOW__WEBSERVER__SECRET_KEY`)
        # 세션 관리에 사용되는 보안 키
        webserverSecretKey: "사용자지정"                         # helm install 시, --set-string 으로 값 할당
    
        ## environment variables for airflow configs
        # Airflow 구성 요소별 Config 설정 예제입니다.
        # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#
        config:
    
          # webserver
          # 슬랙을 통해 DAG 실행 에러를 받을 경우, 해당 BASE_URL을 기반으로 곧바로 Webserver Dashboard로 접근할 수 있도록 설정
          AIRFLOW__WEBSERVER__BASE_URL: "Airflow Dashboard 주소"             # helm install 시, --set-string 으로 값 할당
    
          # logging
          # DAG 실행 로그를 Worker 내부에 쌓을 경우, Worker Pod가 재실행되는경우엔 기존 로그들이 삭제 되어 Airflow Dashboard에서 이전 로그들을 확인할 수 없는 문제가 있음
          # 따라서, Airflow 로그를 cloud storage에 저장하도록 변경
          AIRFLOW__LOGGING__REMOTE_LOGGING : 'true'
          AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID : "google_cloud_default"          # 하단에 생성한 GCP connection 명
          AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER : "cloud storage 경로"         # helm install 시, --set-string 으로 값 할당
    
          # ETC (모든 파드 컨테이너에 환경변수로 삽입할 경우)
          KEYCLOAK_CLIENT_AIRFLOW_SECRET : "사용자지정"          # helm install 시, --set-string 으로 값 할당
          KEYCLOAK_REALM : "사용자지정"                          # helm install 시, --set-string 으로 값 할당
    
        # GCP connections 연결 예제입니다.
        connections:
          - id: google_cloud_default
            type: google_cloud_platform
            description : "connection 설명"
            extra: |
              {
                "extra__google_cloud_platform__project": "프로젝트ID",
                "extra__google_cloud_platform__num_retries": 5
              }
    
        # pool 생성 예제입니다.
        pools:
          - name: "pool"
            description: "pool 설명"
            slots: 24
    
        ## extra pip packages to install in airflow Pods 
        # pip package 설치 예제입니다. 
        # 아래와 같은 방식으로 원하는 PIP 패키지를 Worker 파드를 포함한 모든 파드에 설치할 수 있습니다.
        extraPipPackages:
          - "sqlalchemy-repr==0.1.0"
  • 위 구성에서, Airflow의 Image는 커스텀 이미지를 사용했습니다.

    • Airflow Worker는 gcloud가 설치된 환경이어야 했기 때문에, Airflow Image에 gcloud를 설치한 뒤 Image를 GCP Artifact Registry에서 가져오는 방식으로 구성했습니다.
  • Airflow Image (Dockerfile)

      ARG AIRFLOW_VERSION=2.4.3
      ARG PYTHON_VERSION=3.8
    
      FROM apache/airflow:${AIRFLOW_VERSION}-python${PYTHON_VERSION}
    
      # gcloud 사용을 위해 google-cloud-sdk 설치
      RUN curl https://sdk.cloud.google.com > install.sh && \
          bash install.sh --disable-prompts --install-dir="/home/airflow"&& \
          echo "source /home/airflow/google-cloud-sdk/completion.bash.inc" >> /home/airflow/.bashrc && \
          echo "source /home/airflow/google-cloud-sdk/path.bash.inc" >> /home/airflow/.bashrc && \
          source /home/airflow/.bashrc
    
      # subprocess 모듈은 /usr/bin에서 gcloud를 실행하고 있음
      # 따라서, /usr/bin/gcloud가 /home/airflow/google-cloud-sdk/bin/gcloud를 기반으로 실행되도록 symbolic link 선언
      USER root
      RUN apt-get update && \
          apt-get install vim -y && \
          ln -s /home/airflow/google-cloud-sdk/bin/gcloud /usr/bin/gcloud

Helm Chart - Airflow DAGs 설정

  • Airflow Helm Chart는 gitsync 설정을 통해 github DAG repo에서 DAG를 주기적으로 가져오도록 설정할 수 있습니다.

    • Kubernetes의 Sidecar Pattern으로, dags-git-sync용 컨테이너가 따로 생겨서 계속 github에 있는 dag파일들을 당겨오는 방식입니다.
  • gitsync는 http 인증 또는 ssh 인증 방법을 사용해 설정할 수 있습니다.

  • http 인증 방법

    • http 인증 방식을 사용하기 위해선, DAG repo 접근 권한을 가진 user의 access token을 발급받은 뒤, Helm Chart에서 해당 토큰을 사용해 접근해야합니다.

    • access token을 발급받는 절차는 아래와 같습니다.

      • github → user profile → settings → Developer settings
      • Personal access tokens → Tokens (classic) → Generate new token (classic)
      • 토큰명, 토큰 만기 기한, 권한 범위 설정(repo 권한만 있어도 됨) → Generate token → token 값 복사
    • 이후, username과 access token 값을 base64로 인코딩한 뒤, 해당 값을 Kubernetes Secret에 저장하고, Airflow Helm Chart에서 해당 값을 가져오는 방식으로 사용할 수 있습니다.

    • Kubernetes Extra Manifests 설정 (Kubernetes Secret)

        ## CONFIG | Kubernetes Extra Manifests
        extraManifests:
          - apiVersion: v1
            kind: Secret
            metadata:
              name: git-credentials
            data:
              GIT_SYNC_USERNAME : "사용자지정" # helm install 시, --set-string 으로 값 할당
              GIT_SYNC_PASSWORD : "사용자지정" # helm install 시, --set-string 으로 값 할당
    • Airflow DAGs 설정

        ## CONFIG | Airflow DAGs
        dags:
          gitSync:
            enabled: true
            repo: "DAG 레포의 HTTPS 주소"
            branch : "타겟 브랜치명"              # helm install 시, --set-string 으로 값 할당
            revision: HEAD
      
            ## the name of a pre-created Secret with git http credentials
            httpSecret : "git-credentials"     # 상단에 생성한 Kubernetes Secret 명
      
            ## the key in `dags.gitSync.httpSecret` with your git username
            httpSecretUsernameKey: GIT_SYNC_USERNAME
      
            ## the key in `dags.gitSync.httpSecret` with your git password/token
            httpSecretPasswordKey: GIT_SYNC_PASSWORD
      
            ## the number of seconds between syncs
            syncWait: 60
      
            ## the max number of seconds allowed for a complete sync
            syncTimeout: 600
  • ssh 인증 방법

    • SSH 인증 방식을 사용하기 위해선, SSH 키를 발급받은 뒤, DAG Repo에 SSH 공개키를 등록하고, Helm Chart에선 SSH 비밀키를 사용해 접근해야합니다.

    • 먼저, SSH 키를 발급 받습니다.

        ssh-keygen -t rsa -b 4096 -C "your_email@example.com"
    • SSH 공개키를 DAG Repo → Settings → Deploy keys에 등록합니다.

      • SSH 공개키 위치 : $HOME/.ssh/[your_key_name].pub
    • SSH 비밀키를 base64로 인코딩합니다.

        # SSH 비밀키 위치 : $HOME/*.ssh/[your_key_name]*
        cat ~/.ssh/id_rsa | base64
    • base64로 인코딩된 SSH 비밀키 값을 Kubernetes Secret에 저장하고, Airflow Helm Chart에서 해당 값을 가져오는 방식으로 사용할 수 있습니다.

    • Kubernetes Extra Manifests 설정 (Kubernetes Secret)

        ## CONFIG | Kubernetes Extra Manifests
        extraManifests:
          - apiVersion: v1
            kind: Secret
            metadata:
              name: airflow-ssh-secret
            data:
              base64_encoded_id_rsa : "사용자지정" # helm install 시, --set-string 으로 값 할당
    • Airflow DAGs 설정

        ## CONFIG | Airflow DAGs
        dags:
          gitSync:
            enabled: true
            repo: "DAG 레포의 SSH 주소"
            branch : "타겟 브랜치명"              # helm install 시, --set-string 으로 값 할당
            revision: HEAD
      
                ## the name of a pre-created Secret with git ssh credentials
            sshSecret: "airflow-ssh-secret"
      
                ## the key in `dags.gitSync.sshSecret` with your ssh-key file 
            sshSecretKey: "base64_encoded_id_rsa"
      
            ## the number of seconds between syncs
            syncWait: 60
      
            ## the max number of seconds allowed for a complete sync
            syncTimeout: 600

Helm Chart - PgBouncer, Postgres 설정

  • 사용하지 않는 데이터베이스는 enabled:false 처리합니다.

      ## DATABASE | PgBouncer
      pgbouncer:
        enabled: false
    
      ## DATABASE | Embedded Postgres
      postgresql:
        enabled: true

Helm Chart - Airflow Scheduler 설정

  • scheduler의 livenessProbe를 설정합니다.

      ## COMPONENT | Airflow Scheduler
      scheduler:
        ## configs for the scheduler Pods' liveness probe
        livenessProbe:
          enabled: true
          initialDelaySeconds: 2100
          periodSeconds: 30
          timeoutSeconds: 60
          failureThreshold: 5

Helm Chart - Airflow Workers 설정

  • Airflow Worker의 수를 3개로 지정하고, podDisruptionBudget를 사용해 Airflow Workers의 파드가 클러스터의 노드에서 안정적으로 유지되도록 설정합니다. (파드가 임의로 재배치되거나 종료되는 것을 방지합니다.)

      ## COMPONENT | Airflow Workers
      workers:
        ## if the airflow workers StatefulSet should be deployed
        enabled: true
    
        ## the number of worker Pods to run
        ## - if you set this >1 we recommend defining a `workers.podDisruptionBudget`
        ## - this is the minimum when `workers.autoscaling.enabled` is true
        replicas: 3
    
        ## configs for the PodDisruptionBudget of the worker StatefulSet
        podDisruptionBudget:
          ## if a PodDisruptionBudget resource is created for the worker StatefulSet
          enabled: true

Helm Chart - Triggerer, Flower 설정

  • Triggerer의 livenessProbe를 설정하며, 사용하지 않는 컴포넌트는 enabled:false 처리합니다.

      ## COMPONENT | Triggerer
      triggerer:
        ## configs for the triggerer Pods' liveness probe
        livenessProbe:
          enabled: true
          initialDelaySeconds: 2100
          periodSeconds: 30
          timeoutSeconds: 60
          failureThreshold: 5
    
      ## COMPONENT | Flower
      flower:
        enabled: false

Helm Chart - Airflow Webserver 설정

  • Airflow Webserver에선 3가지 설정을 진행합니다.

    • Airflow Webserver에 Keycloak(Single Sign-on 솔루션) 적용
      • Airflow Webserver는 Flask App Builder(FAB) 기반으로 동작합니다.
      • FAB 관련 설정을 변경(webserver_config.py 수정)하여 OAuth 설정을 진행할 수 있습니다.
      • 따라서, 사전 생성되어 있는 Keycloak을 Airflow Webserver에 적용하는 단계가 필요합니다.
    • Airflow Service에 networkEndpointGroups Annotation 할당
      • Airflow Webserver는, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 순서로 접근하는 것을 목표로 하고 있습니다.
      • 이때, Backend Service가 networkEndpointGroups(neg)를 인식하기 위해선, Service의 Annotation에 neg명을 선언해 주는 단계가 필요합니다.
    • readinessProbe, livenessProbe 설정
  • 세부 설정은 주석을 통해 확인할 수 있습니다.

      ## COMPONENT | Airflow Webserver
      web:
    
        webserverConfig:
          ## if the `webserver_config.py` file is mounted
          ## - set to false if you wish to mount your own `webserver_config.py` file
          enabled: true
    
          ## the full content of the `webserver_config.py` file (as a string)
          stringOverride: |
    
            import os
            import logging
            import jwt
            from flask import redirect, session
            from flask_appbuilder import expose
            from flask_appbuilder.security.manager import AUTH_OAUTH
            from flask_appbuilder.security.views import AuthOAuthView
            from airflow.www.security import AirflowSecurityManager
    
            basedir = os.path.abspath(os.path.dirname(__file__))  # 현재 파일의 절대 경로
            log = logging.getLogger(__name__)                     # 로깅을 위한 로거 객체 생성
            AUTH_TYPE = AUTH_OAUTH                                # 인증 유형 설정
            AUTH_USER_REGISTRATION = True                         # 사용자 등록 허용 여부 설정
            AUTH_ROLES_SYNC_AT_LOGIN = True                       # 로그인 시 Keycloak 역할 변경 사항 동기화 여부 설정
            PERMANENT_SESSION_LIFETIME = 86400                     # 세션 지속 시간(초)
    
            # Airflow 역할과 OAuth 제공자 역할 간의 매핑 정의
            AUTH_ROLES_MAPPING = {
              "viewer": [ "Admin" ]
            }
    
            # OAuth 제공자 정보 설정
            PROVIDER_NAME = 'keycloak'
            CLIENT_ID = 'airflow'
            CLIENT_SECRET = os.environ.get('KEYCLOAK_CLIENT_AIRFLOW_SECRET')  # helm install 시, --set-string 으로 값 할당
            KEYCLOAK_REALM = os.environ.get('KEYCLOAK_REALM')                 # helm install 시, --set-string 으로 값 할당
            OIDC_ISSUER = f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}"
    
            OAUTH_PROVIDERS = [
              {
                'name':PROVIDER_NAME,
                'icon':'fa-address-card',             # Icon for the provider
                'token_key':'access_token',           # Name of the token in the response of access_token_url
                'remote_app': {
                  'client_id': CLIENT_ID,             # Client Id (Identify Airflow application)
                  'client_secret': CLIENT_SECRET,     # Secret for this Client Id (Identify Airflow application)
                  'api_base_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect",
                  'client_kwargs': {
                    'scope': 'openid profile email'   # ENL 에서는 실제로 OpenID Connect 를 사용 중.
                  },
                  'access_token_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token",
                  'authorize_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/auth",
                  'request_token_url': None,
                  'jwks_uri':f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs"
                }
              }
            ]
    
            # 로그아웃 시 액세스 토큰을 삭제하는 메서드
            class CustomAuthRemoteUserView(AuthOAuthView):
              @expose("/logout/")
              def logout(self):
                """로그아웃 전 액세스 토큰 삭제"""
                return super().logout()
    
            # OAuth에서 사용자 정보를 가져오고 Airflow 역할로 매핑하는 메서드
            class CustomSecurityManager(AirflowSecurityManager):
    
              authoauthview = CustomAuthRemoteUserView
    
              def oauth_user_info(self, provider, response):
                if provider == PROVIDER_NAME:
                  token = response["access_token"]                                                        # OAuth 인증 서버로부터 받은 응답에서 액세스 토큰 추출
                  me = jwt.decode(token, algorithms="RS256", options={"verify_signature": False})         # 추출한 액세스 토큰을 사용하여 JWT 디코딩
                  groups = me["resource_access"]["airflow"]["roles"]                                      # JWT에서 추출한 정보 중에서 Airflow 역할만 추출
    
                  # resource_access 예제
                  # {
                  #   "resource_access": { "airflow": { "roles": ["viewer", "admin"] }}
                  # }
    
                  # Airflow에 할당된 역할이 존재하며 "viewer" 역할을 가지고 있을 경우
                  if (len(groups) >= 1 and "viewer" in groups):
    
                    # JWT에서 추출한 사용자 정보와 Airflow에 할당된 역할 정보를 userinfo 딕셔너리에 저장
                    userinfo = {
                      "username": me.get("preferred_username"),
                      "first_name": me.get("given_name"),
                      "last_name": me.get("family_name"),
                      "email": me.get("email"),
                      "role_keys": groups,
                    }
                    return userinfo # 사용자 정보 반환
                else:
                  return {}
    
            # Airflow는 커스텀 보안 관리자를 사용하여 인증 및 권한 부여를 처리
            SECURITY_MANAGER_CLASS = CustomSecurityManager
    
        ## configs for the Service of the web Pods
        service:
          # Webserver 접근 시, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 접근이 가능하도록 Service에 networkEndpointGroups을 지정
          annotations:
            "cloud.google.com/neg": '{"exposed_ports": {"8080": {"name": "Airflow networkEndpointGroups으로 지정할 이름"}}}'
          sessionAffinity: "None"
          sessionAffinityConfig: {}
          type: ClusterIP
          externalPort: 8080
          loadBalancerIP: ""
          loadBalancerSourceRanges: []
          nodePort:
            http: ""
    
        ## configs for the web Pods' readiness probe
        readinessProbe:
          enabled: false
          initialDelaySeconds: 2100
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 6
    
        ## configs for the web Pods' liveness probe
        livenessProbe:
          enabled: true
          initialDelaySeconds: 2100
          periodSeconds: 10
          timeoutSeconds: 5
          failureThreshold: 6

Airflow Helm Chart Install

  • Airflow Helm Chart Value 설정이 완료되었다면, 이제 해당 Value를 기반으로 Airflow를 설치합니다.

  • Airflow Helm Chart 내부의 암호화가 필요한 값은 helm install 시, --set-string으로 값을 할당할 수 있습니다.

      # helm install --set-string 예제 코드
      helm install airflow airflow-stable/airflow -f airflow-stable-values.yaml \
      --set-string 'airflow.fernetKey=사용자지정' \
      --set-string 'airflow.webserverSecretKey=사용자지정' \
      --set-string 'extraManifests[0].data.base64_encoded_id_rsa=사용자지정' \
      --set-string 'airflow.config.AIRFLOW__WEBSERVER__BASE_URL=사용자지정' \
      --set-string 'airflow.config.AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=사용자지정' \
      --set-string 'airflow.config.KEYCLOAK_CLIENT_AIRFLOW_SECRET=사용자지정' \
      --set-string 'airflow.config.KEYCLOAK_REALM=사용자지정' \
      --set-string 'dags.gitSync.branch=사용자지정'

Google Cloud Load Balancer(GCLB) 백엔드 서비스 및 라우팅 설정

  • Airflow가 정상적으로 설치되었다면, Airflow Webserver를 GCLB를 통해 접근할 수 있도록 설정합니다.

    • Airflow 도메인 주소 및 GCLB는 사전 생성되어 있다고 가정하겠습니다.
  • 사전 생성된 GCLB는 아래 구성요소로 이루어져있습니다. (Terraform 기준)

    • 백엔드
      • google_compute_http_health_check
      • google_compute_backend_service
    • 부하 분산기
      • google_compute_url_map
    • 프론트엔드
      • google_compute_global_address
      • google_compute_target_https_proxy
      • google_compute_global_forwarding_rule
  • 따라서, 새로운 도메인 주소에 따른 라우팅을 위해 아래 구성 요소를 수정합니다.

    • 백엔드
      • google_compute_http_health_check
      • google_compute_backend_service
    • 부하 분산기
      • google_compute_url_map
  • Airflow 백엔드 서비스 생성

      resource "google_compute_health_check" "airflow_backend_service_health_check" {
        name = "airflow-health-check"
        description = "사용자지정"
    
        check_interval_sec = 30
        timeout_sec = 5
    
        log_config {
          enable = true
        }
    
        tcp_health_check {
          # Healthcheck 대상이 NetworkEndpointGroup 일 때, 아래 설정으로 하면 neg port 가 그대로 적용됨.
          port_specification = "USE_SERVING_PORT"
        }
      }
    
      resource "google_compute_backend_service" "airflow_backend_service" {
        provider = google-beta
    
        name = "airflow-backend-service"
        description = "사용자지정"
    
        protocol = "HTTP"
        load_balancing_scheme = "EXTERNAL_MANAGED"
    
        health_checks = [
          google_compute_health_check.airflow_backend_service_health_check.id
        ]
    
        backend {
          balancing_mode = "RATE" # Possible values are UTILIZATION, RATE, and CONNECTION. But with standalone NEG, only RATE is allowed.
          max_rate_per_endpoint = 100
    
              # Airflow Service 생성 시 Annotation으로 지정한 Airflow networkEndpointGroups을 대상으로 지정
          group = "https://www.googleapis.com/compute/v1/projects/${프로젝트ID}/zones/${가용영역이름}/networkEndpointGroups/${Airflow networkEndpointGroups 이름}"
        }
    
        # Note that health check is not required for Serverless NEG
        log_config {
          enable = true
          sample_rate = 1
        }
    
      }
  • Airflow 도메인 주소에 따라 백엔드 서비스로 라우팅되도록 설정합니다.

      resource "google_compute_backend_service" "default" {
        load_balancing_scheme = "EXTERNAL_MANAGED"
        name = "default-backend"
      }
    
      resource "google_compute_url_map" "default" {
        name = "사용자지정"
        description = "사용자지정"
    
        default_service = google_compute_backend_service.default.id
    
        host_rule {
          hosts = [
            "Airflow 도메인 주소"
          ]
          path_matcher = "airflow"
        }
    
        path_matcher {
          name            = "airflow"
              # 상단에 생성한 Airflow 백엔드 서비스 이름 명시
          default_service = "airflow-backend-service"
        }
    
      }

Airflow Pods 생성 결과

  • openlens로 확인한 Airflow pods 생성 결과입니다.


Airflow Dashboard 접근 확인


Airflow Helm Chart Value 전체 코드

## CONFIG | Airflow Configs
airflow:

  ## configs for the airflow container image
  image:
    # airflow worker에서 gcloud 실행이 필요해, airflow image에 gcloud를 설치해서 사용하는 방식으로 진행 (iamge는 GCP Artifact Registry에 저장)
    repository : "GCP Artifact Registry Repo 경로"
    tag : latest
    pullPolicy: Always
    pullSecret: ""

  ## the fernet encryption key (sets `AIRFLOW__CORE__FERNET_KEY`)
  # 데이터 암호화 및 복호화에 사용되는 보안 키
  fernetKey: "사용자지정"                                  # helm install 시, --set-string 으로 값 할당

  ## the secret_key for flask (sets `AIRFLOW__WEBSERVER__SECRET_KEY`)
  # 세션 관리에 사용되는 보안 키
  webserverSecretKey: "사용자지정"                         # helm install 시, --set-string 으로 값 할당

  ## environment variables for airflow configs
  # Airflow 구성 요소별 Config 설정 예제입니다.
  # https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#
  config:

    # webserver
    # 슬랙을 통해 DAG 실행 에러를 받을 경우, 해당 BASE_URL을 기반으로 곧바로 Webserver Dashboard로 접근할 수 있도록 설정
    AIRFLOW__WEBSERVER__BASE_URL: "Airflow Dashboard 주소"             # helm install 시, --set-string 으로 값 할당

    # logging
    # DAG 실행 로그를 Worker 내부에 쌓을 경우, Worker Pod가 재실행되는경우엔 기존 로그들이 삭제 되어 Airflow Dashboard에서 이전 로그들을 확인할 수 없는 문제가 있음
    # 따라서, Airflow 로그를 cloud storage에 저장하도록 변경
    AIRFLOW__LOGGING__REMOTE_LOGGING : 'true'
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID : "google_cloud_default"          # 하단에 생성한 GCP connection 명
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER : "cloud storage 경로"         # helm install 시, --set-string 으로 값 할당

    # ETC (모든 파드 컨테이너에 환경변수로 삽입할 경우)
    KEYCLOAK_CLIENT_AIRFLOW_SECRET : "사용자지정"          # helm install 시, --set-string 으로 값 할당
    KEYCLOAK_REALM : "사용자지정"                          # helm install 시, --set-string 으로 값 할당

  # GCP connections 연결 예제입니다.
  connections:
    - id: google_cloud_default
      type: google_cloud_platform
      description : "connection 설명"
      extra: |
        {
          "extra__google_cloud_platform__project": "프로젝트ID",
          "extra__google_cloud_platform__num_retries": 5
        }

  # pool 생성 예제입니다.
  pools:
    - name: "pool"
      description: "pool 설명"
      slots: 24

  ## extra pip packages to install in airflow Pods 
  # pip package 설치 예제입니다. 
  # 아래와 같은 방식으로 원하는 PIP 패키지를 Worker 파드를 포함한 모든 파드에 설치할 수 있습니다.
  extraPipPackages:
    - "sqlalchemy-repr==0.1.0"

## CONFIG | Kubernetes Extra Manifests
extraManifests:
  - apiVersion: v1
    kind: Secret
    metadata:
      name: airflow-ssh-secret
    data:
      base64_encoded_id_rsa : "사용자지정" # helm install 시, --set-string 으로 값 할당

## CONFIG | Airflow DAGs
dags:
  gitSync:
    enabled: true
    repo: "DAG 레포의 SSH 주소"
    branch : "타겟 브랜치명"              # helm install 시, --set-string 으로 값 할당
    revision: HEAD

        ## the name of a pre-created Secret with git ssh credentials
    sshSecret: "airflow-ssh-secret"

        ## the key in `dags.gitSync.sshSecret` with your ssh-key file 
    sshSecretKey: "base64_encoded_id_rsa"

    ## the number of seconds between syncs
    syncWait: 60

    ## the max number of seconds allowed for a complete sync
    syncTimeout: 600

## DATABASE | PgBouncer
pgbouncer:
  enabled: false

## DATABASE | Embedded Postgres
postgresql:
  enabled: true

## COMPONENT | Airflow Scheduler
scheduler:
  ## configs for the scheduler Pods' liveness probe
  livenessProbe:
    enabled: true
    initialDelaySeconds: 2100
    periodSeconds: 30
    timeoutSeconds: 60
    failureThreshold: 5

## COMPONENT | Airflow Workers
workers:
  ## if the airflow workers StatefulSet should be deployed
  enabled: true

  ## the number of worker Pods to run
  ## - if you set this >1 we recommend defining a `workers.podDisruptionBudget`
  ## - this is the minimum when `workers.autoscaling.enabled` is true
  replicas: 3

  ## configs for the PodDisruptionBudget of the worker StatefulSet
  podDisruptionBudget:
    ## if a PodDisruptionBudget resource is created for the worker StatefulSet
    enabled: true

## COMPONENT | Triggerer
triggerer:
  ## configs for the triggerer Pods' liveness probe
  livenessProbe:
    enabled: true
    initialDelaySeconds: 2100
    periodSeconds: 30
    timeoutSeconds: 60
    failureThreshold: 5

## COMPONENT | Flower
flower:
  enabled: false

## COMPONENT | Airflow Webserver
web:

  webserverConfig:
    ## if the `webserver_config.py` file is mounted
    ## - set to false if you wish to mount your own `webserver_config.py` file
    enabled: true

    ## the full content of the `webserver_config.py` file (as a string)
    stringOverride: |

      import os
      import logging
      import jwt
      from flask import redirect, session
      from flask_appbuilder import expose
      from flask_appbuilder.security.manager import AUTH_OAUTH
      from flask_appbuilder.security.views import AuthOAuthView
      from airflow.www.security import AirflowSecurityManager

      basedir = os.path.abspath(os.path.dirname(__file__))  # 현재 파일의 절대 경로
      log = logging.getLogger(__name__)                     # 로깅을 위한 로거 객체 생성
      AUTH_TYPE = AUTH_OAUTH                                # 인증 유형 설정
      AUTH_USER_REGISTRATION = True                         # 사용자 등록 허용 여부 설정
      AUTH_ROLES_SYNC_AT_LOGIN = True                       # 로그인 시 Keycloak 역할 변경 사항 동기화 여부 설정
      PERMANENT_SESSION_LIFETIME = 86400                     # 세션 지속 시간(초)

      # Airflow 역할과 OAuth 제공자 역할 간의 매핑 정의
      AUTH_ROLES_MAPPING = {
        "viewer": [ "Admin" ]
      }

      # OAuth 제공자 정보 설정
      PROVIDER_NAME = 'keycloak'
      CLIENT_ID = 'airflow'
      CLIENT_SECRET = os.environ.get('KEYCLOAK_CLIENT_AIRFLOW_SECRET')  # helm install 시, --set-string 으로 값 할당
      KEYCLOAK_REALM = os.environ.get('KEYCLOAK_REALM')                 # helm install 시, --set-string 으로 값 할당
      OIDC_ISSUER = f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}"

      OAUTH_PROVIDERS = [
        {
          'name':PROVIDER_NAME,
          'icon':'fa-address-card',             # Icon for the provider
          'token_key':'access_token',           # Name of the token in the response of access_token_url
          'remote_app': {
            'client_id': CLIENT_ID,             # Client Id (Identify Airflow application)
            'client_secret': CLIENT_SECRET,     # Secret for this Client Id (Identify Airflow application)
            'api_base_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect",
            'client_kwargs': {
              'scope': 'openid profile email'   # ENL 에서는 실제로 OpenID Connect 를 사용 중.
            },
            'access_token_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token",
            'authorize_url': f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/auth",
            'request_token_url': None,
            'jwks_uri':f"https://{KEYCLOAK도메인주소}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/certs"
          }
        }
      ]

      # 로그아웃 시 액세스 토큰을 삭제하는 메서드
      class CustomAuthRemoteUserView(AuthOAuthView):
        @expose("/logout/")
        def logout(self):
          """로그아웃 전 액세스 토큰 삭제"""
          return super().logout()

      # OAuth에서 사용자 정보를 가져오고 Airflow 역할로 매핑하는 메서드
      class CustomSecurityManager(AirflowSecurityManager):

        authoauthview = CustomAuthRemoteUserView

        def oauth_user_info(self, provider, response):
          if provider == PROVIDER_NAME:
            token = response["access_token"]                                                        # OAuth 인증 서버로부터 받은 응답에서 액세스 토큰 추출
            me = jwt.decode(token, algorithms="RS256", options={"verify_signature": False})         # 추출한 액세스 토큰을 사용하여 JWT 디코딩
            groups = me["resource_access"]["airflow"]["roles"]                                      # JWT에서 추출한 정보 중에서 Airflow 역할만 추출

            # resource_access 예제
            # {
            #   "resource_access": { "airflow": { "roles": ["viewer", "admin"] }}
            # }

            # Airflow에 할당된 역할이 존재하며 "viewer" 역할을 가지고 있을 경우
            if (len(groups) >= 1 and "viewer" in groups):

              # JWT에서 추출한 사용자 정보와 Airflow에 할당된 역할 정보를 userinfo 딕셔너리에 저장
              userinfo = {
                "username": me.get("preferred_username"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "email": me.get("email"),
                "role_keys": groups,
              }
              return userinfo # 사용자 정보 반환
          else:
            return {}

      # Airflow는 커스텀 보안 관리자를 사용하여 인증 및 권한 부여를 처리
      SECURITY_MANAGER_CLASS = CustomSecurityManager

  ## configs for the Service of the web Pods
  service:
    # Webserver 접근 시, GCLB -> Backend Service -> networkEndpointGroups -> Service -> Webserver 접근이 가능하도록 Service에 networkEndpointGroups을 지정
    annotations:
      "cloud.google.com/neg": '{"exposed_ports": {"8080": {"name": "Airflow networkEndpointGroups으로 지정할 이름"}}}'
    sessionAffinity: "None"
    sessionAffinityConfig: {}
    type: ClusterIP
    externalPort: 8080
    loadBalancerIP: ""
    loadBalancerSourceRanges: []
    nodePort:
      http: ""

  ## configs for the web Pods' readiness probe
  readinessProbe:
    enabled: false
    initialDelaySeconds: 2100
    periodSeconds: 10
    timeoutSeconds: 5
    failureThreshold: 6

  ## configs for the web Pods' liveness probe
  livenessProbe:
    enabled: true
    initialDelaySeconds: 2100
    periodSeconds: 10
    timeoutSeconds: 5
    failureThreshold: 6

'DevOps > Helm' 카테고리의 다른 글

[Helm] Helm Chart --set 사용 방법  (0) 2023.06.16
[Helm] Helm, Helm Chart란?  (1) 2023.05.24
Comments