読者です 読者をやめる 読者になる 読者になる

仕事中の問題と解決メモ。

最近はPythonとGoogle Cloud Platformがメイン。株式会社ビズオーシャンで企画と開発運用、データ活用とか。 http://mstdn.bizocean.co.jp/@uyamazak https://github.com/uyamazak/

Python3でipaddressモジュールを使って、IPアドレス認証を行う

本番環境のWEBアプリケーションで、デバッグ情報を表示させる際、サーバー情報なども全部さらけだすので、最低限IP認証だけ行いたいと思った。

IPアドレスの文字列比較なら簡単だけど、できればネットワークアドレス(192.168.0.0/24等)で許可できると便利。

検索した所、python3からは標準でipaddressというモジュールがあるのを知った。Googleが作ったipaddrが元になってるらしい。

21.28. ipaddress — IPv4/IPv6 操作ライブラリ — Python 3.5.2 ドキュメント


下記のような感じで書いてみた。

許可するIPのリスト名はDjangoのsettings.pyから取った。

strでカンマ区切りで指定して後で分割する。

import ipaddress

INTERNAL_IPS_V4 = "192.168.0.0/16,127.1.1.1"


def is_internal_ip(client_ip_str) -> bool:
    internal_ip_list = INTERNAL_IPS_V4.split(",")
    client_ip = ipaddress.ip_address(client_ip_str)
    for internal_ip in internal_ip_list:
        ip_network = ipaddress.ip_network(internal_ip)
        if client_ip in ip_network:
            return True

    return False

ipaddress.ip_networkは、/なしで普通のIPアドレスを指定すると/32があるとして良しなにしてくれるので、IPアドレスとネットワークアドレスが一緒でも問題なく動いた。

>>> is_internal_ip("192.168.50.1")
True
>>> is_internal_ip("192.167.50.1")
False
>>> is_internal_ip("127.1.1.1")
True
>>> is_internal_ip("127.1.1.2")
False
>>> is_internal_ip("127.1.2.1")
False
>>> is_internal_ip("255.255.255.0")
False
>>> is_internal_ip("192.168.277.777")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/oceanus/app/common/utils.py", line 77, in is_internal_ip
    client_ip = ipaddress.ip_address(client_ip_str)
  File "/usr/local/lib/python3.6/ipaddress.py", line 54, in ip_address
    address)
ValueError: '192.168.277.777' does not appear to be an IPv4 or IPv6 address


そもそも与えられたclient_ip_strがIPアドレスとして正しいかは見ていないので、変なの入れるとエラーが出るけど、自分で設定するんだから大丈夫だよね。

入門 Python 3

入門 Python 3

データ解析インターンの予定を立ててみる

以前の記事でインターン募集をしていましたが、1件応募があり、採用の方向で進んでいるので、ざっくり今後のスケジュールを立ててみる


前提条件いろいろ

対象

大学2年生

マーケティングに興味があり、データ分析、プログラミングのインターンを探していた。

プログラミング未経験

期間

週2で6ヶ月以上

就活が本格化するまで。

目的

大学生

就職活動のため

自社側

会社の知名度アップ

社員(自分)の教育能力を付けるため

採用のため

前提条件以上。




まずゴールを設定。

ゴール

ざっくりだけど

仕事ができる人にする(なる)

bizocean自体も書式で仕事を効率よくできるようにするサービスだし、インターン生を仕事ができる人にできるのであれば、採用にも、教育スキル的にも良いと判断できそう。


求めること

自社社員との世代ギャップを活かした視点、発見。


評価軸

面談はよくやるけど、ただ「データ解析のインターンしてました」だけだと就活の時に弱い気がする。

実際にその過程と成果がネット上で見られるようになっていれば、履歴書にURL書くことでかなりアピールできそう。

そのため、成果物として3つを設定。

- bizoceanの技術ブログを執筆
- データ分析を行いグラフ、レポートの作成
- 企画、提案資料を作成

この3つをメンターが質的に、アクセス数などを量的に判断して評価する。

スケジュール

当たり前だけど途中のサポートや進捗確認はメンターがする。

入門 ~2、3ヶ月

bizoceanとは

