仕事中の問題と解決メモ。

最近はPythonとGoogle Cloud Platformがメイン。株式会社ビズオーシャンで企画と開発運用、データ活用とか。https://github.com/uyamazak/

Google Cloud Pub/SubのPythonクライアントがメモリリークするので調査して解決?

要約

  • Google Pub/Subクライアントを使い、ずっと動かし続けてたらメモリリークが見つかった
  • 調査したらクライアント自体のバグか仕様だった。そもそも動かし続けるように作られてない
  • multiprocessingのProcessを使い、別プロセスで動かし、メッセージを一定数ごと受け取ったらそのプロセスごと再起動するようにした



oceanusのpubsubからデータを受け取り処理を行うrevelationで、pubusbをRedisからGoogle Cloud Pub/Subにしたところ、メモリリークが発生した。

データはbizoceanのイベントログが送られてきて、一つ数KB、~10メッセージ/秒が来る。

起動時は50MBだけど1時間に数MB増え、1週間で数百MBになり、他のPODにも影響してしまう。

本番のKubernetes(GKE)では下記コマンドでメモリの増加を確認。

% kubectl top pod
NAME                                 CPU(cores)   MEMORY(bytes)
revelation-1694192843-16c09          4m           74Mi

環境作成

調査のためにrevelationから原因と思われるCloud Pub/Sub以外の処理を削除したスクリプトを作成してローカルで動かす。

memorytest.py

#!/usr/bin/env python
import logging
from os import environ
from google.cloud import pubsub
from subscriber import (list_subscriptions,
                        create_subscription)

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(getattr(logging, "DEBUG"))

TOPIC_NAME = environ.get("TOPIC_NAME")
SUBSCRIPTION_NAME = environ.get("SUBSCRIPTION_NAME")


class MemoryTest:

    def __init__(self):
        self.pubsub_client = pubsub.Client()
        self.topic = self.pubsub_client.topic(TOPIC_NAME)

        if SUBSCRIPTION_NAME not in list_subscriptions(TOPIC_NAME):
            create_subscription(TOPIC_NAME,
                                SUBSCRIPTION_NAME)
            logger.debug("create_subscription. "
                         "topic:{} "
                         "sub:{}".format(TOPIC_NAME,
                                         SUBSCRIPTION_NAME))

    def main(self):
        subscription = self.topic.subscription(SUBSCRIPTION_NAME)
        while True:
            results = subscription.pull(return_immediately=True, max_messages=100)
            logger.debug('Received {} messages.'.format(len(results)))

            if results:
                subscription.acknowledge([ack_id for ack_id, message in results])


if __name__ == '__main__':
    logger.info("starting memory test")
    mt = MemoryTest()
    mt.main()

実行はDockerを使う。

こちらも、本番で使っているDockerfileから不要なものを削る。

Dockerfile

FROM python:3.6

RUN pip install --upgrade -q \
  pip \
  google-api-python-client \
  google-cloud-pubsub \
  pympler \
  objgraph

RUN mkdir /oceanus/
COPY app /oceanus/app
WORKDIR /oceanus/app

CMD ["python3","memorytest.py"]

環境変数Googleの認証キー(json形式)等と下記が必要
TOPIC_NAMEはメッセージがたくさん来るやつを別途準備が必要。

GOOGLE_APPLICATION_CREDENTIALS=path/to/keyfile
TOPIC_NAME=oceanus-combined
SUBSCRIPTION_NAME=memory-test


pip listの結果は下記

% docker exec b75ef0aedd57 pip list --format=columns
Package                      Version
---------------------------- ---------
cachetools                   2.0.0
certifi                      2017.4.17
chardet                      3.0.4
dill                         0.2.6
future                       0.16.0
gapic-google-cloud-pubsub-v1 0.15.4
google-api-python-client     1.6.2
google-auth                  1.0.1
google-auth-httplib2         0.0.2
google-cloud-core            0.24.1
google-cloud-pubsub          0.25.0
google-gax                   0.15.13
googleapis-common-protos     1.5.2
graphviz                     0.7.1
grpc-google-iam-v1           0.11.1
grpcio                       1.3.5
httplib2                     0.10.3
idna                         2.5
oauth2client                 4.1.1
objgraph                     3.1.0
pip                          9.0.1
ply                          3.8
proto-google-cloud-pubsub-v1 0.15.4
protobuf                     3.3.0
pyasn1                       0.2.3
pyasn1-modules               0.0.9
Pympler                      0.5
requests                     2.17.3
rsa                          3.4.2
setuptools                   36.0.1
six                          1.10.0
uritemplate                  3.0.0
urllib3                      1.21.1
wheel                        0.29.0


