GAミント至上主義

Web Monomaniacal Developer.

CeleryでWokerとbeatを同時起動する

タスクキューイングができるCeleryを使っているけど、一部認証が1時間で切れる箇所があり、期限切れの前に定期的に再認証を行う処理が必要になった。

そのため、Celeryでcron的なことができるbeat機能を使った。

Periodic Tasks
Periodic Tasks — Celery 4.0.2 documentation

ドキュメントから下記部分を参考に再認証処理を追加

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
Setting these up from w


最初はworkerの他にbeatの起動が必要なのを知らずに、全然動かんと思ってしまった。

しかし、今回の再認証処理は、同一インスタンス内で再認証するので、別プロセス、別コンテナ(Docker)になってしまうと、全く意味がない。

なんとか一プロセスでworkerとbeatを起動できないかと思ってたら簡単にできるよう。

stackoverflow.com

python manage.py celery worker --beat

 ※--beatの代わりに-Bでも同じ


が、ドキュメントを見てみると
Periodic Tasks — Celery 4.0.2 documentation

this is convenient if you’ll never run more than one worker node, but it’s not commonly used and for that reason isn’t recommended for production use:

あまり一般的ではなく、本番環境ではおすすめしないとのこと。

Worker内で行う再認証などは、素直にWorker内で完結させたほうが良さそう。



Celery、リトライ、時間制限(1秒に1回だけとか)、cronのような定期実行までできて便利すぎてやばい。

RedisのlistとpubsubとRabbitMQを使い分けを考える

2017年1月現在、ビッグデータ処理プロジェクトoceanusは下記のようなデータの流れをしています。


f:id:uyamazak:20170120102719p:plain


GEK上でDockerを使ってアプリケーションを構成していますが、Redisのリスト型、pubsub型に加えて、最近RabbitMQも使い始めたので、どう使い分けしているかを整理してみる。

Redis list型

順番を持ったリストで、左から入れたり、右から入れたり、逆に取り出したりすることができる。

リスト型 — redis 2.0.3 documentation

用途

データを失いたくない1対1のデータ処理。

oceanusでは、armsでHTTPリスエストをバリデーション等をした後にlistに保存し、r2bq(Redis to BigQueryの略)が取り出して、BigQueryに保存している。

BigQueryに保存したらもう必要がなくなり消えるので、基本的にRedisに保存されているデータは少ない。

BigQueryが落ちているなど保存できない場合は、再度listに戻す処理を行っている。


Redis PubSub

情報発信側publisherに対して、複数の受信側subcriberが登録することができる。

Sub側は登録以前のものや接続が遮断されてる間のデータは失われる。送受信が失敗してもリトライは無い。

GoogleのCloud PubSub
Google Cloud Pub/Sub Documentation  |  Cloud Pub/Sub  |  Google Cloud Platform
は、データの再送、保持などもしており信頼性は高いけど、実際に使ってみた所、認証や通信でレスポンスが悪くWEBサーバーが遅くなるため、自分でRedis立てて使いました。

用途

1対多のデータ処理。一つのデータをいろいろな所で使いたい時、かつ多少データがなくなっても問題がないもの。

流れてきたデータにフィルターをかけて、条件Aに合致したらメール、条件Bに合致したらスプレッドシートに保存するとか。

oceanusでは、データはBigQueryにすべて保存されるので、データが失われてもいいような通知用途などに使っています。

RabbitMQ

メッセージキューイングのミドルウェア

メッセージといってもメールやチャットのような人が読むものではなく、シリアライズ化したコードなど機械同士のやりとりが基本です。

銀行などでも実績があり、貯めたタスクを処理するワーカーを別プロセスで走らせることができ、耐障害、非同期、分散、スケールなどを用意に実現できます。

AWSでは、似たようなSQSがあります。
aws.amazon.com

GoogleでもApp Engine用にTask Queue

Task Queue Overview  |  App Engine standard environment for Python  |  Google Cloud Platform

があるけど、App Engine以外から使いにくそうなので使いませんでした。

用途

タスクの非同期処理。リクエストに対して処理時間が長い時、リクエストの数分後など時間差で実行したい時など。