まず、データはうちのbizoceanのアクセスデータ、行動データ、会員データなどを使うので、うちのビジネスはなにか、そのデータが何を意味しているのか、どうやって記録しているのかを知ってもらう。

www.bizocean.jp

社員にインタビューして、先入観などを知るのもいいかも。

途中成果として、「bizoceanとは○○○である」みたいなのを100個書くとかもいいかな。大学生の視点で出すと既存の社員には意外かもしれない。

データ分析とは

まだ簡単なところしか手を付けられてないけど、どういった手順でデータを可視化していくのかを見てもらう

Pythonと環境について

データ解析にはJupyterのGoogleカスタマイズである、Google Cloud Datalab上でPythonを書き、BigQueryからデータを取って使ってもらう。

アプリ開発よりはPythonで覚える必要がある範囲が狭いので(Pythonで使うライブラリが限られているという意味)、実際に動かしながら基本的な文法、エラー対処に慣れてもらう。

実践 ~2、3ヶ月

テーマを考える

bizoceanと、データについて理解してもらったら、自身の興味や、会社の課題に合わせて、データを使って何をするのかのテーマを決めてもらう。

一つだけだと視野が狭くなるので、2、3個以上とする。

できれば既存の社員の思い込みを打破し、新しい発見となるようなものがいい。


- bizocean会員は実は○○だった!
- bizocean会員が辞める本当の理由
- こういうコンテンツがあれば、会員は○○万人増やせる!

さっき拾った記事だけどこういうフレームワーク使うのもありかも
medium.com

資料作り

データを使って、テーマについて資料を作成する

発表

月一の全体会議にて発表


絶対内定2018 インターンシップ

絶対内定2018 インターンシップ

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

Pythonによるデータ分析入門 ―NumPy、pandasを使ったデータ処理

CeleryでWokerとbeatを同時起動する

タスクキューイングができるCeleryを使っているけど、一部認証が1時間で切れる箇所があり、期限切れの前に定期的に再認証を行う処理が必要になった。

そのため、Celeryでcron的なことができるbeat機能を使った。

Periodic Tasks
Periodic Tasks — Celery 4.0.2 documentation

ドキュメントから下記部分を参考に再認証処理を追加

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
Setting these up from w


最初はworkerの他にbeatの起動が必要なのを知らずに、全然動かんと思ってしまった。

しかし、今回の再認証処理は、同一インスタンス内で再認証するので、別プロセス、別コンテナ(Docker)になってしまうと、全く意味がない。

なんとか一プロセスでworkerとbeatを起動できないかと思ってたら簡単にできるよう。

stackoverflow.com

python manage.py celery worker --beat

 ※--beatの代わりに-Bでも同じ


が、ドキュメントを見てみると
Periodic Tasks — Celery 4.0.2 documentation

this is convenient if you’ll never run more than one worker node, but it’s not commonly used and for that reason isn’t recommended for production use:

あまり一般的ではなく、本番環境ではおすすめしないとのこと。

Worker内で行う再認証などは、素直にWorker内で完結させたほうが良さそう。



Celery、リトライ、時間制限(1秒に1回だけとか)、cronのような定期実行までできて便利すぎてやばい。

RedisのlistとpubsubとRabbitMQを使い分けを考える

2017年1月現在、ビッグデータ処理プロジェクトoceanusは下記のようなデータの流れをしています。


f:id:uyamazak:20170120102719p:plain


GEK上でDockerを使ってアプリケーションを構成していますが、Redisのリスト型、pubsub型に加えて、最近RabbitMQも使い始めたので、どう使い分けしているかを整理してみる。

Redis list型

順番を持ったリストで、左から入れたり、右から入れたり、逆に取り出したりすることができる。

リスト型 — redis 2.0.3 documentation

用途

データを失いたくない1対1のデータ処理。

oceanusでは、armsでHTTPリスエストをバリデーション等をした後にlistに保存し、r2bq(Redis to BigQueryの略)が取り出して、BigQueryに保存している。

BigQueryに保存したらもう必要がなくなり消えるので、基本的にRedisに保存されているデータは少ない。

BigQueryが落ちているなど保存できない場合は、再度listに戻す処理を行っている。


Redis PubSub

