ローカルで立てたAirflowで開発中のアプリDB→BigQueryのDAGがざっくり動いたので、今度は前回の更新日以降を差分同期したいと思った。
DBの種類がMySQLなのでちょっと違うけどやりたいのはまさにこれ。
前回実行時間と今回の実行時間があれば、SQLのWHERE句に入れればやりたいことできそう。
https://airflow.readthedocs.io/en/1.10.3post1/_modules/airflow/contrib/operators/mysql_to_gcs.html
ソースをみるとsqlがtemplate_fieldsに入っているのでSQLの中に{{ execution_date }}とやれば入りそうだけど、python側で条件分岐などを行いたいのと内部理解も含めてcontextを自分で使ってみる。
下記のようにwith文で処理しているのでdagに入っているのかと思ったら、それっぽいのは見当たらず。
with DAG( 'mysql2gcs2bq', default_args=default_args, description='admin tables to gcs to bq', schedule_interval=timedelta(days=1), ) as dag: t1 = DummyOperator( task_id='start' ) # ....
airflow.models.dag — Airflow Documentation
PythonOperatorはprovide_contextをTrueにすることでkwargsとして渡してくれるらしいのでそれでMySqlToGoogleCloudStorageOperatorを囲ってみることにした。
airflow.operators.python_operator — Airflow Documentation
まだデバッグ中のコードだけどこんな感じ。contextを受け取るためには**kwargsが必要。
def gcs2bqOp(name: str, **kwargs): print('gcs2bqOp {} kwargs:{}'.format(name, kwargs))
いろんなパス情報は関数化してる。
printの結果を見てみる。整形するとこんなの。
execution_date、prev_execution_date_successが今回必要なものっぽいが、datetimeではなく見慣れぬPendulumという型で入っているようす。直訳すると振り子。
[2020-04-14 01:56:32,510] {{logging_mixin.py:112}} INFO - mysql2gcsOp rhistories: { 'conf': <airflow.configuration.AirflowConfigParser object at 0x7ff6c4b9ef90>, 'dag': <DAG: mysql2gcs2bq>, 'ds': '2020-04-14', 'next_ds': '2020-04-14', 'next_ds_nodash': '20200414', 'prev_ds': '2020-04-14', 'prev_ds_nodash': '20200414', 'ds_nodash': '20200414', 'ts': '2020-04-14T01:56:20.076503+00:00', 'ts_nodash': '20200414T015620', 'ts_nodash_with_tz': '20200414T015620.076503+0000', 'yesterday_ds': '2020-04-13', 'yesterday_ds_nodash': '20200413', 'tomorrow_ds': '2020-04-15', 'tomorrow_ds_nodash': '20200415', 'END_DATE': '2020-04-14', 'end_date': '2020-04-14', 'dag_run': <DagRun mysql2gcs2bq @ 2020-04-14 01:56:20.076503+00:00: manual__2020-04-14T01:56:20.076503+00:00, externally triggered: True>, 'run_id': 'manual__2020-04-14T01:56:20.076503+00:00', 'execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>, 'prev_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>, 'prev_execution_date_success': <Proxy at 0x7ff6b10aa140 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b1030950>>, 'prev_start_date_success': <Proxy at 0x7ff6b1037190 with factory <function TaskInstance.get_template_context.<locals>.<lambda> at 0x7ff6b106d9e0>>, 'next_execution_date': <Pendulum [2020-04-14T01:56:20.076503+00:00]>, 'latest_date': '2020-04-14', 'macros': <module 'airflow.macros' from '/usr/local/lib/python3.7/site-packages/airflow/macros/__init__.py'>, 'params': {}, 'tables': None, 'task': <Task(PythonOperator): py_rhistories_mysql_to_bq>, 'task_instance': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>, 'ti': <TaskInstance: mysql2gcs2bq.py_rhistories_mysql_to_bq 2020-04-14T01:56:20.076503+00:00 [running]>, 'task_instance_key_str': 'mysql2gcs2bq__py_rhistories_mysql_to_bq__20200414', 'test_mode': False, 'var': {'value': None, 'json': None}, 'inlets': [], 'outlets': [], 'templates_dict': None }
datetimeを便利にしたものらしい。公式サイトがPythonライブラリとは思えないほどおしゃれ。jsだとmoment.jsとかdayjsとか、PHPだとCarbonとか日付関係の便利ライブラリだいたいあるね。
pendulum.eustace.io
便利そう。
これをいい感じにSQLに入れたりすればできそう。
成功日時の無い初回だけはちょっとなにかする必要があるか。
追記:templateで十分だし、contextを使うのは難しかった
ここまでやったところで、PythonOperator内で更にMySqlToGoogleCloudStorageOperatorを使うのとそのままでは動かず、面倒そうだった。
更にどうやらsqlはAirflowによっていわゆるtemplatedされているため、変数だけでなくjinjaテンプレートでif文とかも使えるのだった。
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes {% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.fomart("%Y-%m-%d %H:%M:%S") }}"{% endif %}
今回はprev_execution_date_successが入っているときのみWhere句を入れたいのでこれで十分だった。
あれPendulumじゃない・・・・
上記のを試してみるけど、SQLが空になってしまうので試行錯誤してたら下記のようなエラーが。
[2020-04-14 05:51:21,872] {{taskinstance.py:1145}} ERROR - 'datetime.datetime object' has no attribute 'format'
え、Pendulumじゃなくて、datetime.datetime objectだ・・・。
contextとMySqlToGoogleCloudStorageOperatorでTemplateで使ってる変数が違うのだろうか。
あ、タイムゾーン・・・
とりあえずstrftimeにして動いたけど
```
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
{% if prev_execution_date_success %}WHERE updated_at >= "{{ prev_execution_date_success.strftime("%Y-%m-%d %H:%M:%S") }}"{% endif %}
```
実行ログをみると、当たり前だけどUTCのままの時間を使ってた
```[2020-04-14 05:57:34,313] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:47:45"```
ドキュメントを見てみるとensure_utcというそれっぽいオプションが。
airflow.contrib.operators.mysql_to_gcs — Airflow Documentation
Trueにして実行したところ、クエリの前にMySQL側のtime_zoneを変更してくれているよう。これなら大丈夫な予感。
```
[2020-04-14 06:08:15,877] {{mysql_to_gcs.py:90}} INFO - Executing: SET time_zone = '+00:00'
[2020-04-14 06:08:15,880] {{mysql_to_gcs.py:92}} INFO - Executing:
SELECT id, rphase_id, meta, created_at, updated_at FROM resumes
WHERE updated_at >= "2020-04-14 05:57:30"
```
ビッグデータを支える技術―刻々とデータが脈打つ自動化の世界 (WEB+DB PRESS plus)
- 作者:西田 圭介
- 発売日: 2017/09/22
- メディア: 単行本(ソフトカバー)