GAミント至上主義

Web Monomaniacal Developer.

RedisのlistとpubsubとRabbitMQを使い分けを考える

2017年1月現在、ビッグデータ処理プロジェクトoceanusは下記のようなデータの流れをしています。


f:id:uyamazak:20170120102719p:plain


GEK上でDockerを使ってアプリケーションを構成していますが、Redisのリスト型、pubsub型に加えて、最近RabbitMQも使い始めたので、どう使い分けしているかを整理してみる。

Redis list型

順番を持ったリストで、左から入れたり、右から入れたり、逆に取り出したりすることができる。

リスト型 — redis 2.0.3 documentation

用途

データを失いたくない1対1のデータ処理。

oceanusでは、armsでHTTPリスエストをバリデーション等をした後にlistに保存し、r2bq(Redis to BigQueryの略)が取り出して、BigQueryに保存している。

BigQueryに保存したらもう必要がなくなり消えるので、基本的にRedisに保存されているデータは少ない。

BigQueryが落ちているなど保存できない場合は、再度listに戻す処理を行っている。


Redis PubSub

情報発信側publisherに対して、複数の受信側subcriberが登録することができる。

Sub側は登録以前のものや接続が遮断されてる間のデータは失われる。送受信が失敗してもリトライは無い。

GoogleのCloud PubSub
Google Cloud Pub/Sub Documentation  |  Cloud Pub/Sub  |  Google Cloud Platform
は、データの再送、保持などもしており信頼性は高いけど、実際に使ってみた所、認証や通信でレスポンスが悪くWEBサーバーが遅くなるため、自分でRedis立てて使いました。

用途

1対多のデータ処理。一つのデータをいろいろな所で使いたい時、かつ多少データがなくなっても問題がないもの。

流れてきたデータにフィルターをかけて、条件Aに合致したらメール、条件Bに合致したらスプレッドシートに保存するとか。

oceanusでは、データはBigQueryにすべて保存されるので、データが失われてもいいような通知用途などに使っています。

RabbitMQ

メッセージキューイングのミドルウェア

メッセージといってもメールやチャットのような人が読むものではなく、シリアライズ化したコードなど機械同士のやりとりが基本です。

銀行などでも実績があり、貯めたタスクを処理するワーカーを別プロセスで走らせることができ、耐障害、非同期、分散、スケールなどを用意に実現できます。

AWSでは、似たようなSQSがあります。
aws.amazon.com

GoogleでもApp Engine用にTask Queue

Task Queue Overview  |  App Engine standard environment for Python  |  Google Cloud Platform

があるけど、App Engine以外から使いにくそうなので使いませんでした。

用途

タスクの非同期処理。リクエストに対して処理時間が長い時、リクエストの数分後など時間差で実行したい時など。


PythonCeleryから使っており、送りたいタスクを関数にし、デコレータを付けるだけで使用できます。

CeleryはブローカーとしてRedisも使うことができますが信頼性、対応する機能などからRabbitMQが推奨されてるようです。

公式Dockerイメージを使えば、特に難しい設定もなく使うことができました。


現在は、特定の条件(エラーを意味するものとか)で、Googleスプレッドシートに保存したり、コンバージョン通知からそのユーザーの履歴をBigQueryから取ってきてメールに付けて送るなどのタスクをどんどん投げて処理させています。

ワーカーはデフォルトでは10と多くなっており、スプレッドシートにアクセスしすぎて、繋がらなくなるなどもあるので、1秒に1回などリミットを掛ける必要がありました。

リトライ、オートスケーリングなど多数の機能がありまだ使いこなせていない感。

逆にこれを使っておけばあとで困ることはなさそう。


昔はDBに保存してcronで処理などもあったと思いますが、そのロジックを自分で書くのは大変だし、スケーリングや多重起動の防止などいろいろ面倒なので、こういうミドルウェアを積極的に使った方が昔の自分に勝てます。

まとめ

最初はRedisとBigQueryだけでしたが、間にPubSub、RabbitMQを入れることにより、リアルタイム処理や、重い処理の非同期処理などさまざまな形でデータを利用することができるようになりました。

また疎結合を保てるので、listを使う部分の開発、pubsubを使う部分の開発をお互いにあまり影響せずに進めることで出来て安心して、本番テスト等も行えます。

プログラマとして、使える道具はどんどん増やしていくべき。


Redis入門 インメモリKVSによる高速データ管理

Redis入門 インメモリKVSによる高速データ管理

Mastering RabbitMQ

Mastering RabbitMQ

KubernetesがServiceのIPなどを勝手に環境変数に入れてくれてる

ビッグデータ処理のプロジェクトoceanusで、セットした覚えのない環境変数がセットされており、それがたまたまコンテナで使っている変数名と同じだったため、原因の特定に時間がかかってしまった。



RabiitMQを使うことになって、それを使うコンテナには、RABBITMQ_PORTという自分で付けた名前の環境変数を使っていた。

ポートはデフォルトのまま5672を使っているが、あとで変更できるようにPython側では下記のように呼び出していた。

from os import environ

RABBITMQ_PORT = environ.get('RABBITMQ_PORT', 5672)

getを使うことで、RABBITMQ_PORTがない場合は、デフォルトの5672が使われるように書いていたつもりで、ローカルでは問題なく動いていた。

しかし、Kubernetes(Google Container Engine)にアップすると、RABBITMQ_PORTに下記のような、ポート番号だけでなく、Kubernetes上のローカルIPまで含めた値が入ってしまい、エラーが起きていた。