情報発信側publisherに対して、複数の受信側subcriberが登録することができる。

Sub側は登録以前のものや接続が遮断されてる間のデータは失われる。送受信が失敗してもリトライは無い。

GoogleのCloud PubSub
Google Cloud Pub/Sub Documentation  |  Cloud Pub/Sub  |  Google Cloud Platform
は、データの再送、保持などもしており信頼性は高いけど、実際に使ってみた所、認証や通信でレスポンスが悪くWEBサーバーが遅くなるため、自分でRedis立てて使いました。

用途

1対多のデータ処理。一つのデータをいろいろな所で使いたい時、かつ多少データがなくなっても問題がないもの。

流れてきたデータにフィルターをかけて、条件Aに合致したらメール、条件Bに合致したらスプレッドシートに保存するとか。

oceanusでは、データはBigQueryにすべて保存されるので、データが失われてもいいような通知用途などに使っています。

RabbitMQ

メッセージキューイングのミドルウェア

メッセージといってもメールやチャットのような人が読むものではなく、シリアライズ化したコードなど機械同士のやりとりが基本です。

銀行などでも実績があり、貯めたタスクを処理するワーカーを別プロセスで走らせることができ、耐障害、非同期、分散、スケールなどを用意に実現できます。

AWSでは、似たようなSQSがあります。
aws.amazon.com

GoogleでもApp Engine用にTask Queue

Task Queue Overview  |  App Engine standard environment for Python  |  Google Cloud Platform

があるけど、App Engine以外から使いにくそうなので使いませんでした。

用途

タスクの非同期処理。リクエストに対して処理時間が長い時、リクエストの数分後など時間差で実行したい時など。


PythonCeleryから使っており、送りたいタスクを関数にし、デコレータを付けるだけで使用できます。

CeleryはブローカーとしてRedisも使うことができますが信頼性、対応する機能などからRabbitMQが推奨されてるようです。

公式Dockerイメージを使えば、特に難しい設定もなく使うことができました。


現在は、特定の条件(エラーを意味するものとか)で、Googleスプレッドシートに保存したり、コンバージョン通知からそのユーザーの履歴をBigQueryから取ってきてメールに付けて送るなどのタスクをどんどん投げて処理させています。

ワーカーはデフォルトでは10と多くなっており、スプレッドシートにアクセスしすぎて、繋がらなくなるなどもあるので、1秒に1回などリミットを掛ける必要がありました。

リトライ、オートスケーリングなど多数の機能がありまだ使いこなせていない感。

逆にこれを使っておけばあとで困ることはなさそう。


昔はDBに保存してcronで処理などもあったと思いますが、そのロジックを自分で書くのは大変だし、スケーリングや多重起動の防止などいろいろ面倒なので、こういうミドルウェアを積極的に使った方が昔の自分に勝てます。

まとめ

最初はRedisとBigQueryだけでしたが、間にPubSub、RabbitMQを入れることにより、リアルタイム処理や、重い処理の非同期処理などさまざまな形でデータを利用することができるようになりました。

また疎結合を保てるので、listを使う部分の開発、pubsubを使う部分の開発をお互いにあまり影響せずに進めることで出来て安心して、本番テスト等も行えます。

プログラマとして、使える道具はどんどん増やしていくべき。


Redis入門 インメモリKVSによる高速データ管理

Redis入門 インメモリKVSによる高速データ管理

Mastering RabbitMQ

Mastering RabbitMQ

KubernetesがServiceのIPなどを勝手に環境変数に入れてくれてる

ビッグデータ処理のプロジェクトoceanusで、セットした覚えのない環境変数がセットされており、それがたまたまコンテナで使っている変数名と同じだったため、原因の特定に時間がかかってしまった。



RabiitMQを使うことになって、それを使うコンテナには、RABBITMQ_PORTという自分で付けた名前の環境変数を使っていた。

ポートはデフォルトのまま5672を使っているが、あとで変更できるようにPython側では下記のように呼び出していた。

from os import environ

RABBITMQ_PORT = environ.get('RABBITMQ_PORT', 5672)

getを使うことで、RABBITMQ_PORTがない場合は、デフォルトの5672が使われるように書いていたつもりで、ローカルでは問題なく動いていた。

