GAミント至上主義

Web Monomaniacal Developer.

AirflowでImportError: cannot import name 'HTMLString' from 'wtforms.widgets'

AirflowをCloud Buildでビルドして動かしたら突然、下記のようなエラーが起きた。

Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 26, in <module>
    from airflow.bin.cli import CLIFactory
  File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 71, in <module>
    from airflow.www_rbac.app import cached_app as cached_app_rbac
  File "/usr/local/lib/python3.7/site-packages/airflow/www_rbac/app.py", line 28, in <module>
    from flask_appbuilder import AppBuilder, SQLA
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/__init__.py", line 6, in <module>
    from .base import AppBuilder  # noqa: F401
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/base.py", line 8, in <module>
    from .api.manager import OpenApiManager
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/api/manager.py", line 7, in <module>
    from flask_appbuilder.baseviews import BaseView
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/baseviews.py", line 21, in <module>
    from .forms import GeneralModelConverter
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/forms.py", line 17, in <module>
    from .fieldwidgets import (
  File "/usr/local/lib/python3.7/site-packages/flask_appbuilder/fieldwidgets.py", line 3, in <module>
    from wtforms.widgets import html_params, HTMLString
ImportError: cannot import name 'HTMLString' from 'wtforms.widgets' (/usr/local/lib/python3.7/site-packages/wtforms/widgets/__init__.py)


Airflowというより、Flask-AppBuilderが使っているWTFormsというライブラリが原因の模様。
2.3.0のリリースが2020/4/22、今日だった。確信。

pypi.org

それぞれのバージョンを確認したところ間違いない。

動いてたバージョン
WTForms==2.2.1

エラーになったバージョン
WTForms-2.3.0

とりあえず、2.2.1を指定してインストールさせることで一時的に解決した。

Flask Web Development: Developing Web Applications with Python

Flask Web Development: Developing Web Applications with Python

AirflowのSlack通知で実行にかかった時間を表示する

Airflowを使っていてタスク完了時、失敗時にSlack通知を自作する必要があるけど、そこで実行にかかった時間を表示したかった。

結論としてコールバックに渡されるcontextのdag_runにstart_date, end_dateが入っているので引き算すれば出せる。

Airflowのバージョンは1.10.10
f-stringを使っているので3.6未満の場合はformatなどへの書き直しが必要。

lib/test.py

from slack_webhook import Slack

class MySlack:

    def __init__(self, url):
        self.slack = Slack(url=url)

    def post_slack(self, context):
        c = context
        url = c['conf'].get('webserver', 'base_url')
        self.slack.post(text=f'''{c["reason"]} {c["dag"].dag_id} {c["dag_run"].end_date - c["dag_run"].start_date}''',
        blocks=[
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f'''*{c["reason"]}* {c["dag"].dag_id}
実行時間: {c['dag_run'].end_date - c['dag_run'].start_date}
{url}/admin/airflow/tree?dag_id={c["dag"].dag_id}'''
                }
            }
        ])

実際の通知はこんな感じ

f:id:uyamazak:20200422102238p:plain

まだエラー時などはテストしていない。

airflow.cfgのwebserber -> base_urlを環境変数で上書きしている場合、webserverだけでなく、schedulerの環境変数にも入れる必要がある。なかなか変えられなくてハマった。

通知テストにはただechoするだけみたいなテスト用DAGを置いておくと便利。

参考までにテスト用DAG.
SlackのWebhookURLはVariablesにSLACK_WEBHOOK_URLを作って保存している。

test.py

from datetime import timedelta
from airflow.models import DAG, Variable
from airflow.utils.dates import days_ago
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from libs.my_slack import MySlack

SLACK_WEBHOOK_URL = Variable.get('SLACK_WEBHOOK_URL', '')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(seconds=900)
}

myslack = MySlack(SLACK_WEBHOOK_URL)

with DAG(
    'test_echo',
    catchup=False,
    default_args=default_args,
    description='test dag echo',
    schedule_interval=None,
    on_success_callback=myslack.post_slack,
    on_failure_callback=myslack.post_slack
) as dag:
    t1 = DummyOperator(
            task_id='start'
         )
    t2 = BashOperator(
            task_id='echo',
            bash_command='echo 1',
         )
    t1 >> t2

AirflowのwebserverでLBを通すとhttpにリダイレクトされてしまう問題

Cloud Composerを使わず、GKEでAirflowを構築し、ロードバランサーGCP HTTPS Load Balancer+Google管理の証明書を使用した。

通常の管理画面表示には問題なかったものの、VariablesなどのPOST系のアクションをすると、http接続に戻ってしまう問題があった。

どうやら、LBがSSLを行い、webserver側にはhttpで通信しているため、そちらに戻してしまうようだった。
github.com


下記のenable_proxy_fixを使えば大丈夫だった。
https://airflow.apache.org/docs/stable/howto/run-behind-proxy.html

cfgファイルで行う場合は下記のように入れる。

airflow.cfg

[webserver]
enable_proxy_fix = True


今回はローカルはそのままにしておきたかったので、k8s環境変数で設定する方式を利用した。

k8s deploymentのyaml抜粋

  template:
    metadata:
      creationTimestamp: null
      labels:
        app: airflow
        tier: webserver
    spec:
      containers:
      - args:
        - webserver
        env:
        - name: AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX