GAミント至上主義

Web Monomaniacal Developer.

Go言語でGoogle Cloud PUB/SUBへの高速HTTPS中継サーバーを作る

以前から本を買ったり気になっていたGo言語だけど、書き慣れてないし変更が多いと大変なので、使い所がなくPythonで全部済ましてしまっていた。


でも以前、Google Cloud PUB/SUBを使おうとした時、通信がHTTPSなのでレスポンスが遅く、WEBアプリ側から直接はとても使えないので、ローカルにRedisを立てて使っているのを思い出した。


cloud.google.com


どのくらい遅いかというと、ローカルだと10ms以下で返せていたAPI的なものが、Cloud PUB/SUBへのpublishを入れると、そこの処理だけで片道20ms~50ms、遅いときは全体で100ms以上かかってしまっていた。

サーバー台数の節約と、ユーザー側のストレス軽減も考えて大量のアクセスを瞬時に捌きたいのに。


ローカルネットワーク内に立てたTCP接続のRedisのPubSubであれば遅くても数msなので、話にならない。


で、今回はその中継に、静的言語なので処理速度も早く、簡単に非同期の並行処理を書けるGo言語で自前サーバー「GOPUBサーバー(仮)」を作ろうと思った。

フロント側は同一ローカルネットワーク内のGOPUBサーバーにTCP通信するので、HTTPSやネットワーク機器等、通信のオーバーヘッドが少ない。

さらにGOPUBサーバーは起動しっぱなしで、pub/subへのHTTPS接続を使いまわし、さらにgoroutineによる非同期の並行処理を使うので早く返せるだろうという考え。

図にするとこんな感じ。

f:id:uyamazak:20170321112530p:plain


Go自体が書き慣れていないので、いろんなところを切り貼りしたり、試行錯誤したのでコードは汚いけど、go文によるgoroutineの力は想像以上だった。

1万リクエストを直列で一つずつやった場合は、一つ100msぐらいかかり、時間を測る気にならず途中止めたけど、go文を使うと1万リクエストも1, 2秒程度でレスポンスできた。

goroutineの数を出してみると、4コア8スレッドの開発サーバーでは、最大で500ほど生成され、並列&並行処理がされていた。ver1.5以降は、デフォでCPUに合わせて並列処理も行ってくれる。

go文による非同期の並列&並行処理はHTTPS通信のようなCPU負荷は軽いけど、いろいろな待ち時間が長い処理には最適だと思う。

今のところGo言語で読んでる本は下記だけ。変に思えるGoの設計も、C言語との関わりなど説明があり納得できた。

スターティングGo言語

スターティングGo言語


コードもエラー処理、リトライ、バッファサイズの調整など、まだいろいろ手をいれる必要があるけど、TCPのlistenと、Googleの認証、Cloud PUB/SUBへのpublishを含めても1ファイルで100行程度で書けた。

package main

import (
	"log"
	"net"
	"os"
	"runtime"
	"strconv"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	projectID := os.Getenv("PROJECT_ID")
	listenHost := os.Getenv("LISTEN_HOST")
	listenPort := os.Getenv("LISTEN_PORT")
	topicName := os.Getenv("GOPUB_TOPIC_NAME")
	log.Printf("runtime.GOMAXPROCS: %v\nprojectID: %v\nlistenHost: %v\nlistenPort:%v\ntopicName:%v",
		runtime.GOMAXPROCS(0), projectID, listenHost, listenPort, topicName)
	// Creates a client.
	p_client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Printf("Failed to create client: %v", err)
		return
	}
	topic := createTopicIfNotExists(ctx, p_client)
	var listen net.Listener

	listen, err = net.Listen("tcp", listenHost+":"+listenPort)
	if err != nil {
		log.Printf("Failed to listen tcp %v", err)
		return
	}
	defer listen.Close()

	for {
		conn, e := listen.Accept()
		if e != nil {
			log.Printf("Failed to accept %v", e)
			return
		}
		connection_handler(conn, ctx, topic)
	}
}

func connection_handler(conn net.Conn, ctx context.Context, topic *pubsub.Topic) {
	defer conn.Close()

	bufferSize, _ := strconv.ParseUint(os.Getenv("GOPUB_BUFFER_SIZE"), 10, 64)
	buf := make([]byte, bufferSize)

	for {
		n, err := conn.Read(buf)
		if err != nil {
			return
		}
		go publish_handler(ctx, topic, buf[:n])
		conn.Write(buf[:n])
	}
}

func publish_handler(ctx context.Context, topic *pubsub.Topic, buf []byte) {
	buf_copy := make([]byte, len(buf))
	copy(buf_copy, buf)

	result := topic.Publish(ctx, &pubsub.Message{Data: buf_copy})
	_, err := result.Get(ctx)
	if err != nil {
		return
	}
}

func createTopicIfNotExists(ctx context.Context, c *pubsub.Client) *pubsub.Topic {
	// Create a topic to subscribe to.
	topicName := os.Getenv("GOPUB_TOPIC_NAME")
	t := c.Topic(topicName)
	ok, err := t.Exists(ctx)
	if err != nil {
		log.Print(err)
	}
	if ok {
		return t
	}

	t, err = c.CreateTopic(ctx, topicName)
	if err != nil {
		log.Printf("Failed to create the topic: %v", err)
	}
	return t
}

これもoceanusプロジェクト内に含める予定で、速度的に問題なければWEBアプリ部分から直接このGOPUBサーバーにデータを送る予定。

もちろん開発環境や、本番環境はすべてDockerを使用。

Googleの認証については別記事に書いた。
uyamazak.hatenablog.com


https://github.com/uyamazak/oceanus

Google Cloud PUB/SUBにデータを送ることができれば、データ処理基板であるCloud Dataflowや、まだアルファ版のCloud Functions からもリアルタイムでデータを受け取れるので、夢が広がります。