Airflow

インストール(Docker)

docker pull puckel/docker-airflow:1.10.7

Docker コンテナのタグは次を参照:

Docker を使ってローカルで動かす

Ref:
puckel/docker-airflow: Docker Apache Airflow

docker-compose.ymlを作成する。

version: "2.1"
services:
  redis:
    image: "redis:5.0.5"
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
  webserver:
    image: puckel/docker-airflow:1.10.7
    restart: always
    depends_on:
      - postgres
      - redis
    environment:
      - LOAD_EX=n
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
      - AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL=5
    volumes:
      - ./dags:/usr/local/airflow/dags
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

  flower:
    image: puckel/docker-airflow:1.10.7
    restart: always
    depends_on:
      - redis
    environment:
      - EXECUTOR=Celery
    ports:
      - "5555:5555"
    command: flower

  scheduler:
    image: puckel/docker-airflow:1.10.7
    restart: always
    depends_on:
      - webserver
    volumes:
      - ./dags:/usr/local/airflow/dags
    environment:
      - LOAD_EX=n
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
    command: scheduler

  worker:
    image: puckel/docker-airflow:1.10.7
    restart: always
    depends_on:
      - scheduler
    volumes:
      - ./dags:/usr/local/airflow/dags
    environment:
      - FERNET_KEY=46BKJoQYlPPOexq0OhDZnIlNepKFf87WFwLbfzqDDho=
      - EXECUTOR=Celery
    command: worker

dagsディレクトリを作る。

mkdir dags

そのままdocker-compose runしても自動的にdagsディレクトリは作られるが、
その場合は root 権限で作成されてしまうため、あらかじめ手動で作成しておいた方がいい。

ここまででファイル構成は次のようになっている。

dags
docker-compose.yml

Docker コンテナを起動する。

docker-compose run -d

localhost:8080で Airflow の Web サーバーに繋がる。

aifrlow:10.0.3 を使う場合の注意点

requirements.txtを使ってwerkzeugをバージョン指定でインストールする。

werkzeug == 0.15.4

Ref:
https://stackoverflow.com/questions/56933523/apache-airflow-airflow-initdb-throws-modulenotfounderror-no-module-named-wer

airflow CLI の利用

次のようにentrypoint.shを介してbashを起動し、airflowコマンドを使う。

docker exec -it local_webserver_1 /entrypoint.sh bash

entrypoint.shの中で Airflow 用の変数定義をしているため、そのままbashセッションを開始するとairflowコマンド実行時にエラーとなってしまうので注意。

Ref:
DAG not running straight out of the box using LocalExecutor with docker-compose? · Issue #446 · puckel/docker-airflow

DAG の追加

dagsディレクトリに*.pyを追加すれば自動的に読み込まれる。

CLI

Dag Rus の一覧を取得する

DAG_ID="mydag"
airflow list_dag_runs $DAG_ID

DAG を手動実行する

DAG_ID="mydag"
# UTCの2000-01-01 01:00 としてDAGを実行する
airflow trigger_dag $DAG_ID -e 2001-01-01T01:00:00+00:00 --run_id "test_2001-01-01T01:00:00+00:00"

過去の DAG を実行する

airflow backfill -s 2015-01-01 mydag

スケジューリング

Ref:
Scheduling & Triggers — Airflow Documentation

Airflow スケジューラが 1 分おきに DAG を監視し、スケジュールの実行対象になった DAG を起動させている。

1 日間隔のスケジュールで DAG を実行する場合、2016-01-01の DAG は2016-01-01T23:59の後にすぐ実行される。
つまり、ジョブは対象の期間が終了した時点で実行される

DAG のスケジュール設定

スケジュール設定値は 3 通りある

  • cron 形式の文字列
  • datetime.timedeltaオブジェクト
  • cron プリセット
preset 意味 cron
None スケジュールなし、外部(CLI からなど)起動専用の DAG。
@once 一度だけ実行する。
@hourly 1 時間ごとに実行する。 0 * * * *
@daily 1 日 1 回、0 時に実行する。 0 0 * * *
@weekly 日曜日の朝 0 時に実行する 0 0 * * 0
@monthly 毎月最初の日の朝 0 時に実行する 0 0 1 * *
@yearly 毎年 1 月 1 日の朝 0 時に実行する 0 0 1 1 *
# cron形式の文字列
dag = DAG('mydag', schedule_interval="0 * * * *")

# `datetime.timedelta`オブジェクト
dag = DAG('mydag', schedule_interval=timedelta(days=1))

# cronプリセット
dag = DAG('mydag', schedule_interval=None)
dag = DAG('mydag', schedule_interval="@once")

スケジュールなしのNoneは文字列としてではなくそのまま設定する(schedule_interval=None)ことに注意。

バックフィルとキャッチアップ

Airflow スケジューラは、各 DAG についてstart_dateからend_dateまでの期間内に対してschedule_intervalの間隔ごとに実行させる。
Airflow スケジューラが処理するとき、まだ未実行の過去のスケジュールが存在すればそれも実行する。これをキャッチアップと呼ぶ。

キャッチアップを無効にするには、catchup=Falseを設定する。

dag = DAG('tutorial', catchup=False)

DAG の手動トリガー(trigger_dag)

Airflow の Web サーバで対象の DAG がONになっていないと実行されないので注意。

airflow trigger_dag <DAG> -e <EXECUTION_DATE>
# ex)
airflow trigger_dag hoge -e 2015-01-01

Ref:
https://airflow.apache.org/docs/stable/cli.html#trigger_dag

DAG の手動トリガー(backfill)

START_DATEからEND_DATEまでにスケジュールされていた DAG を実行する。
DAG の起動時間は本来のスケジュール実行される時刻となる。
Web サーバで対象の DAG がOFFになっていても実行される。

airflow backfill <DAG> -s <START_DATE> -e <END_DATE>

# ex)
airflow backfill hoge -s 2015-01-01 -e 2015-01-02

テンプレートマクロ

Ref:
Macros reference — Airflow Documentation

airflow.cfg について

Ref:
https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg

airflow backfill hoge -s 2015-01-01

Tips

DAG Runs を削除する

trigger_dagで実行済みの DAG Runs を再実行したい場合など、Dag Runs オブジェクトを削除したい場合がある。
現状は Airflow UI からのみ削除が可能。

  1. Browse > DAG Runs を選択する。
  2. 削除したい DAG Runs の行で、一番左のチェックボックスを ON にする。
  3. With selected > Delete で DagRuns を削除する
  4. 再度trigger_dagコマンドでジョブが実行できる

Ref:
How to delete a DAG run in Apache Airflow? - Stack Overflow

DAG を完全に削除する

DAG ファイルを消すだけでなく、次のコマンドを実行するか、Airflow UI の削除ボタンを利用してメタデータも削除する必要がある。

airflow delete_dag

Ref:
Apache Airflow のヒントとベストプラクティス-データサイエンスに向けて

start_dateを変えるときは DAG 名を変える

同じ名前で別のスケジュールな DAG が存在することになり、スケジューラを混乱させるため。

Ref:
Apache Airflow での DAG 作成のベストプラクティス

FAQ

タスクが scheduled のまま実行されない

https://stackoverflow.com/questions/49021055/airflow-1-9-0-is-queuing-but-not-launching-tasks