GAミント至上主義

Web Monomaniacal Developer.

Cloud BuildのSlack通知をFirebase Functions + TypeScriptで作る

Cloud Buildは同じGCP製品だけあって非常にシンプルにFirebase(WEBしかやったことない)のデプロイを行えますが、ビルド成功、失敗時の通知機能はついてません。

cloud.google.com

Circle CIのように標準ではついていないので、下記の公式ドキュメントのように自分でFunctionsの関数作って行うのが一般的のようです。

https://cloud.google.com/cloud-build/docs/configure-third-party-notifications?hl=ja


でも今回はFirebaseを使ったプロジェクトだったため、Firebase Functions + TypeScriptで行いたいのでTypeScriptの勉強も兼ねて作ってみました。

Firebaseのプロジェクトについては省略
Cloud Functions に TypeScript を使用する  |  Firebase

公式のと違う点は、終了時にビルドにかかった時間を知りたかったので追加した点。
メッセージは面倒だったのでただのテキストにしてます。

Webook URL

Slack側でWebhookのURLを発行します。
slack.com


発行したURLはFirebaseなのでprocess.envは使わずconfigを使って格納しています。

詳しくは下記
環境の構成  |  Firebase

事前に下記のコマンドでセットしておきます。

firebase functions:config:set slack.webhook_url="https://hooks.slack.com/services/***/***/***"

必要なパッケージ

slackの他、ビルド時間を計算するためにdate-fnsを使用しているのでインストールします。

npm install @slack/webhook date-fns

date-fns - modern JavaScript date utility library

ソース

まだいろいろ改善の余地があるけど動いたやつ。
WORKINGはQUEUEDの後にすぐ来るのでスキップしてます。
swich文やめようかな。

slack.ts

import { pubsub, config } from 'firebase-functions'
import { IncomingWebhook } from '@slack/webhook'
import { differenceInSeconds, parseISO } from 'date-fns'

const WEBHOOK_URL = config().slack.webhook_url
const webhook = new IncomingWebhook(WEBHOOK_URL)

function createMessage (data: any): string {
  const status = data.status
  const repoName = data.substitutions?.REPO_NAME
  const startTime = data.startTime
  const finishTime = data.finishTime
  const buildSeconds = differenceInSeconds(parseISO(finishTime), parseISO(startTime))
  const logUrl = data.logUrl
  switch (status) {
    case 'QUEUED':
      return `${status}, repo: ${repoName}`
    case 'SUCCESS':
    case 'FAILURE':
    case 'INTERNAL_ERROR':
    case 'TIMEOUT':
    case 'CANCELLED':
      return `${status}, repo: ${repoName}, buildtime: ${Math.floor(buildSeconds / 60)}min ${buildSeconds % 60}sec \n${logUrl}`
    case 'WORKING':
    default:
      return ''
  }
}

export async function notifySlack (message: pubsub.Message): Promise<void> {
  const body = message.data ? Buffer.from(message.data, 'base64').toString() : null
  if (!body) {
    console.log('body is null')
    return
  }
  const data = JSON.parse(body)
  const text = createMessage(data)
  if (!text) {
    console.log(`${data.status} skipped`)
    return
  }
  await webhook.send({
    text: text
  })
}


↑notifySlackをindex.tsで読み込んでonPublish()に渡します。
topic名'cloud-builds'はCloud Buildを有効にすると勝手にできるやつなので変えることはなさそうなので直書きしてます。

index.tsの抜粋

import { pubsub } from 'firebase-functions'
import { notifySlack } from './slack'

//ほかのやつ
export const notifyBuild = pubsub.topic('cloud-builds').onPublish(notifySlack)

できたやつ

こんな感じで通知が来るようになりました
f:id:uyamazak:20200513181023p:plain



プログラミングTypeScript ―スケールするJavaScriptアプリケーション開発

プログラミングTypeScript ―スケールするJavaScriptアプリケーション開発

  • 作者:Boris Cherny
  • 発売日: 2020/03/16
  • メディア: 単行本(ソフトカバー)

@google-cloud/pubsubをTypeScriptで使おうとしたらエラー

@google-cloud/pubsub": "^1.7.2"を使い、tscしたところ下記のエラーが出た。

www.npmjs.com