これをdocker runとかdocker-compose runで実行するとひたすら指定したトピックからメッセージ受け取って件数を表示しつづける。

Received 7 messages.
Received 1 messages.
Received 9 messages.
Received 1 messages.

常に行っているの処理はmain()のwhile内、subscription.pullとsubscription.acknowledgeのみ。

pullはメッセージを引き出し、acknowledgeは無事受け取ったという通知を送る。
acknowledgeを送らないと、再送される。

docker psで名前かIDを確認し

% docker ps
CONTAINER ID        IMAGE                              COMMAND                  CREATED             STATUS              PORTS                                                   NAMES
c56258dcc9a2        projectoceanus_memorytest          "python3 memorytes..."   15 minutes ago      Up 15 minutes                                                               projectoceanus_memorytest_run_7

起動後、statsでメモリ使用量を見守る。

% docker stats projectoceanus_memorytest_run_7 
#起動直後
CONTAINER                         CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_7   0.72%               40.43 MiB / 31.33 GiB   0.13%               448 kB / 143 kB     0 B / 0 B           1

#しばらくして
CONTAINER                         CPU %               MEM USAGE / LIMIT      MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_7   0.65%               45.1 MiB / 31.33 GiB   0.14%               25.4 MB / 12.6 MB   0 B / 0 B           1

これでメモリリークを再現できる最低限の環境ができた。

ローカルのDocker環境でも同じ現象が起きたので、原因がGKEではないことは分かる。

簡単に変えられるところを変えてみる

もしかしたら最新版だと解決している可能性もなくはないので、DockerfileのFROMを最新の3.6.1にして、推奨されていたslimに変更してみる。

OSとかパッケージを気軽に変更できるのもDockerのいいところ。

FROM python:3.6.1-slim
#起動直後
CONTAINER                         CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_9   0.83%               37.04 MiB / 31.33 GiB   0.12%               1.54 MB / 459 kB    0 B / 0 B           1

#しばらく
CONTAINER                         CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_9   0.73%               38.16 MiB / 31.33 GiB   0.12%               6.12 MB / 3.2 MB    0 B / 0 B           1

slimのおかげで初期のメモリ使用量は3MBぐらい減ったけど、増加は治らなかった。

記事を書いていることで、NET I/OのINが5MB増えると、使用メモリが1MB弱増えるような関係が見えてきた。受け取ったデータの1/5がどっかに残っている?

あとsubscription.pull()の引数、return_immediatelyをFalseにしたけど変わらなかった。

メモリプロファイラーを使ってみる

開発が活発そうで、ドキュメントも分かりやすいPymplerを使ってみる。先述のDockerfileでインストール済み。

https://pythonhosted.org/Pympler/muppy.html

ドキュメントにそって色々出してみる

全オブジェクト数

whileの中に追記。

      def main(self):
          subscription = self.topic.subscription(SUBSCRIPTION_NAME)
          while True:
              results = subscription.pull(return_immediately=True, max_messages=100)
              logger.debug('Received {} messages.'.format(len(results)))

              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])

              # 全オブジェクト数の変化を見守る
              all_objects = muppy.get_objects()
              logger.debug("all_objects len:{}".format(len(all_objects)))

めっちゃもりもり増えてく!

% sudo docker-compose run memorytest
starting memory test
Received 90 messages.
all_objects len:84309
Received 4 messages.
all_objects len:84321
Received 1 messages.
all_objects len:84326
Received 21 messages.
all_objects len:84391
Received 1 messages.
all_objects len:84396
Received 8 messages.
all_objects len:84422

