GAミント至上主義

Web Monomaniacal Developer.

AirflowのMySqlToGoogleCloudStorageOperatorで実行時刻などをSQLで使う

ローカルで立てたAirflowで開発中のアプリDB→BigQueryのDAGがざっくり動いたので、今度は前回の更新日以降を差分同期したいと思った。


DBの種類がMySQLなのでちょっと違うけどやりたいのはまさにこれ。

tech.enigmo.co.jp


前回実行時間と今回の実行時間があれば、SQLのWHERE句に入れればやりたいことできそう。

https://airflow.readthedocs.io/en/1.10.3post1/_modules/airflow/contrib/operators/mysql_to_gcs.html

ソースをみるとsqlがtemplate_fieldsに入っているのでSQLの中に{{ execution_date }}とやれば入りそうだけど、python側で条件分岐などを行いたいのと内部理解も含めてcontextを自分で使ってみる。

dev.classmethod.jp


下記のようにwith文で処理しているのでdagに入っているのかと思ったら、それっぽいのは見当たらず。

with DAG(
    'mysql2gcs2bq',
    default_args=default_args,
    description='admin tables to gcs to bq',
    schedule_interval=timedelta(days=1),
) as dag:
    t1 = DummyOperator(
            task_id='start'
         )
# ....


airflow.models.dag — Airflow Documentation


PythonOperatorはprovide_contextをTrueにすることでkwargsとして渡してくれるらしいのでそれでMySqlToGoogleCloudStorageOperatorを囲ってみることにした。
airflow.operators.python_operator — Airflow Documentation

まだデバッグ中のコードだけどこんな感じ。contextを受け取るためには**kwargsが必要。

def gcs2bqOp(name: str, **kwargs):
    print('gcs2bqOp {} kwargs:{}'.format(name, kwargs))
 

いろんなパス情報は関数化してる。

printの結果を見てみる。整形するとこんなの。
execution_date、prev_execution_date_successが今回必要なものっぽいが、datetimeではなく見慣れぬPendulumという型で入っているようす。直訳すると振り子。

[2020-04-14 01:56:32,510] {{logging_mixin.py:112}} INFO - mysql2gcsOp rhistories:
{
    'conf': <airflow.configuration.AirflowConfigParser object at 0x7ff6c4b9ef90>,
    'dag': <DAG: mysql2gcs2bq>,
    'ds': '2020-04-14',
    'next_ds': '2020-04-14',
    'next_ds_nodash': '20200414',
    'prev_ds': '2020-04-14',
    'prev_ds_nodash': '20200414',
    'ds_nodash': '20200414',
    'ts': '2020-04-14T01:56:20.076503+00:00',
    'ts_nodash': '20200414T015620',
    'ts_nodash_with_tz': '20200414T015620.076503+0000',
    'yesterday_ds': '2020-04-13',
    'yesterday_ds_nodash': '20200413',
    'tomorrow_ds': '2020-04-15',
    'tomorrow_ds_nodash': '20200415',
    'END_DATE': '2020-04-14',
    'end_date': '2020-04-14',
    'dag_run': <DagRun mysql2gcs2bq @ 2020-04-14 01:56:20.076503+00:00: manual__2020-04-14T01:56:20.076503+00:00, externally triggered: True>,
    'run_id': 'manual__2020-04-14T01:56:20.076503+00:00',
    'execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
    'prev_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
    'prev_execution_date_success': <Proxy at 0x7ff6b10aa140 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b1030950>>,
    'prev_start_date_success': <Proxy at 0x7ff6b1037190 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b106d9e0>>,
    'next_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>,
    'latest_date': '2020-04-14',
    'macros': <module 'airflow.macros' from '/usr/local/lib/python3.7/site-packages/airflow/macros/__init__.py'>,
    'params': {},
    'tables': None,
    'task': <Task(PythonOperator): py_rhistories_mysql_to_bq>,
    'task_instance': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>,
    'ti': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>,
    'task_instance_key_str': 'mysql2gcs2bq__py_rhistories_mysql_to_bq__20200414',
    'test_mode': False,
    'var': {'value': None,
    'json': None},
    'inlets': [],
    'outlets': [],
    'templates_dict': None
}

datetimeを便利にしたものらしい。公式サイトがPythonライブラリとは思えないほどおしゃれ。jsだとmoment.jsとかdayjsとか、PHPだとCarbonとか日付関係の便利ライブラリだいたいあるね。
pendulum.eustace.io

便利そう。

これをいい感じにSQLに入れたりすればできそう。
成功日時の無い初回だけはちょっとなにかする必要があるか。

追記:templateで十分だし、contextを使うのは難しかった

ここまでやったところで、PythonOperator内で更にMySqlToGoogleCloudStorageOperatorを使うのとそのままでは動かず、面倒そうだった。

更にどうやらsqlはAirflowによっていわゆるtemplatedされているため、変数だけでなくjinjaテンプレートでif文とかも使えるのだった。

SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
{% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.fomart("%Y-%m-%d %H:%M:%S") }}"{% endif %}

今回はprev_execution_date_successが入っているときのみWhere句を入れたいのでこれで十分だった。

あれPendulumじゃない・・・・

上記のを試してみるけど、SQLが空になってしまうので試行錯誤してたら下記のようなエラーが。

[2020-04-14 05:51:21,872] {{taskinstance.py:1145}} ERROR - 'datetime.datetime object' has no attribute 'format'

え、Pendulumじゃなくて、datetime.datetime objectだ・・・。

contextとMySqlToGoogleCloudStorageOperatorでTemplateで使ってる変数が違うのだろうか。

あ、タイムゾーン・・・

とりあえずstrftimeにして動いたけど
```
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
{% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.strftime("%Y-%m-%d %H:%M:%S") }}"{% endif %}
```
実行ログをみると、当たり前だけどUTCのままの時間を使ってた
```[2020-04-14 05:57:34,313] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:47:45"```


ドキュメントを見てみるとensure_utcというそれっぽいオプションが。
airflow.contrib.operators.mysql_to_gcs — Airflow Documentation

Trueにして実行したところ、クエリの前にMySQL側のtime_zoneを変更してくれているよう。これなら大丈夫な予感。
```
[2020-04-14 06:08:15,877] {{mysql_to_gcs.py:90}} INFO - Executing: SET time_zone = '+00:00'
[2020-04-14 06:08:15,880] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:57:30"
```