node_modules/@grpc/grpc-js/build/src/call.d.ts:68:5 - error TS2416: Property '_write' in type 'ClientWritableStreamImpl<RequestType>' is not assignable to the same property in base type 'Writable'.
  Type '(chunk: RequestType, encoding: string, cb: WriteCallback) => void' is not assignable to type '(chunk: any, encoding: string, callback: (err?: Error | undefined) => void) => void'.
    Types of parameters 'cb' and 'callback' are incompatible.
      Types of parameters 'err' and 'error' are incompatible.
        Type 'Error | null | undefined' is not assignable to type 'Error | undefined'.
          Type 'null' is not assignable to type 'Error | undefined'.

68     _write(chunk: RequestType, encoding: string, cb: WriteCallback): void;
       ~~~~~~

node_modules/@grpc/grpc-js/build/src/call.d.ts:79:5 - error TS2416: Property '_write' in type 'ClientDuplexStreamImpl<RequestType, ResponseType>' is not assignable to the same property in base type 'Duplex'.
  Type '(chunk: RequestType, encoding: string, cb: WriteCallback) => void' is not assignable to type '(chunk: any, encoding: string, callback: (err?: Error | undefined) => void) => void'.
    Types of parameters 'cb' and 'callback' are incompatible.
      Types of parameters 'err' and 'error' are incompatible.
        Type 'Error | null | undefined' is not assignable to type 'Error | undefined'.
          Type 'null' is not assignable to type 'Error | undefined'.

79     _write(chunk: RequestType, encoding: string, cb: WriteCallback): void;

バージョンを確認した所、1.7.2が新しめだったので1.6.0に戻し、念の為node_modulesを削除して再インストールしたところ問題なくコンパイルできた。
www.npmjs.com


package.json

