GAミント至上主義

Web Monomaniacal Developer.

【解決】 Python3のfirebase_adminでCollectionをon_snapshot()してるとhread-ConsumeBidirectionalStream caught unexpected exception

下記記事でやっていた処理だけど、寝る前に実行して、朝起きるころ見るとエラーを吐いて止まっている。
まだ解決してないけど、メモ。


Raspberry PiでPython3を使ってFirestoreにクエリする - GAミント至上主義

マシンはRaspberry Pi Zero WH。
OSは

pi@raspberrypi:~/led8 $ lsb_release  -a
No LSB modules are available.
Distributor ID: Raspbian
Description:    Raspbian GNU/Linux 9.9 (stretch)
Release:        9.9
Codename:       stretch

関係ありそうなパッケージのバージョンは下記。

firebase-admin==2.17.0
google-api-core==1.13.0
google-api-python-client==1.7.9
google-auth==1.6.3
google-auth-httplib2==0.0.3
google-cloud-core==1.0.2
google-cloud-firestore==1.2.0
google-cloud-storage==1.16.1
google-resumable-media==0.3.2
googleapis-common-protos==1.6.0

googleapis.github.io

エラー出力は下記。

hread-ConsumeBidirectionalStream caught unexpected exception <_Rendezvous of RPC that terminated with:                  
        status = StatusCode.INTERNAL                                                                                     
        details = "Received RST_STREAM with error code 0"                                                                
        debug_error_string = "{"created":"@1562627421.743478058","description":"Error received from peer ipv6:[2404:6800$
4004:801::200a]:443","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Received RST_STREAM with err$
r code 0","grpc_status":13}"                                                                                             
> and will exit.                                                                                                         
Traceback (most recent call last):                                                                                       
  File "/usr/local/lib/python3.5/dist-packages/google/api_core/bidi.py", line 633, in _thread_main                       
    response = self._bidi_rpc.recv()                                                                                     
  File "/usr/local/lib/python3.5/dist-packages/google/api_core/bidi.py", line 544, in recv                               
    return self._recoverable(self._recv)                                                                                 
  File "/usr/local/lib/python3.5/dist-packages/google/api_core/bidi.py", line 503, in _recoverable                       
    raise exc                                                                                                            
  File "/usr/local/lib/python3.5/dist-packages/google/api_core/bidi.py", line 493, in _recoverable                       
    return method(*args, **kwargs)                                                                                       
  File "/usr/local/lib/python3.5/dist-packages/google/api_core/bidi.py", line 541, in _recv                              
    return next(call)
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 364, in __next__
    return self._next()
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 358, in _next
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.INTERNAL
        details = "Received RST_STREAM with error code 0"
        debug_error_string = "{"created":"@1562627421.743478058","description":"Error received from peer ipv6:[2404:6800:
4004:801::200a]:443","file":"src/core/lib/surface/call.cc","file_line":1046,"grpc_message":"Received RST_STREAM with erro
r code 0","grpc_status":13}"
>
Exception in thread Thread-OnRpcTerminated:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.5/dist-packages/google/cloud/firestore_v1/watch.py", line 290, in close
    raise reason
google.api_core.exceptions.InternalServerError: 500 Received RST_STREAM with error code 0

検索するとPub/Subなどでも同じようなエラーを出すようで、Firestore特有の問題ではなさそう。

https://github.com/googleapis/google-cloud-python/issues/4234

Threadingを使っているので、簡単にはtryでExceptionを受け取ることができない。

2019/7/10 追記

Firestore: what possible cause of this exception: InternalServerError: 500 Received RST_STREAM with error code 0 · Issue #282 · firebase/firebase-admin-python · GitHub
こちらのやり取りでwatchオブジェクトの_closedプロパティを見て、再接続をするハックを見つけたので試してみる。
みんなちょうど60分で切断されているらしい。

python - How to detect realtime listener errors in firebase firestore database? - Stack Overflow

ソースを確認したところ、_closed以外にもis_activeというプロパティもあったので、こちらも確認してみる。
google-cloud-python/watch.py at master · googleapis/google-cloud-python · GitHub

# 省略
def main():↲
    try:↲
        watcher = commands.collection_ref.on_snapshot(↲
            update_callback)↲
    except Exception as e:↲
        print(e)↲
        exit()↲
    while True:↲
        print(datetime.now())↲
        sleep(60)↲