PythonCeleryから使っており、送りたいタスクを関数にし、デコレータを付けるだけで使用できます。

CeleryはブローカーとしてRedisも使うことができますが信頼性、対応する機能などからRabbitMQが推奨されてるようです。

公式Dockerイメージを使えば、特に難しい設定もなく使うことができました。


現在は、特定の条件(エラーを意味するものとか)で、Googleスプレッドシートに保存したり、コンバージョン通知からそのユーザーの履歴をBigQueryから取ってきてメールに付けて送るなどのタスクをどんどん投げて処理させています。

ワーカーはデフォルトでは10と多くなっており、スプレッドシートにアクセスしすぎて、繋がらなくなるなどもあるので、1秒に1回などリミットを掛ける必要がありました。

リトライ、オートスケーリングなど多数の機能がありまだ使いこなせていない感。

逆にこれを使っておけばあとで困ることはなさそう。


昔はDBに保存してcronで処理などもあったと思いますが、そのロジックを自分で書くのは大変だし、スケーリングや多重起動の防止などいろいろ面倒なので、こういうミドルウェアを積極的に使った方が昔の自分に勝てます。

まとめ

最初はRedisとBigQueryだけでしたが、間にPubSub、RabbitMQを入れることにより、リアルタイム処理や、重い処理の非同期処理などさまざまな形でデータを利用することができるようになりました。

また疎結合を保てるので、listを使う部分の開発、pubsubを使う部分の開発をお互いにあまり影響せずに進めることで出来て安心して、本番テスト等も行えます。

プログラマとして、使える道具はどんどん増やしていくべき。


Redis入門 インメモリKVSによる高速データ管理

Redis入門 インメモリKVSによる高速データ管理

Mastering RabbitMQ

Mastering RabbitMQ

KubernetesがServiceのIPなどを勝手に環境変数に入れてくれてる

ビッグデータ処理のプロジェクトoceanusで、セットした覚えのない環境変数がセットされており、それがたまたまコンテナで使っている変数名と同じだったため、原因の特定に時間がかかってしまった。



RabiitMQを使うことになって、それを使うコンテナには、RABBITMQ_PORTという自分で付けた名前の環境変数を使っていた。

ポートはデフォルトのまま5672を使っているが、あとで変更できるようにPython側では下記のように呼び出していた。

from os import environ

RABBITMQ_PORT = environ.get('RABBITMQ_PORT', 5672)

getを使うことで、RABBITMQ_PORTがない場合は、デフォルトの5672が使われるように書いていたつもりで、ローカルでは問題なく動いていた。

しかし、Kubernetes(Google Container Engine)にアップすると、RABBITMQ_PORTに下記のような、ポート番号だけでなく、Kubernetes上のローカルIPまで含めた値が入ってしまい、エラーが起きていた。

tcp://10.7.251.6:5672

調べてみたら、Kubernetesでは、同一クラスタ内のサービスに繋げるように、それぞれのホスト名、ポート番号などを自動で環境変数にいれてくれるようだ。

ドキュメントにも書いてあり、完全に見落としていた。

kubernetes.io

ドキュメントにあるように例えば、redis-masterというservice名だったら、下記の環境変数がすべてセットされる。

REDIS_MASTER_SERVICE_HOST=10.0.0.11
REDIS_MASTER_SERVICE_PORT=6379
REDIS_MASTER_PORT=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP_PROTO=tcp
REDIS_MASTER_PORT_6379_TCP_PORT=6379
REDIS_MASTER_PORT_6379_TCP_ADDR=10.0.0.11

今回自分のアプリケーションでは、たまたまサービス名と、それから生成される環境変数が同じになってしまっていた。

ここらへんの環境変数を整理して、この環境変数名でアプリケーション側から呼び出すようにすれば、新規作成時にServiceのIPを指定しなくても、一気につながるようにできるはず。


WEBサーバー、DBサーバー、キャッシュサーバー、MQサーバーなど、Kubernetes内のネットワークが増えてきたら、この変数名を使ってyamlを書いたりするのが必須になりそう。

Kubernetes: Up and Running; Dive into the Future of Infrastructure

Kubernetes: Up and Running; Dive into the Future of Infrastructure