"dependencies": {
    "@google-cloud/pubsub": "^1.6.0",

2系がもう少しで出るらしいので、バタバタしてるのかな。

シニアジョブのデータ基盤のETLとしてApache AirflowをGKEで構築した理由

これまで小ネタは書いたけど、なぜApache Airflow (以下Airflow)使ってるのか、Cloud Composerを使わずGoogle Kubernetes Engine(以下GKE)を使っているのか等そもそも論的なことは書いてなかったので、忘れないうちにまとめ。

データ基盤を作る理由

シニアジョブでは社長を筆頭に営業上に必要な数字を管理画面で見れるようにしてデータを活用しています。

でもアプリケーションと同じDBを使っているため、負荷の問題などもあり、いろいろと限界が来てました。

そこで、イマドキの会社では当たり前になっているデータ基盤を構築し、アプリケーションとは分けてデータを利用できるようにしようと思いました。

参考にしたのは下記の記事。

yuzutas0.hatenablog.com

データ分析基盤構築プロジェクトをスムーズに遂行するための知識を整理してみる。 - Qiita

ワークフローエンジンの選定

BigQueryは以前ビズオーシャンでも使ってたけど、DBからのデータ移行は開発サーバーでバッチ処理を行っていました。

しかし、それはあんまりイマドキではなく、上記の記事であるように、最近はGCPのCloud Composer等、ワークフローエンジンの利用がいろんなところでおすすめされているため、調査をしました。

選択肢としては下記のようなものがありました。

参考:
qiita.com

ワークフローエンジンの選択肢

Apache Airflow

元はAirbnbが開発し、現在はApache配下のオープンソースプロジェクト。タスクをPythonで書くのが特徴でしょうか。

Cloud Composer

GCPが提供するマネージドのAirflow。複雑なAirflowをサクッと動かせますが後述するように、動かすだけで月4万円以上かかります。

Cloud Data Fusion

こちらもGCPのマネージドETL。コードではなくGUI上でできるようですが、むしろコードで管理したいので除外。情報も少ない。
Cloud Data Fusion  |  Google Cloud

Digdag

Fluentdでおなじみの Treasure Data社製のワークフローエンジン。オープンソース。タスクはyamlで書きます。
www.digdag.io

Argo

Kubernetes向けに作られたもの。オープンソース。こちらもyamlでタスクを書きます。

Jenkins、Rundeck

イマドキ感不足。前職で使ったけどもう触りたくない

Airflow を選んだ理由

GCP, BigQuery との親和性

まず大前提としてBigQueryを使うというのがあるので、GCPのマネージドがあるということが一番大きいです。
BigQueryはもちろんCloud Storageなどを使うライブラリも充実しており、情報も多いです。
GCP系であれば驚くほど少ないコードでタスクを書けます。

タスクをPythonで書ける、本体もPython

まずDAGと呼ばれるタスク定義を書くのがPython
なにか困っても大抵のことはできそうだし、一番好きな言語なのでポイント高いです。yamlは仕方なく書くけど好きじゃない。
本体もPythonなので詳しく知りたくなっても読みやすいです。

また、WEBサーバーにはFlask, タスクキューにはCelery, メタデータDBの接続にはSQLAlchemy, 時間系にはPendulumなど有名どころのライブラリの組み合わせでもあるのでPythonアプリケーションの教材としても優秀だと思います。

Airbnbのブランド力

名前もおそらくAirbnbのWorkflowだからAirflowなのでしょうか。

airbnb.io

私がPaul Grahamのファンであり、現在は引退したもののAirbnbがY Combinatorの出資先でもあるのでポイント高いです。

検索情報の多さ

Cloud Composerのユーザーも多くみられ、検索数でもDigdagよりも多いようです。

f:id:uyamazak:20200422140232p:plain

実際構築する際に困ったことはだいたいググって解決できました。

公式ドキュメントも充実しています。

画面がかっこいい

大したことやってなくても見た目がかっこいいです。
f:id:uyamazak:20200422161403p:plain

Cloud Composerではなく自分でAirflowをGKEで動かすことにした理由

お金の問題

Airflowはかなり複雑なAirflowをポチっと構築、運用してくれるCloud Composerは超便利ですが、対価としてお金がかかります。

アメリカのリージョンを使い最低にしてもCloud Composerだけで月400ドルの見積もりとなってしまいました。プラスでディスクやネットワーク代もかかり、東京リージョンも使いたいので現実的には600ドル前後でしょうか。

リージョンやマシン数によっても違うので下記から計算できます。
Google Cloud Platform 料金計算ツール

しかし、導入段階の自社においては、どれだけ使うかもわからないし、そんなに性能も信頼性も必要ありません。

Cloud Composerではノード数を3以下にできないかったり、メタデータDBにCloud SQLを使ったりと、現時点では圧倒的オーバースペックです。

全部GKEで動かせばおそらく月150ドル程度、BigQuery、ネットワーク代などを加えてもトータルで300ドル程度に抑えられそうです。

また一旦動かせたら、そんなに手もかからないはず。

ざっと日本円で月3万円ぐらいは節約できるので、本番環境における直接的なコストだけ考えても年約36万円節約できます。

まだ動かして数日で、テスト的なタスクしかやってないけど2週間で1万5千円くらい。

GKEのノードn1-standard-2(CPU:2、メモリ:7.5GB)1台で動かせています。

f:id:uyamazak:20200422151301p:plain

知識の問題

正直最初Cloud Composerを使った時は、DAGは何となく分かるけど、Airflowがどんな構造をしているのか、何をしているのかが全くわかりませんでした。

しかし、自分で構築してみるといろんなところでハマりまくり、いろんな設定を触ったり、本体のソースコードを読んだり、そのおかげで感覚的につかめてきました。

また下記のDockerで動かせるサンプルがあったのが大きいです。
github.com

また、今回は外向けのIPを固定する必要があったのでCloud NATを使った設定も実践することができました。

ローカル環境で開発したい

新規に構築するとのなると開発環境がほしいですが、Cloud Composerで開発環境も作ってしまうとまた月数万が飛んでいってしまうことになります。

社内にデータエンジニアが何人もいるような大きな会社であれば全然安いと思いますが、シニアジョブは現状エンジニア職2人(募集中)で、データまわりをやるのは現状私一人という状況では超高い。

またCloud ComposerはいろいろとGoogleが手を付けており、ローカルで同じようなAirflow環境を再現するのは難しいです。

Docker+GKEであれば同じコンテナを使用することができますので、環境差異も少なくストレスなく開発できます。

必要になったらCloud Composerへの移行できる

今後、データの活用や人数が増えて、自前での運用が大変になったとしてもマネージドであるCloud Composerに移すのは簡単そうです。
ConnectionsやVariableに入れた設定は、WEB上のGUIで入れられますし、DAGのファイルはAirflow本体とは疎結合になっているのでそのまま移行できるはずです。

これから

まだ一部のデータですが、BigQueryに入ったので、Google Data Portalなどを使ってサンプル的になにか作ってみようと思います。
今後差分更新を行えばかなりリアルタイムになると思うので、既存の重い処理の差し替えもやっていきたいです。