しかし、Kubernetes(Google Container Engine)にアップすると、RABBITMQ_PORTに下記のような、ポート番号だけでなく、Kubernetes上のローカルIPまで含めた値が入ってしまい、エラーが起きていた。

tcp://10.7.251.6:5672

調べてみたら、Kubernetesでは、同一クラスタ内のサービスに繋げるように、それぞれのホスト名、ポート番号などを自動で環境変数にいれてくれるようだ。

ドキュメントにも書いてあり、完全に見落としていた。

kubernetes.io

ドキュメントにあるように例えば、redis-masterというservice名だったら、下記の環境変数がすべてセットされる。

REDIS_MASTER_SERVICE_HOST=10.0.0.11
REDIS_MASTER_SERVICE_PORT=6379
REDIS_MASTER_PORT=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP=tcp://10.0.0.11:6379
REDIS_MASTER_PORT_6379_TCP_PROTO=tcp
REDIS_MASTER_PORT_6379_TCP_PORT=6379
REDIS_MASTER_PORT_6379_TCP_ADDR=10.0.0.11

今回自分のアプリケーションでは、たまたまサービス名と、それから生成される環境変数が同じになってしまっていた。

ここらへんの環境変数を整理して、この環境変数名でアプリケーション側から呼び出すようにすれば、新規作成時にServiceのIPを指定しなくても、一気につながるようにできるはず。


WEBサーバー、DBサーバー、キャッシュサーバー、MQサーバーなど、Kubernetes内のネットワークが増えてきたら、この変数名を使ってyamlを書いたりするのが必須になりそう。

Kubernetes: Up and Running; Dive into the Future of Infrastructure

Kubernetes: Up and Running; Dive into the Future of Infrastructure

Pythonでリトライ処理を考える

ビッグデータ処理のために作ってるoceanusでは、受け取ったデータをRedis、BigQuery、Google SpreadSheet、SendGrid(メール)など外部に送ることが多く、残念ながら外部とのやりとりはコントロールできない不確定要素が多い。

そのため、いろんなところでリトライ処理が必要になる。

リトライ時の間隔については別記事で書いたので今回は省く。
uyamazak.hatenablog.com

これまで使ったリトライ方法3つをまとめた。
どんなアプリケーションかによって正解は異なると思う。

エラーをtryで取って、except節でリトライ

一番シンプルなのがこれ。

try:
    result = task()
except Exception as e:
    print("error and retry")
    # retry
    result = task()

もしくは処理結果をif文で。

result = task()
if not result:
    # retry
    result = task()

まあ失敗してもいいけど、一度はリトライしてみるか、程度のあんまり重要じゃないところや、2回以上リトライが必要ない場面ではこれでいいかもしれない。
欠点としては、リトライ回数を増やせないこと。

入れ子にすれば何回でもできないことはないけど、読みにくいし、ダサすぎる。

ループで回す

forなどでリトライ回数分回し、成功次第抜ける。
While True:は無限ループ恐怖症なのか避けるようにしている。

ループ回数や、ループ中の処理などは自由に書ける。

大分端折ったけど、BigQueryや外部のサーバーを使うときは下記のような処理を書いた。
リトライ中ではエラー、全部ダメだったときはクリティカルの出し分けを付けた。
実際はprintではなく、raiseやログ出力している

tryのelse節(tryにエラーが無い時だけ実行する)は、初めて使ったかもしれない。あまり見ないので推奨できないかも。
最初のリトライ回数が0だと、感覚的に違うので、rangeは1から、制限回数にも+1して見やすいようにしている。

from time import sleep
CONNECTION_RETRY = 3

def task_with_retry():
    for i in range(1, CONNECTION_RETRY + 1):
        try:
            result = task()
        except Exception as e:
            print("error:{e} retry:{i}/{max}".format(e=e, i=i, max=CONNECTION_RETRY))
            sleep(i * 5)
        else:
            return True
    print("critical")
    return False

パッケージrertyを使う

pypi.python.org
まんまretryというパッケージがpypiに公開されている。


使用する関数にデコレータで利用できるので、コードをシンプルに保てて賢い。

from retry import retry
@retry()
def make_trouble():
    '''Retry until succeed'''