↲
        if not watcher.is_active:↲
            print('is not active!!!')↲
            exit()↲
↲
        if watcher._closed:↲
            print('_closed!!!!!')↲
            exit()↲

7/11 解決。

_closedプロパティを確認して、Trueだったら再度開始することで数日以上動き続けている

# 省略
def main():↲
    try:↲
        watcher = collection_ref.on_snapshot(update_callback)↲
    except Exception as e:↲ 
        print(e)↲
        exit()↲
    while True:↲
        # print(datetime.now())↲
        sleep(60)↲
        if watcher._closed:↲
            print('_closed!!!!!')↲
            watcher = collection_ref.on_snapshot(update_callback

Raspberry PiでPython3を使ってFirestoreにクエリする

Raspberry PiでFirebaseのFirestoreを介して外部と通信するために、最初はNode.jsのライブラリでやろうとがんばってたけど、1日やっても下記エラーが出てダメそうなので、LED部分と同じくPythonでやる。

Node.jsでのエラー

Node.jsはaptで入れて、nでv10を入れたもの、公式からLinux Binaries (ARM) ARMv6を入れて動かしたけど同じだった。認証部分でコケてしまうがまったく同一のファイルでPixelbookのDebian上で動いた。

(node:873) UnhandledPromiseRejectionWarning: FetchError: request to https://www.googleapis.com/oauth2/v4/token failed, reason: connect ENETUNREACH 172.217.161.42:443 - Local (0.0.0.0:0)
    at ClientRequest.<anonymous> (/home/pi/led8/node_modules/node-fetch/lib/index.js:1455:11)
    at ClientRequest.emit (events.js:198:13)
    at TLSSocket.socketErrorListener (_http_client.js:392:9)
    at TLSSocket.emit (events.js:198:13)
    at emitErrorNT (internal/streams/destroy.js:91:8)
    at emitErrorAndCloseNT (internal/streams/destroy.js:59:3)
    at process._tickCallback (internal/process/next_tick.js:63:19)
(node:873) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:873) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

Pythonの準備

pipのインストールは省略。
公式通り入れて
firebase.google.com

サンプルコードを動かす。キーファイルはjsの時と同じものを利用した。

import firebase_admin
from firebase_admin import credentials

cred = credentials.Certificate("path/to/serviceAccountKey.json")
firebase_admin.initialize_app(cred)

PythonでFirestoreにクエリする

今回は特定のコレクションの最新のドキュメントを1件取得し続けたいので下記のようなコードとなった。

import firebase_admin
from time import sleep 
from firebase_admin import credentials 
from firebase_admin import firestore 
 
cred = credentials.Certificate("accountKey.json")
firebase_admin.initialize_app(cred)
 
client = firestore.client()

# 監視したいコレクション 今回はドキュメントがtimestampを持ってるので並び替えしとく
collection_ref = client.collection('path', 'to', 'parent_document')\
    .order_by('timestamp', direction='DESCENDING')\
    .limit(1)

# 更新があったときのコールバック用関数
def update_callback(docs, changes, read_time): 
    print('changes', changes)
    print('read_time', read_time)
    for doc in docs: 
        print(doc.id, doc.to_dict()) 

# 監視の開始、サブプロセスで動く
watcher = collection_ref.on_snapshot(update_callback)

print(watcher) 

# メインプロセスはとりあえずWhileしとく。sleep()をかませないとCPU利用率が高くなるので適度につけとく。
while True:
    sleep(1) 

実行して、Firebaseのコンソールでドキュメントを追加したところ、ほぼリアルタイムで反応したのを確認できた

pi@raspberrypi:~/led8 $ python3 firestore.py 
<google.cloud.firestore_v1.watch.Watch object at 0xb5fe5610>
changes [<google.cloud.firestore_v1.watch.DocumentChange object at 0xb5674350>]
read_time 2019-07-01 02:40:33+00:00
aXPGdePvZFzc5nA6S4RE {'timestamp': DatetimeWithNanoseconds(2019, 7, 2, 15, 0, tzinfo=<UTC>)}
changes [<google.cloud.firestore_v1.watch.DocumentChange object at 0xb56742f0>, <google.cloud.firestore_v1.watch.DocumentChange object at 0xb5674350>]
read_time 2019-07-01 02:41:05+00:00
yWwK3RmSt49EhZazqICW {'timestamp': DatetimeWithNanoseconds(2019, 7, 3, 15, 0, tzinfo=<UTC>)}