tcp://10.7.251.6:5672

調べてみたら、Kubernetesでは、同一クラスタ内のサービスに繋げるように、それぞれのホスト名、ポート番号などを自動で環境変数にいれてくれるようだ。

ドキュメントにも書いてあり、完全に見落としていた。

kubernetes.io

ドキュメントにあるように例えば、redis-masterというservice名だったら、下記の環境変数がすべてセットされる。

REDIS_MASTER_SERVICE_HOST=10.0.0.11
REDIS_MASTER_SERVICE_PORT=6379
REDIS_MASTER_PORT=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP_PROTO=tcp
REDIS_MASTER_PORT_6379_TCP_PORT=6379
REDIS_MASTER_PORT_6379_TCP_ADDR=10.0.0.11

今回自分のアプリケーションでは、たまたまサービス名と、それから生成される環境変数が同じになってしまっていた。

ここらへんの環境変数を整理して、この環境変数名でアプリケーション側から呼び出すようにすれば、新規作成時にServiceのIPを指定しなくても、一気につながるようにできるはず。


WEBサーバー、DBサーバー、キャッシュサーバー、MQサーバーなど、Kubernetes内のネットワークが増えてきたら、この変数名を使ってyamlを書いたりするのが必須になりそう。

Kubernetes: Up and Running; Dive into the Future of Infrastructure

Kubernetes: Up and Running; Dive into the Future of Infrastructure

Pythonでリトライ処理を考える

ビッグデータ処理のために作ってるoceanusでは、受け取ったデータをRedis、BigQuery、Google SpreadSheet、SendGrid(メール)など外部に送ることが多く、残念ながら外部とのやりとりはコントロールできない不確定要素が多い。

そのため、いろんなところでリトライ処理が必要になる。

リトライ時の間隔については別記事で書いたので今回は省く。
uyamazak.hatenablog.com

これまで使ったリトライ方法3つをまとめた。
どんなアプリケーションかによって正解は異なると思う。

エラーをtryで取って、except節でリトライ

一番シンプルなのがこれ。

try:
    result = task()
except Exception as e:
    print("error and retry")
    # retry
    result = task()

もしくは処理結果をif文で。

result = task()
if not result:
    # retry
    result = task()

まあ失敗してもいいけど、一度はリトライしてみるか、程度のあんまり重要じゃないところや、2回以上リトライが必要ない場面ではこれでいいかもしれない。
欠点としては、リトライ回数を増やせないこと。

入れ子にすれば何回でもできないことはないけど、読みにくいし、ダサすぎる。

ループで回す

forなどでリトライ回数分回し、成功次第抜ける。
While True:は無限ループ恐怖症なのか避けるようにしている。

ループ回数や、ループ中の処理などは自由に書ける。

大分端折ったけど、BigQueryや外部のサーバーを使うときは下記のような処理を書いた。
リトライ中ではエラー、全部ダメだったときはクリティカルの出し分けを付けた。
実際はprintではなく、raiseやログ出力している

tryのelse節(tryにエラーが無い時だけ実行する)は、初めて使ったかもしれない。あまり見ないので推奨できないかも。
最初のリトライ回数が0だと、感覚的に違うので、rangeは1から、制限回数にも+1して見やすいようにしている。

from time import sleep
CONNECTION_RETRY = 3

def task_with_retry():
    for i in range(1, CONNECTION_RETRY + 1):
        try:
            result = task()
        except Exception as e:
            print("error:{e} retry:{i}/{max}".format(e=e, i=i, max=CONNECTION_RETRY))
            sleep(i * 5)
        else:
            return True
    print("critical")
    return False

パッケージrertyを使う

pypi.python.org
まんまretryというパッケージがpypiに公開されている。


使用する関数にデコレータで利用できるので、コードをシンプルに保てて賢い。

from retry import retry
@retry()
def make_trouble():
    '''Retry until succeed'''
@retry(ZeroDivisionError, tries=3, delay=2)
def make_trouble():
    '''Retry on ZeroDivisionError, raise error after 3 attempts, sleep 2 seconds between attempts.'''
@retry((ValueError, TypeError), delay=1, backoff=2)
def make_trouble():
    '''Retry on ValueError or TypeError, sleep 1, 2, 4, 8, ... seconds between attempts.'''
@retry((ValueError, TypeError), delay=1, backoff=2, max_delay=4)
def make_trouble():
    '''Retry on ValueError or TypeError, sleep 1, 2, 4, 4, ... seconds between attempts.'''
@retry(ValueError, delay=1, jitter=1)
def make_trouble():
    '''Retry on ValueError, sleep 1, 2, 3, 4, ... seconds between attempts.'''
# If you enable logging, you can get warnings like 'ValueError, retrying in
# 1 seconds'
if __name__ == '__main__':
    import logging
    logging.basicConfig()
    make_trouble()

冒頭の記事に書いたリトライ間隔のジッター、loggerなども引数で指定可能

def retry(exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0, logger=logging_logger):
    """Return a retry decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    """

なるべく使用するパッケージは少なくしたいけど、リトライする箇所が増えて、このオプションでなんとかなるのなら、いちいち自分で書かずにこれを使った方がいいと思う。

いちばんやさしいPythonの教本 人気講師が教える基礎からサーバサイド開発まで (「いちばんやさしい教本」シリーズ)

いちばんやさしいPythonの教本 人気講師が教える基礎からサーバサイド開発まで (「いちばんやさしい教本」シリーズ)