@retry(ZeroDivisionError, tries=3, delay=2)
def make_trouble():
    '''Retry on ZeroDivisionError, raise error after 3 attempts, sleep 2 seconds between attempts.'''
@retry((ValueError, TypeError), delay=1, backoff=2)
def make_trouble():
    '''Retry on ValueError or TypeError, sleep 1, 2, 4, 8, ... seconds between attempts.'''
@retry((ValueError, TypeError), delay=1, backoff=2, max_delay=4)
def make_trouble():
    '''Retry on ValueError or TypeError, sleep 1, 2, 4, 4, ... seconds between attempts.'''
@retry(ValueError, delay=1, jitter=1)
def make_trouble():
    '''Retry on ValueError, sleep 1, 2, 3, 4, ... seconds between attempts.'''
# If you enable logging, you can get warnings like 'ValueError, retrying in
# 1 seconds'
if __name__ == '__main__':
    import logging
    logging.basicConfig()
    make_trouble()

冒頭の記事に書いたリトライ間隔のジッター、loggerなども引数で指定可能

def retry(exceptions=Exception, tries=-1, delay=0, max_delay=None, backoff=1, jitter=0, logger=logging_logger):
    """Return a retry decorator.

    :param exceptions: an exception or a tuple of exceptions to catch. default: Exception.
    :param tries: the maximum number of attempts. default: -1 (infinite).
    :param delay: initial delay between attempts. default: 0.
    :param max_delay: the maximum value of delay. default: None (no limit).
    :param backoff: multiplier applied to delay between attempts. default: 1 (no backoff).
    :param jitter: extra seconds added to delay between attempts. default: 0.
                   fixed if a number, random if a range tuple (min, max)
    :param logger: logger.warning(fmt, error, delay) will be called on failed attempts.
                   default: retry.logging_logger. if None, logging is disabled.
    """

なるべく使用するパッケージは少なくしたいけど、リトライする箇所が増えて、このオプションでなんとかなるのなら、いちいち自分で書かずにこれを使った方がいいと思う。


実践力を身につける Pythonの教科書

実践力を身につける Pythonの教科書

Effective Python ―Pythonプログラムを改良する59項目

Effective Python ―Pythonプログラムを改良する59項目

Dockerのイメージから過去のソースコード等を救出する

DockerとGKE(Google Container Engine)を開発に使うようになってから、gitだけでなく、Dockerのイメージもバージョンで保存されるようになった。

ソースコードを変更後、動作確認のためにDockerイメージをビルドし直すことが多いので、頻度としては、gitへのコミットよりも、Dockerのビルドの方が多い。

そのため、gitにコミットする前に、ファイルを消してしまった、変更してしまった場合、Dockerのイメージからファイルを救出することができる。

私の普段の開発の流れは下記のよう。
まだ自動テスト、自動デプロイが必要な規模にはなってないので、最後2つの順番は変わったりする。

  1. ソースコードの追加、修正
  2. Dockerイメージのビルド
  3. 開発環境でテスト
  4. gitコミット
  5. 本番環境へアップ


そして、月曜午前中に出社して、寝ぼけてコミット前なのに「git resetしちゃった!」というとき。

落ち着いて下記のようなコマンドを叩く。
grepでなくてもdocker imagesコマンドのfilterなどを使ってもいい

>>% sudo docker images | grep arms
asia.gcr.io/oceanus-dev/arms latest 0f8530abb371 5 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161207-01 0f8530abb371 5 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161124-01 f302e57a0ebb 7 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161115-02 7f2fe84bb244 8 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161115-01 be7d629f9186 8 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161111-02 75ce674e8eed 9 weeks ago 278.5 MB
asia.gcr.io/oceanus-dev/arms v20161111-01 4bbecf26285f 9 weeks ago 278.6 MB
asia.gcr.io/oceanus-dev/arms v20161107-alpine03 b21bdc36165b 9 weeks ago 278.6 MB
asia.gcr.io/oceanus-dev/arms v20161107-alpine01 6a890b567b5a 10 weeks ago 278.6 MB
asia.gcr.io/oceanus-dev/arms v20161107-alpine02 6a890b567b5a 10 weeks ago 278.6 MB