要点を言えば、公式Pythonライブラリでメモリの問題が2回発生した上に、バージョンアップでさらにコントロールできないものになったので使うのをやめた。
自社用データ収集のプロジェクトoceanusでは、データをBigQueryに保存するだけでなく、データのリアルタイム処理、ストリーミング処理用にPub/Subにも送信している。
例えば、コンバージョンとか特定のイベントが来たらメールするとか、Googleスプレッドシートに書き込むとか。
開発してからしばらくは同一ネットワーク内(GKE)に自分で立てたRedisのPub/Subで行っていたが、Google Cloud Pub/Subの方が、自分でスケーリングとか考えなくていいし、どこからでもアクセスできたり、他のGCPサービス(Cloud Functions)と連携もしやすいので移した。
しかし、Cloud Pub/Subの公式Pythonライブラリは曲者(当時はバージョン0.25、2017/9/27現在0.28.3)で、メモリリークはしまくるし、特殊なライブラリが必要なようで、Dockerを使っているがイメージサイズの小さいalpine系のPythonでは動かくことができず、Debianベースのイメージを使う必要もあった。
メモリの使用量は同じマシンで稼働できるコンテナの数と直結するので、コスト面での影響もかなり大きい。
google-cloud-python/pubsub at master · GoogleCloudPlatform/google-cloud-python · GitHub
メモリリークは、別プロセスを立ち上げてしばらくしたら消して再起動する方法でなんとかなったけど、精神的に大分消耗した。
しばらくして、公式のライブラリのアップデート(0.28)があり、イメージ新しくした際に動かなくなったので、書き直すことにした。
これが大幅な変更で、理解するのにも時間がかかった。コミットメッセージは「Pub/Sub API Redesign」となっている。
Pub/Sub API Redesign (#3859) · GoogleCloudPlatform/google-cloud-python@4a8e155 · GitHub
特に大きいのが、メッセージを受け取る処理のSubscribeで、これまでは1件ずつメッセージ取り出してこちらで処理ができたが、コールバック関数を渡して処理させる形となった。
# Define the callback. # Note that the callback is defined *before* the subscription is opened. def callback(message): do_something_with(message) # Replace this with your actual logic. message.ack() # Open the subscription, passing the callback. subscription.open(callback)
その処理は非同期(non-blocking)のため、そのあとにスリープを行うことで、続けてメッセージを処理することができるとなっている。
サンプルだと下記のように、最後でsleepの無限ループを行っている。
最初スリーブしているのに処理は半永久的に進むのが不思議だった。
def receive_messages(project, subscription_name): """Receives messages from a pull subscription.""" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) def callback(message): print('Received message: {}'.format(message)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. print('Listening for messages on {}'.format(subscription_path)) while True: time.sleep(60)
python-docs-samples/subscriber.py at master · GoogleCloudPlatform/python-docs-samples · GitHub
私の書いているoceanus/revelationでほぼ同じように処理を行ってみると、社内サーバーでは問題なく動いたが、本番環境のGKEに乗せるとメモリを1G以上も使い果たし、他のプロセスも停止させてしまった。
FlowControlという処理制御用のオブジェクトがあったので、max_messagesは1にしてみたが、変わらない。
FlowControl.__new__.__defaults__ = ( psutil.virtual_memory().total * 0.2, # max_bytes: 20% of total RAM float('inf'), # max_messages: no limit 0.8, # resume_threshold: 80% )
デフォルトでメモリ制限っぽい項目もあるようだが、3GB以下のマシンで最大2GBにも達していたから機能していなさそうだ。
google-cloud-python/types.py at master · GoogleCloudPlatform/google-cloud-python · GitHub
多い時で20/秒程度のメッセージが来ているので、おそらく大量のスレッドが立ち上がったのが原因だと思われる。が、そこの制御方法は見つからず、サービスを止めとくのも問題なので、調査はやめて、過去のRedis版に切り戻した。
大量のメッセージを受け取って、フィルターしてほとんど捨てる的な使い方は想定されていないのかもしれない。
Goで書いたPublisher側は特に問題がないので、Pythonクライアント特有なのかもしれないが、1年以内でメモリの問題が複数発生し、またクライアントライブラリも破壊的な変更が入るなど安定していないので、しばらくこの用途で使うのはやめようと思った。
Google Pub/Sub自体あまりユーザーがいないのかもしれない。