summaryを使って、diffを取ってみる

      def main(self):
          subscription = self.topic.subscription(SUBSCRIPTION_NAME)
          count = 0
          while True:
              if count > 500:
                  start_objects = muppy.get_objects()
                  start_summary = summary.summarize(start_objects)
              results = subscription.pull(return_immediately=True, max_messages=100)
              logger.debug('Received {} messages.'.format(len(results)))
              count += len(results)
              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])

              # 全オブジェクト数の変化を見守る
              all_objects = muppy.get_objects()
              logger.debug("all_objects len:{}".format(len(all_objects)))
              if count > 1000:
                  end_objects = muppy.get_objects()
                  end_summary = summary.summarize(end_objects)
                  summary_diff = summary.get_diff(start_summary, end_summary)
                  summary.print_(summary_diff)
                  return
              logger.debug("count:{}".format(count))

スタート直後はメモリリーク以外でも増加が激しそうなので、メッセージが500を超えたら一回スナップショットを取り、1000を超えたらdiffを表示して終了する。

検尿時に出始めを捨てるイメージ。

Received 100 messages.
all_objects len:131491
count:861
Received 100 messages.
all_objects len:143000
count:961
Received 5 messages.
all_objects len:154224
count:966
Received 100 messages.
all_objects len:165733

                                        types |   # objects |   total size
============================================= | =========== | ============
                                 <class 'list |        5095 |      3.15 MB
                                  <class 'str |        5191 |    388.79 KB
                                  <class 'int |        1023 |     27.97 KB
                                <class 'tuple |         100 |      6.25 KB
  <class 'google.cloud.pubsub.message.Message |         100 |      5.47 KB
                              function (wrap) |           0 |      0     B
                           function (__mod__) |           0 |      0     B
                           function (__ror__) |           0 |      0     B
                        function (_is_dunder) |           0 |      0     B
                     function (fromHexString) |           0 |      0     B
                          function (_qencode) |           0 |      0     B
                       function (_gt_from_le) |           0 |      0     B
              function (_auth_from_challenge) |           0 |      0     B
                function (header_store_parse) |           0 |      0     B
                     function (_catch_errors) |           0 |      0     B

listが一番多く、google.cloud.pubsub.message.Messageも入ってる。この中身を見てみたい。


がんばって自分で差分を出してみる

    def main(self):
          subscription = self.topic.subscription(SUBSCRIPTION_NAME)
          count = 0
          while True:
              if count > 500:
                  start_objects = muppy.get_objects()
              results = subscription.pull(return_immediately=True, max_messages=100)
              logger.debug('Received {} messages.'.format(len(results)))
              count += len(results)
              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])

              # 全オブジェクト数の変化を見守る
              all_objects = muppy.get_objects()
              logger.debug("all_objects len:{}".format(len(all_objects)))
              if count > 1000:
                  end_objects = muppy.get_objects()
                  end = set(["{}".format((id(o), type(o))) for o in end_objects])
                  start = set(["{}".format((id(o), type(o))) for o in start_objects])
                  diff = end - start
                  logger.debug(diff)

                  return
              logger.debug("count:{}".format(count))

muppy.get_objects()はオブジェクトをlistで返してくる。
そのままsetにしようかと思ったけど、エラーになる。

Traceback (most recent call last):
  File "memorytest.py", line 64, in <module>
    mt.main()
  File "memorytest.py", line 50, in main
    obj_diff = set(end_objects) - set(start_objects)
TypeError: unhashable type: 'grpc._cython.cygrpc.Timespec'


オブジェクトを一旦idとtypeの文字列にして、更にsetして集合を引き算して出してみる。

