以前から本を買ったり気になっていたGo言語だけど、書き慣れてないし変更が多いと大変なので、使い所がなくPythonで全部済ましてしまっていた。
でも以前、Google Cloud PUB/SUBを使おうとした時、通信がHTTPSなのでレスポンスが遅く、WEBアプリ側から直接はとても使えないので、ローカルにRedisを立てて使っているのを思い出した。
cloud.google.com
どのくらい遅いかというと、ローカルだと10ms以下で返せていたAPI的なものが、Cloud PUB/SUBへのpublishを入れると、そこの処理だけで片道20ms~50ms、遅いときは全体で100ms以上かかってしまっていた。
サーバー台数の節約と、ユーザー側のストレス軽減も考えて大量のアクセスを瞬時に捌きたいのに。
ローカルネットワーク内に立てたTCP接続のRedisのPubSubであれば遅くても数msなので、話にならない。
で、今回はその中継に、静的言語なので処理速度も早く、簡単に非同期の並行処理を書けるGo言語で自前サーバー「GOPUBサーバー(仮)」を作ろうと思った。
フロント側は同一ローカルネットワーク内のGOPUBサーバーにTCP通信するので、HTTPSやネットワーク機器等、通信のオーバーヘッドが少ない。
さらにGOPUBサーバーは起動しっぱなしで、pub/subへのHTTPS接続を使いまわし、さらにgoroutineによる非同期の並行処理を使うので早く返せるだろうという考え。
図にするとこんな感じ。
Go自体が書き慣れていないので、いろんなところを切り貼りしたり、試行錯誤したのでコードは汚いけど、go文によるgoroutineの力は想像以上だった。
1万リクエストを直列で一つずつやった場合は、一つ100msぐらいかかり、時間を測る気にならず途中止めたけど、go文を使うと1万リクエストも1, 2秒程度でレスポンスできた。
goroutineの数を出してみると、4コア8スレッドの開発サーバーでは、最大で500ほど生成され、並列&並行処理がされていた。ver1.5以降は、デフォでCPUに合わせて並列処理も行ってくれる。
go文による非同期の並列&並行処理はHTTPS通信のようなCPU負荷は軽いけど、いろいろな待ち時間が長い処理には最適だと思う。
今のところGo言語で読んでる本は下記だけ。変に思えるGoの設計も、C言語との関わりなど説明があり納得できた。
コードもエラー処理、リトライ、バッファサイズの調整など、まだいろいろ手をいれる必要があるけど、TCPのlistenと、Googleの認証、Cloud PUB/SUBへのpublishを含めても1ファイルで100行程度で書けた。
package main
import (
"log"
"net"
"os"
"runtime"
"strconv"
"cloud.google.com/go/pubsub"
"golang.org/x/net/context"
)
func main() {
ctx := context.Background()
projectID := os.Getenv("PROJECT_ID")
listenHost := os.Getenv("LISTEN_HOST")
listenPort := os.Getenv("LISTEN_PORT")
topicName := os.Getenv("GOPUB_TOPIC_NAME")
log.Printf("runtime.GOMAXPROCS: %v\nprojectID: %v\nlistenHost: %v\nlistenPort:%v\ntopicName:%v",
runtime.GOMAXPROCS(0), projectID, listenHost, listenPort, topicName)
p_client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
log.Printf("Failed to create client: %v", err)
return
}
topic := createTopicIfNotExists(ctx, p_client)
var listen net.Listener
listen, err = net.Listen("tcp", listenHost+":"+listenPort)
if err != nil {
log.Printf("Failed to listen tcp %v", err)
return
}
defer listen.Close()
for {
conn, e := listen.Accept()
if e != nil {
log.Printf("Failed to accept %v", e)
return
}
connection_handler(conn, ctx, topic)
}
}
func connection_handler(conn net.Conn, ctx context.Context, topic *pubsub.Topic) {
defer conn.Close()
bufferSize, _ := strconv.ParseUint(os.Getenv("GOPUB_BUFFER_SIZE"), 10, 64)
buf := make([]byte, bufferSize)
for {
n, err := conn.Read(buf)
if err != nil {
return
}
go publish_handler(ctx, topic, buf[:n])
conn.Write(buf[:n])
}
}
func publish_handler(ctx context.Context, topic *pubsub.Topic, buf []byte) {
buf_copy := make([]byte, len(buf))
copy(buf_copy, buf)
result := topic.Publish(ctx, &pubsub.Message{Data: buf_copy})
_, err := result.Get(ctx)
if err != nil {
return
}
}
func createTopicIfNotExists(ctx context.Context, c *pubsub.Client) *pubsub.Topic {
topicName := os.Getenv("GOPUB_TOPIC_NAME")
t := c.Topic(topicName)
ok, err := t.Exists(ctx)
if err != nil {
log.Print(err)
}
if ok {
return t
}
t, err = c.CreateTopic(ctx, topicName)
if err != nil {
log.Printf("Failed to create the topic: %v", err)
}
return t
}
これもoceanusプロジェクト内に含める予定で、速度的に問題なければWEBアプリ部分から直接このGOPUBサーバーにデータを送る予定。
もちろん開発環境や、本番環境はすべてDockerを使用。
Googleの認証については別記事に書いた。
uyamazak.hatenablog.com
https://github.com/uyamazak/oceanus
Google Cloud PUB/SUBにデータを送ることができれば、データ処理基板であるCloud Dataflowや、まだアルファ版のCloud Functions からもリアルタイムでデータを受け取れるので、夢が広がります。