以前から本を買ったり気になっていたGo言語だけど、書き慣れてないし変更が多いと大変なので、使い所がなくPythonで全部済ましてしまっていた。
でも以前、Google Cloud PUB/SUBを使おうとした時、通信がHTTPSなのでレスポンスが遅く、WEBアプリ側から直接はとても使えないので、ローカルにRedisを立てて使っているのを思い出した。
どのくらい遅いかというと、ローカルだと10ms以下で返せていたAPI的なものが、Cloud PUB/SUBへのpublishを入れると、そこの処理だけで片道20ms~50ms、遅いときは全体で100ms以上かかってしまっていた。
サーバー台数の節約と、ユーザー側のストレス軽減も考えて大量のアクセスを瞬時に捌きたいのに。
ローカルネットワーク内に立てたTCP接続のRedisのPubSubであれば遅くても数msなので、話にならない。
で、今回はその中継に、静的言語なので処理速度も早く、簡単に非同期の並行処理を書けるGo言語で自前サーバー「GOPUBサーバー(仮)」を作ろうと思った。
フロント側は同一ローカルネットワーク内のGOPUBサーバーにTCP通信するので、HTTPSやネットワーク機器等、通信のオーバーヘッドが少ない。
さらにGOPUBサーバーは起動しっぱなしで、pub/subへのHTTPS接続を使いまわし、さらにgoroutineによる非同期の並行処理を使うので早く返せるだろうという考え。
図にするとこんな感じ。
Go自体が書き慣れていないので、いろんなところを切り貼りしたり、試行錯誤したのでコードは汚いけど、go文によるgoroutineの力は想像以上だった。
1万リクエストを直列で一つずつやった場合は、一つ100msぐらいかかり、時間を測る気にならず途中止めたけど、go文を使うと1万リクエストも1, 2秒程度でレスポンスできた。
goroutineの数を出してみると、4コア8スレッドの開発サーバーでは、最大で500ほど生成され、並列&並行処理がされていた。ver1.5以降は、デフォでCPUに合わせて並列処理も行ってくれる。
go文による非同期の並列&並行処理はHTTPS通信のようなCPU負荷は軽いけど、いろいろな待ち時間が長い処理には最適だと思う。
今のところGo言語で読んでる本は下記だけ。変に思えるGoの設計も、C言語との関わりなど説明があり納得できた。
- 作者: 松尾愛賀
- 出版社/メーカー: 翔泳社
- 発売日: 2016/05/11
- メディア: Kindle版
- この商品を含むブログを見る
コードもエラー処理、リトライ、バッファサイズの調整など、まだいろいろ手をいれる必要があるけど、TCPのlistenと、Googleの認証、Cloud PUB/SUBへのpublishを含めても1ファイルで100行程度で書けた。
package main import ( "log" "net" "os" "runtime" "strconv" "cloud.google.com/go/pubsub" "golang.org/x/net/context" ) func main() { ctx := context.Background() projectID := os.Getenv("PROJECT_ID") listenHost := os.Getenv("LISTEN_HOST") listenPort := os.Getenv("LISTEN_PORT") topicName := os.Getenv("GOPUB_TOPIC_NAME") log.Printf("runtime.GOMAXPROCS: %v\nprojectID: %v\nlistenHost: %v\nlistenPort:%v\ntopicName:%v", runtime.GOMAXPROCS(0), projectID, listenHost, listenPort, topicName) // Creates a client. p_client, err := pubsub.NewClient(ctx, projectID) if err != nil { log.Printf("Failed to create client: %v", err) return } topic := createTopicIfNotExists(ctx, p_client) var listen net.Listener listen, err = net.Listen("tcp", listenHost+":"+listenPort) if err != nil { log.Printf("Failed to listen tcp %v", err) return } defer listen.Close() for { conn, e := listen.Accept() if e != nil { log.Printf("Failed to accept %v", e) return } connection_handler(conn, ctx, topic) } } func connection_handler(conn net.Conn, ctx context.Context, topic *pubsub.Topic) { defer conn.Close() bufferSize, _ := strconv.ParseUint(os.Getenv("GOPUB_BUFFER_SIZE"), 10, 64) buf := make([]byte, bufferSize) for { n, err := conn.Read(buf) if err != nil { return } go publish_handler(ctx, topic, buf[:n]) conn.Write(buf[:n]) } } func publish_handler(ctx context.Context, topic *pubsub.Topic, buf []byte) { buf_copy := make([]byte, len(buf)) copy(buf_copy, buf) result := topic.Publish(ctx, &pubsub.Message{Data: buf_copy}) _, err := result.Get(ctx) if err != nil { return } } func createTopicIfNotExists(ctx context.Context, c *pubsub.Client) *pubsub.Topic { // Create a topic to subscribe to. topicName := os.Getenv("GOPUB_TOPIC_NAME") t := c.Topic(topicName) ok, err := t.Exists(ctx) if err != nil { log.Print(err) } if ok { return t } t, err = c.CreateTopic(ctx, topicName) if err != nil { log.Printf("Failed to create the topic: %v", err) } return t }
これもoceanusプロジェクト内に含める予定で、速度的に問題なければWEBアプリ部分から直接このGOPUBサーバーにデータを送る予定。
もちろん開発環境や、本番環境はすべてDockerを使用。
Googleの認証については別記事に書いた。
uyamazak.hatenablog.com
https://github.com/uyamazak/oceanus
Google Cloud PUB/SUBにデータを送ることができれば、データ処理基板であるCloud Dataflowや、まだアルファ版のCloud Functions からもリアルタイムでデータを受け取れるので、夢が広がります。