GAミント至上主義

Web Monomaniacal Developer.

AirflowのGoogleCloudStorageToBigQueryOperatorでAlready Existsエラー

差分だけ追記しようとしたら下記のエラーがでた。

Exception: BigQuery job failed. Final error was: {'reason': 'duplicate', 'message': 'Already Exists: Table project_id:airflow_local.table'}

AirflowというよりはBigQueryの設定でwrite_dispositionを渡す必要がある模様。
airflow.apache.org

デフォルトはWRITE_EMPTYなので、WRITE_APPENDを入れれば良さそう。
https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition

抜粋だけどこんな感じ。

gcs2bq = GoogleCloudStorageToBigQueryOperator(
            task_id=f'{name}_gcs_to_bq',
            bucket=GCS_BUCKET,
            source_objects=[dump_filename(name),],
            destination_project_dataset_table=bq_dataset_path(name),
            source_format='NEWLINE_DELIMITED_JSON',
            # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition
            write_disposition='WRITE_APPEND',
            schema_object=schema_filename(name),
            google_cloud_storage_conn_id=GCP_CONN_ID,
            encoding='utf8',
            bigquery_conn_id=GCP_CONN_ID
        )

これでエラーなく追記された