Received 32 messages.
all_objects len:87024
count:978
Received 100 messages.
all_objects len:87327
{"(139814560669104, <class 'str'>)", "(139814560860744, <class 'tuple'>)", "(139814560609816, <class 'str'>)", "(139814560846304, <class 'str'>)", "(139814560687328, <class 'google.cloud.pubsub.message.Message'>)", "(139814560861832, <class 'tuple'>)", "(139814560687216, <class 'google.cloud.pubsub.message.Message'>)", "(139814560858888, <class 'tuple'>)", "(139814560780984, <class 'str'>)", "(139814560783792, <class 'str'>)", "(139814560860872, <class 'tuple'>)", "(139814560836072, <class 'google.cloud.pubsub.message.Message'>)", "(139814561298472, <class 'google.cloud.pubsub.message.Message'>)", "(139814560860424, <class 'tuple'>)", "(139814560834448, <class 'google.cloud.pubsub.message.Message'>)", "(139814560691976, <class 'tuple'>)", "(139814560834616, <class 'google.cloud.pubsub.message.Message'>)", "(139814560686152, <class 'google.cloud.pubsub.message.Message'>)", "(139814561300376, <class 'google.cloud.pubsub.message.Message'>)", "(139814560690824, <class 'tuple'>)", "(139814560835680, <class 'google.cloud.pubsub.message.Message'>)", "(139814560848896, <class 'str'>)", "(139814561300152, <class 'google.cloud.pubsub.message.Message'>)", "(139814560858376, <class 'tuple'>)", "(139814560691912, <class 'tuple'>)", "(139814560691400, <class 'tuple'>)", "(139814560861192, <class 'tuple'>)", "(139814560836744, <class 'google.cloud.pubsub.message.Message'>)", "(139814560782928, <class 'str'>)", "(139814560837584, <class 'google.cloud.pubsub.message.Message'>)", "(139814560782064, <class 'str'>)", "(139814560836576, <class 'google.cloud.pubsub.message.Message'>)", "(139814560836184, <class 'google.cloud.pubsub.message.Message'>)", "(139814560783360, <class 'str'>)", "(139814560861000, <class 'tuple'>)", "(139814560858632, <class 'tuple'>)", "(139814560814832, <class 'str'>)",

個別のidと、list、str、等さっきと同じようなtypeが見れた。

idから中身を表示してみる

この記事を参考に
stackoverflow.com

ctypesのやつは、フリーズしたのでgc使ってる方を試したけど、どっちも途中でフリーズする模様。
CPUとメモリリミットをかけて起動したほうがいいかもしれない。

<google.cloud.pubsub.message.Message object at 0x7f952eda5908>
<google.cloud.pubsub.message.Message object at 0x7f952eda5c88>
('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlMSDWlWXGc2US8mc39od2pdGwcHRFp5WlgTJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952e8edac8>)
<google.cloud.pubsub.message.Message object at 0x7f952eda5a90>
None
('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWpWXGc2US8mc39od2pfEwIFQlV6Wl4cJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5d30>)
('QBJMNgxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsLUxNRXHETShBuM1x1B1ENGHcuaX1iWhIIBEJZf1ZTEg5rVlxnNlEvJ3V8Y31sXhQABUNbd1pdM7Hoi_lDZic9XBJLLD5-PTlFQV4', <google.cloud.pubsub.message.Message object at 0x7f952eda5c18>)
('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWFWXGc2US8mc39od2pfFAADQFR3XV4aJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5d68>)
None
<google.cloud.pubsub.message.Message object at 0x7f952eda5898>
<google.cloud.pubsub.message.Message object at 0x7f952eda5c18>
None
('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlMSD2BWXGc2US8mc39od2pcFwYDQVV4WFsYJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5ba8>)
('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll_VlIbDWhWXGc2US8mc39od2pdGgYLTFZ2WV8fJdTci9AxZic9XBJLLD5-PTlFQV5A', <google.cloud.pubsub.message.Message object at 0x7f952eda5cc0>)
None
<google.cloud.pubsub.message.Message object at 0x7f952eda5da0>
<google.cloud.pubsub.message.Message object at 0x7f952eda5a20>
None
None
None
None
None
<google.cloud.pubsub.message.Message object at 0x7f952eda5ba8>
None
None
<google.cloud.pubsub.message.Message object at 0x7f952eda59b0>
<google.cloud.pubsub.message.Message object at 0x7f952eda5d30>

中身は鍵みたいな文字列と、google.cloud.pubsub.message.Messageオブジェクト、そしてそれが入ったタプルだった。

これはresultsに入っている ack_id, messageだろうと思われる。

次にこれをまだ参照しているものを探す。

refbrowser

      def main(self):
          subscription = self.topic.subscription(SUBSCRIPTION_NAME)
          count = 0
          while True:
              if count > 500:
                  start_objects = muppy.get_objects()
              results = subscription.pull(return_immediately=True, max_messages=100)
              logger.debug('Received {} messages.'.format(len(results)))
              count += len(results)
              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])

              # 全オブジェクト数の変化を見守る
              all_objects = muppy.get_objects()
              logger.debug("all_objects len:{}".format(len(all_objects)))
              if count > 600:
                  gc.collect()
                  end_objects = muppy.get_objects()
                  end = set([id(o) for o in end_objects])
                  start = set([id(o) for o in start_objects])
                  diff = end - start
                  target_ids = list(diff)[0:10]
                  root = [o for o in end_objects if id(o) in target_ids]
                  cb = refbrowser.RefBrowser(root, maxdepth=3, str_func=output_func)
                  logger.debug("tree:{}".format(cb.get_tree()))
                  return
              logger.debug("count:{}".format(count))

こんなのでた

tree:['Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfCWBWXWc2US8mc39od2pcEAQLTFd2WlwbJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfC2tWXWc2US8mc39od2pcFAkATFN9WlgfJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfBWpWXGc2US8mc39od2pSFwAFQlt8W14aJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1weDWhWXGc2US8mc39od2pcFwYDQFB2XFgfJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      'Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1weDG9WXGc2US8mc39od2pTEgQKRFt3WFobJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfC25WXGc2US8mc39od2taEAEEQ1R2XVMYJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      <google.cloud.pubsub.message.Message object at 0x7f32fdd01828>), 
      ('Ekw2DERJUytDCypYEU4EISE-MD5FU0RQBhYsXUZIUTcZCGhRDk9eIz81IChFGwtTE1FccRNKEG4zXHUHUQ0Ydy5pfWJaEggEQll8X1wfCmhWXGc2US8mc39od2tZEwcAQVJ4X1gZJdTci9AxZic9XBJLLD5-PTlFQV5A', 
      <google.cloud.pubsub.message.Message object at 0x7f32fdd01908>), 
      ('QBJMNgxESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRsLUxNRXHETShBuM1x1B1ENGHcuaX1iWhIIBEJZfF9cHwVgVlxnNlEvJ3V8Y31qWhAGAUNQfllTM7Hoi_lDZic9XBJLLD5-PTlFQV4', 
      <google.cloud.pubsub.message.Message object at 0x7f32fdd01d30>), 
      <google.cloud.pubsub.message.Message object at 0x7f32fdd01400>, 
      <google.cloud.pubsub.message.Message object at 0x7f32fdccf400>]

参照元を確認する

rootをrefbrowserで確認しようとしたら、使用メモリががんがん増えて32GBを使い果たし、ほぼフリーズ状態となってしまった。

そのため、rootを最初の1つだけに絞ってみたら、それでも1GB以上のメモリを使い、標準出力は大量の文字列が流れ続ける状態となった。

      def main(self):
          subscription = self.topic.subscription(SUBSCRIPTION_NAME)
          count = 0
          while True:
              if count > 100:
                  start_objects = muppy.get_objects()
              results = subscription.pull(return_immediately=True, max_messages=100)
              logger.debug('Received {} messages.'.format(len(results)))
              count += len(results)
              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])

              # 全オブジェクト数の変化を見守る
              all_objects = muppy.get_objects()
              logger.debug("all_objects len:{}".format(len(all_objects)))
              if count > 200:
                  gc.collect()
                  end_objects = muppy.get_objects()
                  end = set([id(o) for o in end_objects])
                  start = set([id(o) for o in start_objects])
                  diff = end - start
                  # logger.debug("diff:{}".format(diff))
                  target_ids = list(diff)[0:10]
                  roots = [o for o in end_objects if id(o) in target_ids]
                  for i, root in enumerate(roots[:1]):
                      logger.debug("root:{}".format(root))
                      # cb = refbrowser.RefBrowser(root, maxdepth=3, str_func=output_func)
                      # logger.debug("tree:{}".format(cb.get_tree()))
                      fb = refbrowser.FileBrowser(root, maxdepth=1, str_func=output_func)
                      fb.print_tree("print_tree_{}.txt".format(i))
                  return
              logger.debug("count:{}".format(count))

