GAミント至上主義

Web Monomaniacal Developer.

AirflowのSlack通知で実行にかかった時間を表示する

Airflowを使っていてタスク完了時、失敗時にSlack通知を自作する必要があるけど、そこで実行にかかった時間を表示したかった。

結論としてコールバックに渡されるcontextのdag_runにstart_date, end_dateが入っているので引き算すれば出せる。

Airflowのバージョンは1.10.10
f-stringを使っているので3.6未満の場合はformatなどへの書き直しが必要。

lib/test.py

from slack_webhook import Slack

class MySlack:

    def __init__(self, url):
        self.slack = Slack(url=url)

    def post_slack(self, context):
        c = context
        url = c['conf'].get('webserver', 'base_url')
        self.slack.post(text=f'''{c["reason"]} {c["dag"].dag_id} {c["dag_run"].end_date - c["dag_run"].start_date}''',
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f'''*{c["reason"]}* {c["dag"].dag_id}
実行時間: {c['dag_run'].end_date - c['dag_run'].start_date}
{url}/admin/airflow/tree?dag_id={c["dag"].dag_id}'''
                }
            }
        ])

実際の通知はこんな感じ

f:id:uyamazak:20200422102238p:plain

まだエラー時などはテストしていない。

airflow.cfgのwebserber -> base_urlを環境変数で上書きしている場合、webserverだけでなく、schedulerの環境変数にも入れる必要がある。なかなか変えられなくてハマった。

通知テストにはただechoするだけみたいなテスト用DAGを置いておくと便利。

参考までにテスト用DAG.
SlackのWebhookURLはVariablesにSLACK_WEBHOOK_URLを作って保存している。

test.py

from datetime import timedelta
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from libs.my_slack import MySlack

SLACK_WEBHOOK_URL = Variable.get('SLACK_WEBHOOK_URL', '')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(seconds=900)
}

myslack = MySlack(SLACK_WEBHOOK_URL)

with DAG(
    'test_echo',
    catchup=False,
    default_args=default_args,
    description='test dag echo',
    schedule_interval=None,
    on_success_callback=myslack.post_slack,
    on_failure_callback=myslack.post_slack
) as dag:
    t1 = DummyOperator(
            task_id='start'
         )
    t2 = BashOperator(
            task_id='echo',
            bash_command='echo 1',
         )
    t1 >> t2