家のRaspberry Pi へのSSH接続を楽にする設定

家のローカルネットワーク経由でSSH経由でラズパイに入ることが増えてきたので、手間を減らすためにいろいろ設定する。
特にラズパイに限ったことではない気がする。

最終的には

ssh pi

だけでログインできるようになる。

使ったのはRaspberry Pi Zero WH

OSは

pi@raspberrypi:~ $ lsb_release -a
No LSB modules are available.
Distributor ID: Raspbian
Description:    Raspbian GNU/Linux 9.9 (stretch)
Release:        9.9
Codename:       stretch

IPを固定する(DHCP内)

今までラズパイを再起動すると、前後してしまうので固定したかった。

下記を参考にした
Raspberry Pi に固定IPアドレスを割り当てる方法(Raspbian Jessie) - Qiita

# 現在のIPを確認する
# 作業前のログを残すの忘れたので、これは作業後のもの。
#  inet 192.168.0.81/24 で現在のIPと範囲を確認する。ルーター等によって違う。
# 今回は inet 192.168.0.6-9ぐらいで変わっていたので、
# /24なら一番右のところは変えて大丈夫そうという雑なネットワーク知識のもと81にした

$ pi@raspberrypi:~ $ ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
    inet 127.0.0.1/8 scope host lo
       valid_lft forever preferred_lft forever
    inet6 ::1/128 scope host 
       valid_lft forever preferred_lft forever
2: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether b8:27:eb:40:53:0b brd ff:ff:ff:ff:ff:ff
    inet 192.168.0.81/24 brd 192.168.0.255 scope global wlan0
       valid_lft forever preferred_lft forever
    inet6 240f:65:8a29:1:2bbd:361e:3a6c:9a0d/64 scope global mngtmpaddr noprefixroute dynamic 
       valid_lft 263sec preferred_lft 263sec
    inet6 fe80::636a:e31b:a209:3831/64 scope link 
       valid_lft forever preferred_lft forever
# 設定ファイルを開く
pi@raspberrypi:~ $ sudo vim /etc/dhcpcd.conf

# It is possible to fall back to a static IP if DHCP fails:
# define static profile
#profile static_eth0
static ip_address=192.168.0.81/24
#static routers=192.168.1.1
static domain_name_servers=192.168.0.1

static ip_addressのところだけ#をはずし、さっき決めたIPに変更する。
他のはそのままだったが今回は大丈夫だった。
static domain_name_serversを家のルーターのIPにしないと繋がらない場所が出てきた。

# 終わったら再起動して接続を確認する
pi@raspberrypi:~ $ sudo reboot

秘密鍵でログインする

SSHのたびに毎回パスワード打つの面倒なので、設定する。

公開鍵の作り方とかはありふれた情報なのでググる
SSH公開鍵認証で接続するまで - Qiita

今回はパスフレーズすら打ちたくないので、ラズパイ専用に新しくパスフレーズが空のものを作って設定した。
終わったら一度ログアウトして、サラッと入れるか試す。

.ssh/configで入力を省略する

詳しくは下記の記事。
~/.ssh/configについて - Qiita

# ラズパイではなく、ノートPCなどホスト側の作業
~$ vim .ssh/config

# 今回は下記3行だけ。
Host pi
HostName 192.168.0.81
User pi

確認する

~$ ssh pi
Linux raspberrypi 4.19.42+ #1219 Tue May 14 21:16:38 BST 2019 armv6l

The programs included with the Debian GNU/Linux system are free software;
the exact distribution terms for each program are described in the
individual files in /usr/share/doc/*/copyright.

Debian GNU/Linux comes with ABSOLUTELY NO WARRANTY, to the extent
permitted by applicable law.
Last login: Sat Jun 29 00:00:30 2019 from 192.168.0.6
pi@raspberrypi:~ $ 

たった6文字入力で入れるようになった

またsshを使う、scpなどでも有効なので

scp ./filename pi:~/

だけでできるようになる。

おまけ

さらに.bashrcや.bash_profileなどにエイリアスを登録すれば

.bashrc

alias pi="ssh pi"
pi

だけでおうちのラズパイにログインできるようになる