差分だけ追記しようとしたら下記のエラーがでた。
Exception: BigQuery job failed. Final error was: {'reason': 'duplicate', 'message': 'Already Exists: Table project_id:airflow_local.table'}
AirflowというよりはBigQueryの設定でwrite_dispositionを渡す必要がある模様。
airflow.apache.org
デフォルトはWRITE_EMPTYなので、WRITE_APPENDを入れれば良さそう。
https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
抜粋だけどこんな感じ。
gcs2bq = GoogleCloudStorageToBigQueryOperator( task_id=f'{name}_gcs_to_bq', bucket=GCS_BUCKET, source_objects=[dump_filename(name),], destination_project_dataset_table=bq_dataset_path(name), source_format='NEWLINE_DELIMITED_JSON', # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition='WRITE_APPEND', schema_object=schema_filename(name), google_cloud_storage_conn_id=GCP_CONN_ID, encoding='utf8', bigquery_conn_id=GCP_CONN_ID )
これでエラーなく追記された
Google Cloud Platform実践ビッグデータ分析基盤開発 ストーリーで学ぶGoogle BigQuery
- 作者:トップゲート
- 発売日: 2019/12/01
- メディア: 単行本