GCP: Cloud Composer
CLI
gcloud composer
が使えないときはgcloud components update
する。
DAG 関連
DAG の一覧を表示する
gcloud composer environments storage dags list \
--environment <NAME> \
--location <REGION>
# ex)
gcloud composer environments storage dags list \
--environment my-composer \
--location asia-northeast1
ジョブの開始時刻を取得する
https://stackoverflow.com/questions/47096829/airflow-get-start-time-of-dag-run
DAG を日付指定で手動実行する
gcloud composer environments run <ENVIRONMENT> \
--location asia-northeast1 \
trigger_dag -- \
<DAG_NAME> -e <YYYYMMDD>
指定した日付の範囲で過去のジョブを再実行
gcloud composer environments run <ENVIRONMENT> \
--location asia-northeast1 \
backfill -- \
<DAG_NAME> -s 2015-01-01 -e 2015-01-02
backfill
は失敗したジョブのみ再実行するので、成功したジョブも再実行したい場合はclear
する。
gcloud composer environments run <ENVIRONMENT> \
--location asia-northeast1 \
clear -- \
<DAG_NAME> -c -s 2015-01-11 -e 2015-01-12
Ref:
エアフローの execution_date:変数としてアクセスする必要があります
DAG を手動実行する
事前に次のコマンドを実行する必要がある。
gcloud components install kubectl
gcloud composer environments run test-environment \
--location us-central1 trigger_dag -- sample_quickstart \
--run_id=1234
Ref:
Airflow コマンドライン インターフェース | Cloud Composer | Google Cloud
DAG をアップロードする
アップロード後に Airflow のコンソールに反映されるまで数分時間がかかる場合がある。
gcloud composer environments storage dags import \
--environment <NAME> \
--location <REGION> \
--source <DAG_FILE>
# ex)
gcloud composer environments storage dags import \
--environment my-composer \
--location asia-northeast1 \
--source quickstart.py
gcloud composer
Terraform で構築
Ref: https://www.terraform.io/docs/providers/google/r/composer_environment.html
resource "google_composer_environment" "test" {
name = "%s"
region = "us-central1"
config {
node_count = 3
node_config {
zone = "us-central1-a"
machine_type = "n1-standard-1"
# network = google_compute_network.test.self_link
# subnetwork = google_compute_subnetwork.test.self_link
# service_account = google_service_account.test.name
}
}
# depends_on = [google_project_iam_member.composer-worker]
}
DAG ファイルをアップロード
DAG は GCS にアップロードすると Airflow に読み込まれる。
gsutil cp <DAG FILE> gs://<BUCKET>/dags/
監視
https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en
Composer で使える指標の一覧:
https://cloud.google.com/monitoring/api/metrics_gcp#gcp-composer
環境のヘルスチェック
composer.googleapis.com/environment/healthy
メトリックを使用する。
airflow_monitoring
という名前の DAG を 5 分おきに実行してヘルスチェックされる。
Ref:
https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en#environment
データベースのヘルスチェック
composer.googleapis.com/environment/database_health
を使用する。
毎分 Airflow monitoring pod が DB に ping を送ってヘルスチェックされる。
Ref:
https://cloud.google.com/composer/docs/how-to/managing/monitoring-environments?hl=en#environment
Airflow
Hello world
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
t2.set_upstream(t1)
t3.set_upstream(t1)
Ref:
https://airflow.apache.org/docs/stable/tutorial.html
環境変数で Connection を設定する
https://airflow.apache.org/docs/stable/howto/connection/index.html
Tips
Airflow UI の URL を取得する
ENVIRONMENT_NAME="my-composer"
LOCATION="asia-northeast1"
PROJECT_ID="my-project"
gcloud composer environments describe $ENVIRONMENT_NAME \
--project $PROJECT_ID \
--location $LOCATION \
--format "value(config.airflowUri)"
日付は datetime を使わずになどのテンプレートを使用する
DAG をbackfill
などで過去分の再実行をするときに、datetime を使ってしまうと、常に当日に対する処理をしてしまう。
Ref:
https://towardsdatascience.com/apache-airflow-tips-and-best-practices-ff64ce92ef8#4ba8