GCP: Cloud Composer

CLI

gcloud composerが使えないときはgcloud components updateする。

DAG 関連

DAG の一覧を表示する

gcloud composer environments storage dags list \
  --environment <NAME> \
  --location <REGION>

# ex)
gcloud composer environments storage dags list \
  --environment my-composer \
  --location asia-northeast1

ジョブの開始時刻を取得する

https://stackoverflow.com/questions/47096829/airflow-get-start-time-of-dag-run

DAG を日付指定で手動実行する

gcloud composer environments run <ENVIRONMENT> \
  --location asia-northeast1 \
  trigger_dag -- \
  <DAG_NAME> -e <YYYYMMDD>

指定した日付の範囲で過去のジョブを再実行

gcloud composer environments run <ENVIRONMENT> \
  --location asia-northeast1 \
  backfill -- \
  <DAG_NAME> -s 2015-01-01 -e 2015-01-02

backfillは失敗したジョブのみ再実行するので、成功したジョブも再実行したい場合はclearする。

gcloud composer environments run <ENVIRONMENT> \
  --location asia-northeast1 \
  clear -- \
  <DAG_NAME> -c -s 2015-01-11 -e 2015-01-12

Ref:
エアフローの execution_date:変数としてアクセスする必要があります

DAG を手動実行する

事前に次のコマンドを実行する必要がある。

gcloud components install kubectl
gcloud composer environments run test-environment \
    --location us-central1 trigger_dag -- sample_quickstart \
    --run_id=1234

Ref:
Airflow コマンドライン インターフェース  |  Cloud Composer  |  Google Cloud

DAG をアップロードする

アップロード後に Airflow のコンソールに反映されるまで数分時間がかかる場合がある。

gcloud composer environments storage dags import \
  --environment <NAME> \
  --location <REGION> \
  --source <DAG_FILE>

# ex)
gcloud composer environments storage dags import \
  --environment my-composer \
  --location asia-northeast1 \
  --source quickstart.py
gcloud composer

Terraform で構築

Ref: https://www.terraform.io/docs/providers/google/r/composer_environment.html

resource "google_composer_environment" "test" {
  name   = "%s"
  region = "us-central1"
  config {
    node_count = 3

    node_config {
      zone         = "us-central1-a"
      machine_type = "n1-standard-1"

      # network    = google_compute_network.test.self_link
      # subnetwork = google_compute_subnetwork.test.self_link

      # service_account = google_service_account.test.name
    }
  }

  # depends_on = [google_project_iam_member.composer-worker]
}

DAG ファイルをアップロード

DAG は GCS にアップロードすると Airflow に読み込まれる。

gsutil cp <DAG FILE> gs://<BUCKET>/dags/

監視

https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en

Composer で使える指標の一覧:
https://cloud.google.com/monitoring/api/metrics_gcp#gcp-composer

環境のヘルスチェック

composer.googleapis.com/environment/healthyメトリックを使用する。
airflow_monitoringという名前の DAG を 5 分おきに実行してヘルスチェックされる。

Ref:
https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en#environment

データベースのヘルスチェック

composer.googleapis.com/environment/database_healthを使用する。
毎分 Airflow monitoring pod が DB に ping を送ってヘルスチェックされる。

Ref:
https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en#environment

Airflow

Hello world

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Ref:
https://airflow.apache.org/docs/stable/tutorial.html

環境変数で Connection を設定する

https://airflow.apache.org/docs/stable/howto/connection/index.html

Tips

Airflow UI の URL を取得する

ENVIRONMENT_NAME="my-composer"
LOCATION="asia-northeast1"
PROJECT_ID="my-project"

gcloud composer environments describe $ENVIRONMENT_NAME \
  --project $PROJECT_ID \
  --location $LOCATION \
  --format "value(config.airflowUri)"

Ref:
https://cloud.google.com/composer/docs/how-to/accessing/airflow-web-interface?hl=ja#retrieving_the_web_interface_url_via_the_gcloud_command-line_tool

日付は datetime を使わずになどのテンプレートを使用する

DAG をbackfillなどで過去分の再実行をするときに、datetime を使ってしまうと、常に当日に対する処理をしてしまう。

Ref:
https://towardsdatascience.com/apache-airflow-tips-and-best-practices-ff64ce92ef8#4ba8