出力を一部抜粋するとこんな感じ

Copyright (c) 1991-1995 Stichting Mathematisch Centrum, Amsterdam.
All Rights Reserved., 'credits':     Thanks to CWI, CNRI, BeOpen.com, Zope Corporation and a cast of thousands
    for supporting Python development.  See www.python.org for more information., 'license': Type license() to see the full license text, 'help': Type help() for interactive help, or help(ob
ject) for help about object.}, 'IMPORT_MAPPING': {'__builtin__': 'builtins', 'copy_reg': 'copyreg', 'Queue': 'queue', 'SocketServer': 'socketserver', 'ConfigParser': 'configparser', 'repr':
'reprlib', 'tkFileDialog': 'tkinter.filedialog', 'tkSimpleDialog': 'tkinter.simpledialog', 'tkColorChooser': 'tkinter.colorchooser', 'tkCommonDialog': 'tkinter.commondialog', 'Dialog': 'tkin
ter.dialog', 'Tkdnd': 'tkinter.dnd', 'tkFont': 'tkinter.font', 'tkMessageBox': 'tkinter.messagebox', 'ScrolledText': 'tkinter.scrolledtext', 'Tkconstants': 'tkinter.constants', 'Tix': 'tkint
er.tix', 'ttk': 'tkinter.ttk', 'Tkinter': 'tkinter', 'markupbase': '_markupbase', '_winreg': 'winreg', 'thread': '_thread', 'dummy_thread': '_dummy_thread', 'dbhash': 'dbm.bsd', 'dumbdbm': '
dbm.dumb', 'dbm': 'dbm.ndbm', 'gdbm': 'dbm.gnu', 'xmlrpclib': 'xmlrpc.client', 'SimpleXMLRPCServer': 'xmlrpc.server', 'httplib': 'http.client', 'htmlentitydefs': 'html.entities', 'HTMLParser
': 'html.parser', 'Cookie': 'http.cookies', 'cookielib': 'http.cookiejar', 'BaseHTTPServer': 'http.server', 'test.test_support': 'test.support', 'commands': 'subprocess', 'urlparse': 'urllib
.parse', 'robotparser': 'urllib.robotparser', 'urllib2': 'urllib.request', 'anydbm': 'dbm', '_abcoll': 'collections.abc', 'cPickle': 'pickle', '_elementtree': 'xml.etree.ElementTree', 'FileD
ialog': 'tkinter.filedialog', 'SimpleDialog': 'tkinter.simpledialog', 'DocXMLRPCServer': 'xmlrpc.server', 'SimpleHTTPServer': 'http.server', 'CGIHTTPServer': 'http.server', 'UserDict': 'coll
ections', 'UserList': 'collections', 'UserString': 'collections', 'whichdb': 'dbm', 'StringIO': 'io', 'cStringIO': 'io'}, 'NAME_MAPPING': {('__builtin__', 'xrange'): ('builtins', 'range'), (
'__builtin__', 'reduce'): ('functools', 'reduce'), ('__builtin__', 'intern'): ('sys', 'intern'), ('__builtin__', 'unichr'): ('builtins', 'chr'), ('__builtin__', 'unicode'): ('builtins', 'str
'), ('__builtin__', 'long'): ('builtins', 'int'), ('itertools', 'izip'): ('builtins', 'zip'), ('itertools', 'imap'): ('builtins', 'map'), ('itertools', 'ifilter'): ('builtins', 'filter'), ('
itertools', 'ifilterfalse'): ('itertools', 'filterfalse'), ('itertools', 'izip_longest'): ('itertools', 'zip_longest'), ('UserDict', 'IterableUserDict'): ('collections', 'UserDict'), ('UserL
ist', 'UserList'): ('collections', 'UserList'), ('UserString', 'UserString'): ('collections', 'UserString'), ('whichdb', 'whichdb'): ('dbm', 'whichdb'), ('_socket', 'fromfd'): ('socket', 'fr
omfd'), ('_multiprocessing', 'Connection'): ('multiprocessing.connection', 'Connection'), ('multiprocessing.process', 'Process'): ('multiprocessing.context', 'Process'), ('multiprocessing.fo
rking', 'Popen'): ('multiprocessing.popen_fork', 'Popen'), ('urllib', 'ContentTooShortError'): ('urllib.error', 'ContentTooShortError'), ('urllib', 'getproxies'): ('urllib.request', 'getprox
ies'), ('urllib', 'pathname2url'): ('urllib.request', 'pathname2url'), ('urllib', 'quote_plus'): ('urllib.parse', 'quote_plus'), ('urllib', 'quote'): ('urllib.parse', 'quote'), ('urllib', 'u
nquote_plus'): ('urllib.parse', 'unquote_plus'), ('urllib', 'unquote'): ('urllib.parse', 'unquote'), ('urllib', 'url2pathname'): ('urllib.request', 'url2pathname'), ('urllib', 'urlcleanup'):
 ('urllib.request', 'urlcleanup'), ('urllib', 'urlencode'): ('urllib.parse', 'urlencode'), ('urllib', 'urlopen'): ('urllib.request', 'urlopen'), ('urllib', 'urlretrieve'): ('urllib.request',
 'urlretrieve'), ('urllib2', 'HTTPError'): ('urllib.error', 'HTTPError'), ('urllib2', 'URLError'): ('urllib.error', 'URLError'), ('exceptions', 'ArithmeticError'): ('builtins', 'ArithmeticEr
ror'), ('exceptions', 'AssertionError'): ('builtins', 'AssertionError'), ('exceptions', 'AttributeError'): ('builtins', 'AttributeError'), ('exceptions', 'BaseException'): ('builtins', 'Base
Exception'), ('exceptions', 'BufferError'): ('builtins', 'BufferError'), ('exceptions', 'BytesWarning'): ('builtins', 'BytesWarning'), ('exceptions', 'DeprecationWarning'): ('builtins', 'Dep
recationWarning'), ('exceptions', 'EOFError'): ('builtins', 'EOFError'), ('exceptions', 'EnvironmentError'): ('builtins', 'EnvironmentError'), ('exceptions', 'Exception'): ('builtins', 'Exce
ption'), ('exceptions', 'FloatingPointError'): ('builtins', 'FloatingPointError'), ('exceptions', 'FutureWarning'): ('builtins', 'FutureWarning'), ('exceptions', 'GeneratorExit'): ('builtins
', 'GeneratorExit'), ('exceptions', 'IOError'): ('builtins', 'IOError'), ('exceptions', 'ImportError'): ('builtins', 'ImportError'), ('exceptions', 'ImportWarning'): ('builtins', 'ImportWarn
ing'), ('exceptions', 'IndentationError'): ('builtins', 'IndentationError'), ('exceptions', 'IndexError'): ('builtins', 'IndexError'), ('exceptions', 'KeyError'): 


詳しく確認することができないので、原因となっているgoogle.cloud.pubsub.message.Messageあたりを調べてみる

ひたすらdelしてみる

検索していたら下記記事を見つけた。
grpcio==1.0.2で治っているという記述がある。今回は1.3.5を使っているので別の問題っぽい。

まだ試してない方法があったので念のため試してみる

github.com

import atexit
from google.cloud import pubsub

client = pubsub.Client()
for topic in client.list_topics():
    print topic.name

def cleanup():
    global client, topic
    del client, topic
atexit.register(cleanup)

とりあえず、clientやその派生物はすべてループ内で作成し、最後にdelしてみた

gc.collect()はなくても良いかもしれないけど、付けると使用メモリ20MBぐらい減るので残している。

    def main(self):
        while True:
            pubsub_client = pubsub.Client()
            topic = pubsub_client.topic(TOPIC_NAME)
            subscription = topic.subscription(SUBSCRIPTION_NAME)
            results = subscription.pull(return_immediately=True, max_messages=100)
            logger.debug('Received {} messages.'.format(len(results)))
            if results:
                subscription.acknowledge([ack_id for ack_id, message in results])
            del pubsub_client, topic, subscription, results
            gc.collect()

これで動かしながらお昼ごはんを食べに行くことにする

1時間後

# 起動直後
CONTAINER           CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
fe0f61bfdf3c        12.14%              47.45 MiB / 31.33 GiB   0.15%               3.63 MB / 958 kB    0 B / 0 B           1


# 約1時間後
CONTAINER           CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
fe0f61bfdf3c        19.14%              58.23 MiB / 31.33 GiB   0.18%               274 MB / 74.3 MB    0 B / 0 B           1

増える量は減ったけど、まだ増えていた。

試しにpubsub自体もdelして、毎回importするようにしてみる。

    def main(self):
        while True:
            from google.cloud import pubsub
            pubsub_client = pubsub.Client()
            topic = pubsub_client.topic(TOPIC_NAME)
            subscription = topic.subscription(SUBSCRIPTION_NAME)
            results = subscription.pull(return_immediately=True, max_messages=100)
            logger.debug('Received {} messages.'.format(len(results)))
            if results:
                subscription.acknowledge([ack_id for ack_id, message in results])
            del pubsub, pubsub_client, topic, subscription, results
            gc.collect()
# 起動直後
CONTAINER                          CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_45   8.46%               36.29 MiB / 31.33 GiB   0.11%               933 kB / 273 kB     0 B / 0 B   

# 約10分後
CONTAINER                          CPU %               MEM USAGE / LIMIT       MEM %               NET I/O             BLOCK I/O           PIDS
projectoceanus_memorytest_run_45   10.51%              43.46 MiB / 31.33 GiB   0.14%               29.2 MB / 8.42 MB   0 B / 0 B 

まだ増えてる

再度diffを取る

いろいろdelしてすっきりしたかもしれないので、再度diffを取って、中身を確認する

      def main(self):
          count = 0
          while True:
              from google.cloud import pubsub
              pubsub_client = pubsub.Client()
              topic = pubsub_client.topic(TOPIC_NAME)
              subscription = topic.subscription(SUBSCRIPTION_NAME)
              results = subscription.pull(return_immediately=True, max_messages=100)

              count += len(results)
              logger.debug('Received {} messages.'.format(len(results)))
              if results:
                  subscription.acknowledge([ack_id for ack_id, message in results])
              del pubsub, pubsub_client, topic, subscription, results
              gc.collect()
              if count > 100:
                  start_objects = muppy.get_objects()

              if count > 199:
                  end_objects = muppy.get_objects()
                  end = set([id(o) for o in end_objects])
                  start = set([id(o) for o in start_objects])
                  diff = end - start
                  logger.debug("diff:{}".format(diff))
                  logger.debug("len:{}".format(len(diff)))
                  for id_ in diff:
                      logger.debug("obj:{}, len:{}".format(type(objects_by_id(id_)), len(objects_by_id(id_))))
                  return

何回やっても差分が出るのは、listのみとなった。

Received 100 messages.
Received 2 messages.
Received 5 messages.
Received 25 messages.
Received 100 messages.
diff:{140130552113352}
len:1
obj:<class 'list'>, len:83834

しかも、83834とでかい。
参照元を探そうとするとメモリを食ってたのはこれのせいっぽい。
このリスト内で循環参照してたりするんだろうか。

pubusbクライアントだけなら何とか辿れそうだったけど、gRPCとかもっと深いところにいくと私には理解できる気がしない。

2017/6/22追記


gcpugのslackチャンネルに質問したら、Googleのトレーニングでおなじみのsinmetalさんから、stackoverflowで質問してみては?と回答を頂く。

Ask Technical Questions on Stack Exchange Sites  |  Support  |  Google Cloud Platform

改めてstackoverflow内で検索してみたら、同じような現象を発見。

stackoverflow.com

batchというオブジェクトがメッセージのIDをずっと増やして持ち続けることが原因で、batch.commit()をすれば良いとのことだったが、今回の処理ではbatchが見当たらない。

topicにbatchを返すメソッドがあったので、呼び出して叩いてみるけど、batchを新規作成しているので意味がなかった。

そこで、昔ながらの使い捨てプロセスを思いついたので実装してみる。

実装して気づいたけど、受け取ったメッセージによって、スプレッドシートに書き込んだり、BigQueryスキャンしたり、メール送ったりと、いろいろとやることが多く、今後そこでもメモリリークが発生する可能性が高いので、結構正解な気がしてきた。

遠回りもしたけれど、もうプロセスを定期的に使い捨てしちゃえ作戦

プロセスを使い捨てにすることで、OSでメモリリークごときれいにしてもらう作戦。

プロセスの生成には標準ライブラリの下記を使う。

from multiprocessing import Process


処理を一つの関数にしてtargetに渡す。今回はtaskという名前。


if __name__ == '__main__':
    while True:
        p = Process(target=task, args=(TOPIC_NAME, SUBSCRIPTION_NAME))
        p.start()
        p.join()


task内では、なんかしらの回数をカウントアップしていって、適当に決めた基準を超えたらループから抜けるようにしておく。
そうすれば、またProcessが新規作成されて動き始め、メモリリークは一緒に消える。

もちろんメモリリークは根本的に解決していない。

彼の言葉を思い出す。

I'm not a real programmer. I throw together things until it works then I move on. The real programmers will say "Yeah it works but you're leaking memory everywhere. Perhaps we should fix that." I’ll just restart Apache every 10 requests.

Rasmus Lerdorf - Wikiquote


しばらくテストして問題なさそうだったら、gitにコミットする。