Airflowを使っていてタスク完了時、失敗時にSlack通知を自作する必要があるけど、そこで実行にかかった時間を表示したかった。
結論としてコールバックに渡されるcontextのdag_runにstart_date, end_dateが入っているので引き算すれば出せる。
Airflowのバージョンは1.10.10
f-stringを使っているので3.6未満の場合はformatなどへの書き直しが必要。
lib/test.py
from slack_webhook import Slack class MySlack: def __init__(self, url): self.slack = Slack(url=url) def post_slack(self, context): c = context url = c['conf'].get('webserver', 'base_url') self.slack.post(text=f'''{c["reason"]} {c["dag"].dag_id} {c["dag_run"].end_date - c["dag_run"].start_date}''', blocks=[ { "type": "section", "text": { "type": "mrkdwn", "text": f'''*{c["reason"]}* {c["dag"].dag_id} 実行時間: {c['dag_run'].end_date - c['dag_run'].start_date} {url}/admin/airflow/tree?dag_id={c["dag"].dag_id}''' } } ])
実際の通知はこんな感じ
まだエラー時などはテストしていない。
airflow.cfgのwebserber -> base_urlを環境変数で上書きしている場合、webserverだけでなく、schedulerの環境変数にも入れる必要がある。なかなか変えられなくてハマった。
通知テストにはただechoするだけみたいなテスト用DAGを置いておくと便利。
参考までにテスト用DAG.
SlackのWebhookURLはVariablesにSLACK_WEBHOOK_URLを作って保存している。
test.py
from datetime import timedelta from airflow.models import DAG, Variable from airflow.utils.dates import days_ago from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from libs.my_slack import MySlack SLACK_WEBHOOK_URL = Variable.get('SLACK_WEBHOOK_URL', '') default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 0, 'retry_delay': timedelta(minutes=5), 'execution_timeout': timedelta(seconds=900) } myslack = MySlack(SLACK_WEBHOOK_URL) with DAG( 'test_echo', catchup=False, default_args=default_args, description='test dag echo', schedule_interval=None, on_success_callback=myslack.post_slack, on_failure_callback=myslack.post_slack ) as dag: t1 = DummyOperator( task_id='start' ) t2 = BashOperator( task_id='echo', bash_command='echo 1', ) t1 >> t2