Airflow
インストール(Docker)
docker pull puckel/docker-airflow:1.10.7
Docker コンテナのタグは次を参照:
Docker を使ってローカルで動かす
Ref:
puckel/docker-airflow: Docker Apache Airflow
docker-compose.yml
を作成する。
version: "2.1"
services:
redis:
image: "redis:5.0.5"
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
webserver:
image: puckel/docker-airflow:1.10.7
restart: always
depends_on:
- postgres
- redis
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
- AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=5
volumes:
- ./dags:/usr/local/airflow/dags
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
flower:
image: puckel/docker-airflow:1.10.7
restart: always
depends_on:
- redis
environment:
- EXECUTOR=Celery
ports:
- "5555:5555"
command: flower
scheduler:
image: puckel/docker-airflow:1.10.7
restart: always
depends_on:
- webserver
volumes:
- ./dags:/usr/local/airflow/dags
environment:
- LOAD_EX=n
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
command: scheduler
worker:
image: puckel/docker-airflow:1.10.7
restart: always
depends_on:
- scheduler
volumes:
- ./dags:/usr/local/airflow/dags
environment:
- FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
- EXECUTOR=Celery
command: worker
dags
ディレクトリを作る。
mkdir dags
そのままdocker-compose run
しても自動的にdags
ディレクトリは作られるが、
その場合は root 権限で作成されてしまうため、あらかじめ手動で作成しておいた方がいい。
ここまででファイル構成は次のようになっている。
dags
docker-compose.yml
Docker コンテナを起動する。
docker-compose run -d
localhost:8080
で Airflow の Web サーバーに繋がる。
aifrlow:10.0.3 を使う場合の注意点
requirements.txt
を使ってwerkzeug
をバージョン指定でインストールする。
werkzeug == 0.15.4
airflow CLI の利用
次のようにentrypoint.sh
を介してbash
を起動し、airflow
コマンドを使う。
docker exec -it local_webserver_1 /entrypoint.sh bash
entrypoint.sh
の中で Airflow 用の変数定義をしているため、そのままbash
セッションを開始するとairflow
コマンド実行時にエラーとなってしまうので注意。
DAG の追加
dags
ディレクトリに*.py
を追加すれば自動的に読み込まれる。
CLI
Dag Rus の一覧を取得する
DAG_ID="mydag"
airflow list_dag_runs $DAG_ID
DAG を手動実行する
DAG_ID="mydag"
# UTCの2000-01-01 01:00 としてDAGを実行する
airflow trigger_dag $DAG_ID -e 2001-01-01T01:00:00+00:00 --run_id "test_2001-01-01T01:00:00+00:00"
過去の DAG を実行する
airflow backfill -s 2015-01-01 mydag
スケジューリング
Ref:
Scheduling & Triggers — Airflow Documentation
Airflow スケジューラが 1 分おきに DAG を監視し、スケジュールの実行対象になった DAG を起動させている。
1 日間隔のスケジュールで DAG を実行する場合、2016-01-01
の DAG は2016-01-01T23:59
の後にすぐ実行される。
つまり、ジョブは対象の期間が終了した時点で実行される。
DAG のスケジュール設定
スケジュール設定値は 3 通りある
- cron 形式の文字列
datetime.timedelta
オブジェクト- cron プリセット
preset | 意味 | cron |
---|---|---|
None | スケジュールなし、外部(CLI からなど)起動専用の DAG。 | |
@once | 一度だけ実行する。 | |
@hourly | 1 時間ごとに実行する。 | 0 * * * * |
@daily | 1 日 1 回、0 時に実行する。 | 0 0 * * * |
@weekly | 日曜日の朝 0 時に実行する | 0 0 * * 0 |
@monthly | 毎月最初の日の朝 0 時に実行する | 0 0 1 * * |
@yearly | 毎年 1 月 1 日の朝 0 時に実行する | 0 0 1 1 * |
# cron形式の文字列
dag = DAG('mydag', schedule_interval="0 * * * *")
# `datetime.timedelta`オブジェクト
dag = DAG('mydag', schedule_interval=timedelta(days=1))
# cronプリセット
dag = DAG('mydag', schedule_interval=None)
dag = DAG('mydag', schedule_interval="@once")
スケジュールなしのNone
は文字列としてではなくそのまま設定する(schedule_interval=None
)ことに注意。
バックフィルとキャッチアップ
Airflow スケジューラは、各 DAG についてstart_date
からend_date
までの期間内に対してschedule_interval
の間隔ごとに実行させる。
Airflow スケジューラが処理するとき、まだ未実行の過去のスケジュールが存在すればそれも実行する。これをキャッチアップと呼ぶ。
キャッチアップを無効にするには、catchup=False
を設定する。
dag = DAG('tutorial', catchup=False)
DAG の手動トリガー(trigger_dag)
Airflow の Web サーバで対象の DAG がON
になっていないと実行されないので注意。
airflow trigger_dag <DAG> -e <EXECUTION_DATE>
# ex)
airflow trigger_dag hoge -e 2015-01-01
Ref:
https://airflow.apache.org/docs/stable/cli.html#trigger_dag
DAG の手動トリガー(backfill)
START_DATE
からEND_DATE
までにスケジュールされていた DAG を実行する。
DAG の起動時間は本来のスケジュール実行される時刻となる。
Web サーバで対象の DAG がOFF
になっていても実行される。
airflow backfill <DAG> -s <START_DATE> -e <END_DATE>
# ex)
airflow backfill hoge -s 2015-01-01 -e 2015-01-02
テンプレートマクロ
Ref:
Macros reference — Airflow Documentation
airflow.cfg について
Ref:
https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg
airflow backfill hoge -s 2015-01-01
Tips
DAG Runs を削除する
trigger_dag
で実行済みの DAG Runs を再実行したい場合など、Dag Runs オブジェクトを削除したい場合がある。
現状は Airflow UI からのみ削除が可能。
- Browse > DAG Runs を選択する。
- 削除したい DAG Runs の行で、一番左のチェックボックスを ON にする。
- With selected > Delete で DagRuns を削除する
- 再度
trigger_dag
コマンドでジョブが実行できる
Ref:
How to delete a DAG run in Apache Airflow? - Stack Overflow
DAG を完全に削除する
DAG ファイルを消すだけでなく、次のコマンドを実行するか、Airflow UI の削除ボタンを利用してメタデータも削除する必要がある。
airflow delete_dag
Ref:
Apache Airflow のヒントとベストプラクティス-データサイエンスに向けて
start_date
を変えるときは DAG 名を変える
同じ名前で別のスケジュールな DAG が存在することになり、スケジューラを混乱させるため。
Ref:
Apache Airflow での DAG 作成のベストプラクティス
FAQ
タスクが scheduled のまま実行されない
https://stackoverflow.com/questions/49021055/airflow-1-9-0-is-queuing-but-not-launching-tasks