要約
- Google Pub/Subクライアントを使い、ずっと動かし続けてたらメモリリークが見つかった
- 調査したらクライアント自体のバグか仕様だった。そもそも動かし続けるように作られてない
- multiprocessingのProcessを使い、別プロセスで動かし、メッセージを一定数ごと受け取ったらそのプロセスごと再起動するようにした
oceanusのpubsubからデータを受け取り処理を行うrevelationで、pubusbをRedisからGoogle Cloud Pub/Subにしたところ、メモリリークが発生した。
データはbizoceanのイベントログが送られてきて、一つ数KB、~10メッセージ/秒が来る。
起動時は50MBだけど1時間に数MB増え、1週間で数百MBになり、他のPODにも影響してしまう。
本番のKubernetes(GKE)では下記コマンドでメモリの増加を確認。
% kubectl top pod NAME CPU(cores) MEMORY(bytes) revelation-1694192843-16c09 4m 74Mi
環境作成
調査のためにrevelationから原因と思われるCloud Pub/Sub以外の処理を削除したスクリプトを作成してローカルで動かす。
memorytest.py
#!/usr/bin/env python import logging from os import environ from google.cloud import pubsub from subscriber import (list_subscriptions, create_subscription) logger = logging.getLogger(__name__) logger.addHandler(logging.StreamHandler()) logger.setLevel(getattr(logging, "DEBUG")) TOPIC_NAME = environ.get("TOPIC_NAME") SUBSCRIPTION_NAME = environ.get("SUBSCRIPTION_NAME") class MemoryTest: def __init__(self): self.pubsub_client = pubsub.Client() self.topic = self.pubsub_client.topic(TOPIC_NAME) if SUBSCRIPTION_NAME not in list_subscriptions(TOPIC_NAME): create_subscription(TOPIC_NAME, SUBSCRIPTION_NAME) logger.debug("create_subscription. " "topic:{} " "sub:{}".format(TOPIC_NAME, SUBSCRIPTION_NAME)) def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) while True: results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) if results: subscription.acknowledge([ack_id for ack_id, message in results]) if __name__ == '__main__': logger.info("starting memory test") mt = MemoryTest() mt.main()
実行はDockerを使う。
こちらも、本番で使っているDockerfileから不要なものを削る。
Dockerfile
FROM python:3.6 RUN pip install --upgrade -q \ pip \ google-api-python-client \ google-cloud-pubsub \ pympler \ objgraph RUN mkdir /oceanus/ COPY app /oceanus/app WORKDIR /oceanus/app CMD ["python3","memorytest.py"]
環境変数にGoogleの認証キー(json形式)等と下記が必要
TOPIC_NAMEはメッセージがたくさん来るやつを別途準備が必要。
GOOGLE_APPLICATION_CREDENTIALS=path/to/keyfile TOPIC_NAME=oceanus-combined SUBSCRIPTION_NAME=memory-test
pip listの結果は下記
% docker exec b75ef0aedd57 pip list --format=columns Package Version ---------------------------- --------- cachetools 2.0.0 certifi 2017.4.17 chardet 3.0.4 dill 0.2.6 future 0.16.0 gapic-google-cloud-pubsub-v1 0.15.4 google-api-python-client 1.6.2 google-auth 1.0.1 google-auth-httplib2 0.0.2 google-cloud-core 0.24.1 google-cloud-pubsub 0.25.0 google-gax 0.15.13 googleapis-common-protos 1.5.2 graphviz 0.7.1 grpc-google-iam-v1 0.11.1 grpcio 1.3.5 httplib2 0.10.3 idna 2.5 oauth2client 4.1.1 objgraph 3.1.0 pip 9.0.1 ply 3.8 proto-google-cloud-pubsub-v1 0.15.4 protobuf 3.3.0 pyasn1 0.2.3 pyasn1-modules 0.0.9 Pympler 0.5 requests 2.17.3 rsa 3.4.2 setuptools 36.0.1 six 1.10.0 uritemplate 3.0.0 urllib3 1.21.1 wheel 0.29.0
これをdocker runとかdocker-compose runで実行するとひたすら指定したトピックからメッセージ受け取って件数を表示しつづける。
Received 7 messages. Received 1 messages. Received 9 messages. Received 1 messages.
常に行っているの処理はmain()のwhile内、subscription.pullとsubscription.acknowledgeのみ。
pullはメッセージを引き出し、acknowledgeは無事受け取ったという通知を送る。
acknowledgeを送らないと、再送される。
docker psで名前かIDを確認し
% docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES c56258dcc9a2 projectoceanus_memorytest "python3 memorytes..." 15 minutes ago Up 15 minutes projectoceanus_memorytest_run_7
起動後、statsでメモリ使用量を見守る。
% docker stats projectoceanus_memorytest_run_7
#起動直後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_7 0.72% 40.43 MiB / 31.33 GiB 0.13% 448 kB / 143 kB 0 B / 0 B 1 #しばらくして CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_7 0.65% 45.1 MiB / 31.33 GiB 0.14% 25.4 MB / 12.6 MB 0 B / 0 B 1
これでメモリリークを再現できる最低限の環境ができた。
ローカルのDocker環境でも同じ現象が起きたので、原因がGKEではないことは分かる。
簡単に変えられるところを変えてみる
もしかしたら最新版だと解決している可能性もなくはないので、DockerfileのFROMを最新の3.6.1にして、推奨されていたslimに変更してみる。
OSとかパッケージを気軽に変更できるのもDockerのいいところ。
FROM python:3.6.1-slim
#起動直後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_9 0.83% 37.04 MiB / 31.33 GiB 0.12% 1.54 MB / 459 kB 0 B / 0 B 1 #しばらく CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_9 0.73% 38.16 MiB / 31.33 GiB 0.12% 6.12 MB / 3.2 MB 0 B / 0 B 1
slimのおかげで初期のメモリ使用量は3MBぐらい減ったけど、増加は治らなかった。
記事を書いていることで、NET I/OのINが5MB増えると、使用メモリが1MB弱増えるような関係が見えてきた。受け取ったデータの1/5がどっかに残っている?
あとsubscription.pull()の引数、return_immediatelyをFalseにしたけど変わらなかった。
メモリプロファイラーを使ってみる
開発が活発そうで、ドキュメントも分かりやすいPymplerを使ってみる。先述のDockerfileでインストール済み。
https://pythonhosted.org/Pympler/muppy.html
ドキュメントにそって色々出してみる
全オブジェクト数
whileの中に追記。
def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) while True: results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) if results: subscription.acknowledge([ack_id for ack_id, message in results]) # 全オブジェクト数の変化を見守る all_objects = muppy.get_objects() logger.debug("all_objects len:{}".format(len(all_objects)))
めっちゃもりもり増えてく!
% sudo docker-compose run memorytest starting memory test Received 90 messages. all_objects len:84309 Received 4 messages. all_objects len:84321 Received 1 messages. all_objects len:84326 Received 21 messages. all_objects len:84391 Received 1 messages. all_objects len:84396 Received 8 messages. all_objects len:84422
summaryを使って、diffを取ってみる
def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) count = 0 while True: if count > 500: start_objects = muppy.get_objects() start_summary = summary.summarize(start_objects) results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) count += len(results) if results: subscription.acknowledge([ack_id for ack_id, message in results]) # 全オブジェクト数の変化を見守る all_objects = muppy.get_objects() logger.debug("all_objects len:{}".format(len(all_objects))) if count > 1000: end_objects = muppy.get_objects() end_summary = summary.summarize(end_objects) summary_diff = summary.get_diff(start_summary, end_summary) summary.print_(summary_diff) return logger.debug("count:{}".format(count))
スタート直後はメモリリーク以外でも増加が激しそうなので、メッセージが500を超えたら一回スナップショットを取り、1000を超えたらdiffを表示して終了する。
検尿時に出始めを捨てるイメージ。
Received 100 messages. all_objects len:131491 count:861 Received 100 messages. all_objects len:143000 count:961 Received 5 messages. all_objects len:154224 count:966 Received 100 messages. all_objects len:165733 types | # objects | total size ============================================= | =========== | ============ <class 'list | 5095 | 3.15 MB <class 'str | 5191 | 388.79 KB <class 'int | 1023 | 27.97 KB <class 'tuple | 100 | 6.25 KB <class 'google.cloud.pubsub.message.Message | 100 | 5.47 KB function (wrap) | 0 | 0 B function (__mod__) | 0 | 0 B function (__ror__) | 0 | 0 B function (_is_dunder) | 0 | 0 B function (fromHexString) | 0 | 0 B function (_qencode) | 0 | 0 B function (_gt_from_le) | 0 | 0 B function (_auth_from_challenge) | 0 | 0 B function (header_store_parse) | 0 | 0 B function (_catch_errors) | 0 | 0 B
listが一番多く、google.cloud.pubsub.message.Messageも入ってる。この中身を見てみたい。
がんばって自分で差分を出してみる
def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) count = 0 while True: if count > 500: start_objects = muppy.get_objects() results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) count += len(results) if results: subscription.acknowledge([ack_id for ack_id, message in results]) # 全オブジェクト数の変化を見守る all_objects = muppy.get_objects() logger.debug("all_objects len:{}".format(len(all_objects))) if count > 1000: end_objects = muppy.get_objects() end = set(["{}".format((id(o), type(o))) for o in end_objects]) start = set(["{}".format((id(o), type(o))) for o in start_objects]) diff = end - start logger.debug(diff) return logger.debug("count:{}".format(count))
muppy.get_objects()はオブジェクトをlistで返してくる。
そのままsetにしようかと思ったけど、エラーになる。
Traceback (most recent call last): File "memorytest.py", line 64, in <module> mt.main() File "memorytest.py", line 50, in main obj_diff = set(end_objects) - set(start_objects) TypeError: unhashable type: 'grpc._cython.cygrpc.Timespec'
オブジェクトを一旦idとtypeの文字列にして、更にsetして集合を引き算して出してみる。
Received 32 messages. all_objects len:87024 count:978 Received 100 messages. all_objects len:87327 {"(139814560669104, <class 'str'>)", "(139814560860744, <class 'tuple'>)", "(139814560609816, <class 'str'>)", "(139814560846304, <class 'str'>)", "(139814560687328, <class 'google.cloud.pubsub.message.Message'>)", "(139814560861832, <class 'tuple'>)", "(139814560687216, <class 'google.cloud.pubsub.message.Message'>)", "(139814560858888, <class 'tuple'>)", "(139814560780984, <class 'str'>)", "(139814560783792, <class 'str'>)", "(139814560860872, <class 'tuple'>)", "(139814560836072, <class 'google.cloud.pubsub.message.Message'>)", "(139814561298472, <class 'google.cloud.pubsub.message.Message'>)", "(139814560860424, <class 'tuple'>)", "(139814560834448, <class 'google.cloud.pubsub.message.Message'>)", "(139814560691976, <class 'tuple'>)", "(139814560834616, <class 'google.cloud.pubsub.message.Message'>)", "(139814560686152, <class 'google.cloud.pubsub.message.Message'>)", "(139814561300376, <class 'google.cloud.pubsub.message.Message'>)", "(139814560690824, <class 'tuple'>)", "(139814560835680, <class 'google.cloud.pubsub.message.Message'>)", "(139814560848896, <class 'str'>)", "(139814561300152, <class 'google.cloud.pubsub.message.Message'>)", "(139814560858376, <class 'tuple'>)", "(139814560691912, <class 'tuple'>)", "(139814560691400, <class 'tuple'>)", "(139814560861192, <class 'tuple'>)", "(139814560836744, <class 'google.cloud.pubsub.message.Message'>)", "(139814560782928, <class 'str'>)", "(139814560837584, <class 'google.cloud.pubsub.message.Message'>)", "(139814560782064, <class 'str'>)", "(139814560836576, <class 'google.cloud.pubsub.message.Message'>)", "(139814560836184, <class 'google.cloud.pubsub.message.Message'>)", "(139814560783360, <class 'str'>)", "(139814560861000, <class 'tuple'>)", "(139814560858632, <class 'tuple'>)", "(139814560814832, <class 'str'>)",
個別のidと、list、str、
idから中身を表示してみる
この記事を参考に
stackoverflow.com
ctypesのやつは、フリーズしたのでgc使ってる方を試したけど、どっちも途中でフリーズする模様。
CPUとメモリリミットをかけて起動したほうがいいかもしれない。
<google.cloud.pubsub.message.Message object at 0x7f952eda5908> <google.cloud.pubsub.message.Message object at 0x7f952eda5c88> ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlMSDWlWXGc2US8mc39od2pdGwcHRFp5WlgTJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952e8edac8>) <google.cloud.pubsub.message.Message object at 0x7f952eda5a90> None ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWpWXGc2US8mc39od2pfEwIFQlV6Wl4cJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5d30>) ('QBJMNgxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsLUxNRXHETShBuM1x1B1ENGHcuaX1iWhIIBEJZf1ZTEg5rVlxnNlEvJ3V8Y31sXhQABUNbd1pdM7Hoi_lDZic9XBJLLD5-PTlFQV4', <google.cloud.pubsub.message.Message object at 0x7f952eda5c18>) ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWFWXGc2US8mc39od2pfFAADQFR3XV4aJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5d68>) None <google.cloud.pubsub.message.Message object at 0x7f952eda5898> <google.cloud.pubsub.message.Message object at 0x7f952eda5c18> None ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlMSD2BWXGc2US8mc39od2pcFwYDQVV4WFsYJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5ba8>) ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWhWXGc2US8mc39od2pdGgYLTFZ2WV8fJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5cc0>) None <google.cloud.pubsub.message.Message object at 0x7f952eda5da0> <google.cloud.pubsub.message.Message object at 0x7f952eda5a20> None None None None None <google.cloud.pubsub.message.Message object at 0x7f952eda5ba8> None None <google.cloud.pubsub.message.Message object at 0x7f952eda59b0> <google.cloud.pubsub.message.Message object at 0x7f952eda5d30>
中身は鍵みたいな文字列と、google.cloud.pubsub.message.Messageオブジェクト、そしてそれが入ったタプルだった。
これはresultsに入っている ack_id, messageだろうと思われる。
次にこれをまだ参照しているものを探す。
refbrowser
def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) count = 0 while True: if count > 500: start_objects = muppy.get_objects() results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) count += len(results) if results: subscription.acknowledge([ack_id for ack_id, message in results]) # 全オブジェクト数の変化を見守る all_objects = muppy.get_objects() logger.debug("all_objects len:{}".format(len(all_objects))) if count > 600: gc.collect() end_objects = muppy.get_objects() end = set([id(o) for o in end_objects]) start = set([id(o) for o in start_objects]) diff = end - start target_ids = list(diff)[0:10] root = [o for o in end_objects if id(o) in target_ids] cb = refbrowser.RefBrowser(root, maxdepth=3, str_func=output_func) logger.debug("tree:{}".format(cb.get_tree())) return logger.debug("count:{}".format(count))
こんなのでた
tree:['Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfCWBWXWc2US8mc39od2pcEAQLTFd2WlwbJdTci9AxZic9XBJLLD5-PTlFQV5A', 'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfC2tWXWc2US8mc39od2pcFAkATFN9WlgfJdTci9AxZic9XBJLLD5-PTlFQV5A', 'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfBWpWXGc2US8mc39od2pSFwAFQlt8W14aJdTci9AxZic9XBJLLD5-PTlFQV5A', 'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1weDWhWXGc2US8mc39od2pcFwYDQFB2XFgfJdTci9AxZic9XBJLLD5-PTlFQV5A', 'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1weDG9WXGc2US8mc39od2pTEgQKRFt3WFobJdTci9AxZic9XBJLLD5-PTlFQV5A', ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfC25WXGc2US8mc39od2taEAEEQ1R2XVMYJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f32fdd01828>), ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfCmhWXGc2US8mc39od2tZEwcAQVJ4X1gZJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f32fdd01908>), ('QBJMNgxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsLUxNRXHETShBuM1x1B1ENGHcuaX1iWhIIBEJZfF9cHwVgVlxnNlEvJ3V8Y31qWhAGAUNQfllTM7Hoi_lDZic9XBJLLD5-PTlFQV4', <google.cloud.pubsub.message.Message object at 0x7f32fdd01d30>), <google.cloud.pubsub.message.Message object at 0x7f32fdd01400>, <google.cloud.pubsub.message.Message object at 0x7f32fdccf400>]
参照元を確認する
rootをrefbrowserで確認しようとしたら、使用メモリががんがん増えて32GBを使い果たし、ほぼフリーズ状態となってしまった。
そのため、rootを最初の1つだけに絞ってみたら、それでも1GB以上のメモリを使い、標準出力は大量の文字列が流れ続ける状態となった。
def main(self): subscription = self.topic.subscription(SUBSCRIPTION_NAME) count = 0 while True: if count > 100: start_objects = muppy.get_objects() results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) count += len(results) if results: subscription.acknowledge([ack_id for ack_id, message in results]) # 全オブジェクト数の変化を見守る all_objects = muppy.get_objects() logger.debug("all_objects len:{}".format(len(all_objects))) if count > 200: gc.collect() end_objects = muppy.get_objects() end = set([id(o) for o in end_objects]) start = set([id(o) for o in start_objects]) diff = end - start # logger.debug("diff:{}".format(diff)) target_ids = list(diff)[0:10] roots = [o for o in end_objects if id(o) in target_ids] for i, root in enumerate(roots[:1]): logger.debug("root:{}".format(root)) # cb = refbrowser.RefBrowser(root, maxdepth=3, str_func=output_func) # logger.debug("tree:{}".format(cb.get_tree())) fb = refbrowser.FileBrowser(root, maxdepth=1, str_func=output_func) fb.print_tree("print_tree_{}.txt".format(i)) return logger.debug("count:{}".format(count))
出力を一部抜粋するとこんな感じ
Copyright (c) 1991-1995 Stichting Mathematisch Centrum, Amsterdam. All Rights Reserved., 'credits': Thanks to CWI, CNRI, BeOpen.com, Zope Corporation and a cast of thousands for supporting Python development. See www.python.org for more information., 'license': Type license() to see the full license text, 'help': Type help() for interactive help, or help(ob ject) for help about object.}, 'IMPORT_MAPPING': {'__builtin__': 'builtins', 'copy_reg': 'copyreg', 'Queue': 'queue', 'SocketServer': 'socketserver', 'ConfigParser': 'configparser', 'repr': 'reprlib', 'tkFileDialog': 'tkinter.filedialog', 'tkSimpleDialog': 'tkinter.simpledialog', 'tkColorChooser': 'tkinter.colorchooser', 'tkCommonDialog': 'tkinter.commondialog', 'Dialog': 'tkin ter.dialog', 'Tkdnd': 'tkinter.dnd', 'tkFont': 'tkinter.font', 'tkMessageBox': 'tkinter.messagebox', 'ScrolledText': 'tkinter.scrolledtext', 'Tkconstants': 'tkinter.constants', 'Tix': 'tkint er.tix', 'ttk': 'tkinter.ttk', 'Tkinter': 'tkinter', 'markupbase': '_markupbase', '_winreg': 'winreg', 'thread': '_thread', 'dummy_thread': '_dummy_thread', 'dbhash': 'dbm.bsd', 'dumbdbm': ' dbm.dumb', 'dbm': 'dbm.ndbm', 'gdbm': 'dbm.gnu', 'xmlrpclib': 'xmlrpc.client', 'SimpleXMLRPCServer': 'xmlrpc.server', 'httplib': 'http.client', 'htmlentitydefs': 'html.entities', 'HTMLParser ': 'html.parser', 'Cookie': 'http.cookies', 'cookielib': 'http.cookiejar', 'BaseHTTPServer': 'http.server', 'test.test_support': 'test.support', 'commands': 'subprocess', 'urlparse': 'urllib .parse', 'robotparser': 'urllib.robotparser', 'urllib2': 'urllib.request', 'anydbm': 'dbm', '_abcoll': 'collections.abc', 'cPickle': 'pickle', '_elementtree': 'xml.etree.ElementTree', 'FileD ialog': 'tkinter.filedialog', 'SimpleDialog': 'tkinter.simpledialog', 'DocXMLRPCServer': 'xmlrpc.server', 'SimpleHTTPServer': 'http.server', 'CGIHTTPServer': 'http.server', 'UserDict': 'coll ections', 'UserList': 'collections', 'UserString': 'collections', 'whichdb': 'dbm', 'StringIO': 'io', 'cStringIO': 'io'}, 'NAME_MAPPING': {('__builtin__', 'xrange'): ('builtins', 'range'), ( '__builtin__', 'reduce'): ('functools', 'reduce'), ('__builtin__', 'intern'): ('sys', 'intern'), ('__builtin__', 'unichr'): ('builtins', 'chr'), ('__builtin__', 'unicode'): ('builtins', 'str '), ('__builtin__', 'long'): ('builtins', 'int'), ('itertools', 'izip'): ('builtins', 'zip'), ('itertools', 'imap'): ('builtins', 'map'), ('itertools', 'ifilter'): ('builtins', 'filter'), (' itertools', 'ifilterfalse'): ('itertools', 'filterfalse'), ('itertools', 'izip_longest'): ('itertools', 'zip_longest'), ('UserDict', 'IterableUserDict'): ('collections', 'UserDict'), ('UserL ist', 'UserList'): ('collections', 'UserList'), ('UserString', 'UserString'): ('collections', 'UserString'), ('whichdb', 'whichdb'): ('dbm', 'whichdb'), ('_socket', 'fromfd'): ('socket', 'fr omfd'), ('_multiprocessing', 'Connection'): ('multiprocessing.connection', 'Connection'), ('multiprocessing.process', 'Process'): ('multiprocessing.context', 'Process'), ('multiprocessing.fo rking', 'Popen'): ('multiprocessing.popen_fork', 'Popen'), ('urllib', 'ContentTooShortError'): ('urllib.error', 'ContentTooShortError'), ('urllib', 'getproxies'): ('urllib.request', 'getprox ies'), ('urllib', 'pathname2url'): ('urllib.request', 'pathname2url'), ('urllib', 'quote_plus'): ('urllib.parse', 'quote_plus'), ('urllib', 'quote'): ('urllib.parse', 'quote'), ('urllib', 'u nquote_plus'): ('urllib.parse', 'unquote_plus'), ('urllib', 'unquote'): ('urllib.parse', 'unquote'), ('urllib', 'url2pathname'): ('urllib.request', 'url2pathname'), ('urllib', 'urlcleanup'): ('urllib.request', 'urlcleanup'), ('urllib', 'urlencode'): ('urllib.parse', 'urlencode'), ('urllib', 'urlopen'): ('urllib.request', 'urlopen'), ('urllib', 'urlretrieve'): ('urllib.request', 'urlretrieve'), ('urllib2', 'HTTPError'): ('urllib.error', 'HTTPError'), ('urllib2', 'URLError'): ('urllib.error', 'URLError'), ('exceptions', 'ArithmeticError'): ('builtins', 'ArithmeticEr ror'), ('exceptions', 'AssertionError'): ('builtins', 'AssertionError'), ('exceptions', 'AttributeError'): ('builtins', 'AttributeError'), ('exceptions', 'BaseException'): ('builtins', 'Base Exception'), ('exceptions', 'BufferError'): ('builtins', 'BufferError'), ('exceptions', 'BytesWarning'): ('builtins', 'BytesWarning'), ('exceptions', 'DeprecationWarning'): ('builtins', 'Dep recationWarning'), ('exceptions', 'EOFError'): ('builtins', 'EOFError'), ('exceptions', 'EnvironmentError'): ('builtins', 'EnvironmentError'), ('exceptions', 'Exception'): ('builtins', 'Exce ption'), ('exceptions', 'FloatingPointError'): ('builtins', 'FloatingPointError'), ('exceptions', 'FutureWarning'): ('builtins', 'FutureWarning'), ('exceptions', 'GeneratorExit'): ('builtins ', 'GeneratorExit'), ('exceptions', 'IOError'): ('builtins', 'IOError'), ('exceptions', 'ImportError'): ('builtins', 'ImportError'), ('exceptions', 'ImportWarning'): ('builtins', 'ImportWarn ing'), ('exceptions', 'IndentationError'): ('builtins', 'IndentationError'), ('exceptions', 'IndexError'): ('builtins', 'IndexError'), ('exceptions', 'KeyError'):
詳しく確認することができないので、原因となっているgoogle.cloud.pubsub.message.Messageあたりを調べてみる
ひたすらdelしてみる
検索していたら下記記事を見つけた。
grpcio==1.0.2で治っているという記述がある。今回は1.3.5を使っているので別の問題っぽい。
まだ試してない方法があったので念のため試してみる
import atexit from google.cloud import pubsub client = pubsub.Client() for topic in client.list_topics(): print topic.name def cleanup(): global client, topic del client, topic atexit.register(cleanup)
とりあえず、clientやその派生物はすべてループ内で作成し、最後にdelしてみた
gc.collect()はなくても良いかもしれないけど、付けると使用メモリ20MBぐらい減るので残している。
def main(self): while True: pubsub_client = pubsub.Client() topic = pubsub_client.topic(TOPIC_NAME) subscription = topic.subscription(SUBSCRIPTION_NAME) results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) if results: subscription.acknowledge([ack_id for ack_id, message in results]) del pubsub_client, topic, subscription, results gc.collect()
これで動かしながらお昼ごはんを食べに行くことにする
1時間後
# 起動直後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS fe0f61bfdf3c 12.14% 47.45 MiB / 31.33 GiB 0.15% 3.63 MB / 958 kB 0 B / 0 B 1 # 約1時間後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS fe0f61bfdf3c 19.14% 58.23 MiB / 31.33 GiB 0.18% 274 MB / 74.3 MB 0 B / 0 B 1
増える量は減ったけど、まだ増えていた。
試しにpubsub自体もdelして、毎回importするようにしてみる。
def main(self): while True: from google.cloud import pubsub pubsub_client = pubsub.Client() topic = pubsub_client.topic(TOPIC_NAME) subscription = topic.subscription(SUBSCRIPTION_NAME) results = subscription.pull(return_immediately=True, max_messages=100) logger.debug('Received {} messages.'.format(len(results))) if results: subscription.acknowledge([ack_id for ack_id, message in results]) del pubsub, pubsub_client, topic, subscription, results gc.collect()
# 起動直後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_45 8.46% 36.29 MiB / 31.33 GiB 0.11% 933 kB / 273 kB 0 B / 0 B # 約10分後 CONTAINER CPU % MEM USAGE / LIMIT MEM % NET I/O BLOCK I/O PIDS projectoceanus_memorytest_run_45 10.51% 43.46 MiB / 31.33 GiB 0.14% 29.2 MB / 8.42 MB 0 B / 0 B
まだ増えてる
再度diffを取る
いろいろdelしてすっきりしたかもしれないので、再度diffを取って、中身を確認する
def main(self): count = 0 while True: from google.cloud import pubsub pubsub_client = pubsub.Client() topic = pubsub_client.topic(TOPIC_NAME) subscription = topic.subscription(SUBSCRIPTION_NAME) results = subscription.pull(return_immediately=True, max_messages=100) count += len(results) logger.debug('Received {} messages.'.format(len(results))) if results: subscription.acknowledge([ack_id for ack_id, message in results]) del pubsub, pubsub_client, topic, subscription, results gc.collect() if count > 100: start_objects = muppy.get_objects() if count > 199: end_objects = muppy.get_objects() end = set([id(o) for o in end_objects]) start = set([id(o) for o in start_objects]) diff = end - start logger.debug("diff:{}".format(diff)) logger.debug("len:{}".format(len(diff))) for id_ in diff: logger.debug("obj:{}, len:{}".format(type(objects_by_id(id_)), len(objects_by_id(id_)))) return
何回やっても差分が出るのは、listのみとなった。
Received 100 messages. Received 2 messages. Received 5 messages. Received 25 messages. Received 100 messages. diff:{140130552113352} len:1 obj:<class 'list'>, len:83834
しかも、83834とでかい。
参照元を探そうとするとメモリを食ってたのはこれのせいっぽい。
このリスト内で循環参照してたりするんだろうか。
pubusbクライアントだけなら何とか辿れそうだったけど、gRPCとかもっと深いところにいくと私には理解できる気がしない。
2017/6/22追記
gcpugのslackチャンネルに質問したら、Googleのトレーニングでおなじみのsinmetalさんから、stackoverflowで質問してみては?と回答を頂く。
Ask Technical Questions on Stack Exchange Sites | Support | Google Cloud Platform
改めてstackoverflow内で検索してみたら、同じような現象を発見。
batchというオブジェクトがメッセージのIDをずっと増やして持ち続けることが原因で、batch.commit()をすれば良いとのことだったが、今回の処理ではbatchが見当たらない。
topicにbatchを返すメソッドがあったので、呼び出して叩いてみるけど、batchを新規作成しているので意味がなかった。
そこで、昔ながらの使い捨てプロセスを思いついたので実装してみる。
実装して気づいたけど、受け取ったメッセージによって、スプレッドシートに書き込んだり、BigQueryスキャンしたり、メール送ったりと、いろいろとやることが多く、今後そこでもメモリリークが発生する可能性が高いので、結構正解な気がしてきた。
遠回りもしたけれど、もうプロセスを定期的に使い捨てしちゃえ作戦
プロセスを使い捨てにすることで、OSでメモリリークごときれいにしてもらう作戦。
プロセスの生成には標準ライブラリの下記を使う。
from multiprocessing import Process
処理を一つの関数にしてtargetに渡す。今回はtaskという名前。
if __name__ == '__main__': while True: p = Process(target=task, args=(TOPIC_NAME, SUBSCRIPTION_NAME)) p.start() p.join()
task内では、なんかしらの回数をカウントアップしていって、適当に決めた基準を超えたらループから抜けるようにしておく。
そうすれば、またProcessが新規作成されて動き始め、メモリリークは一緒に消える。
もちろんメモリリークは根本的に解決していない。
彼の言葉を思い出す。
I'm not a real programmer. I throw together things until it works then I move on. The real programmers will say "Yeah it works but you're leaking memory everywhere. Perhaps we should fix that." I’ll just restart Apache every 10 requests.
しばらくテストして問題なさそうだったら、gitにコミットする。