読者です 読者をやめる 読者になる 読者になる

uyamazakのブログ

仕事中の問題と解決メモ。PythonとGoogle Cloudがメイン。bizoceanで新規事業の企画と開発担当。 BigQueryを使ったビッグデータ収集・解析・リアルタイム処理プロジェクト進行中 https://github.com/uyamazak/oceanus

Go言語でGoogle Cloud PUB/SUBへの高速HTTPS中継サーバーを作る

以前から本を買ったり気になっていたGo言語だけど、書き慣れてないし変更が多いと大変なので、使い所がなくPythonで全部済ましてしまっていた。


でも以前、Google Cloud PUB/SUBを使おうとした時、通信がHTTPSなのでレスポンスが遅く、WEBアプリ側から直接はとても使えないので、ローカルにRedisを立てて使っているのを思い出した。


cloud.google.com


どのくらい遅いかというと、ローカルだと10ms以下で返せていたAPI的なものが、Cloud PUB/SUBへのpublishを入れると、そこの処理だけで片道20ms~50ms、遅いときは全体で100ms以上かかってしまっていた。

サーバー台数の節約と、ユーザー側のストレス軽減も考えて大量のアクセスを瞬時に捌きたいのに。


ローカルネットワーク内に立てたTCP接続のRedisのPubSubであれば遅くても数msなので、話にならない。


で、今回はその中継に、静的言語なので処理速度も早く、簡単に非同期の並行処理を書けるGo言語で自前サーバー「GOPUBサーバー(仮)」を作ろうと思った。

フロント側はローカル内のGOPUBサーバーに渡すので、HTTPSや外部のネットワーク等、通信のオーバーヘッドが少なく、GOPUBサーバーは起動しっぱなしで、HTTPS接続を使いまわし、さらにgoroutineによる並行処理を使い、非同期で早く返せるだろうという考え。

図にするとこんな感じ。

f:id:uyamazak:20170321112530p:plain


Go自体が書き慣れていないので、いろんなところを切り貼りしたり、試行錯誤したのでコードは汚いけど、go文によるgoroutineの力は想像以上だった。

1万リクエストを直列で一つずつやった場合は、一つ100msぐらいかかり、時間を測る気にならず途中止めたけど、go文を使うと1万リクエストも1, 2秒程度でレスポンスできた。

goroutineの数を出してみると、4コア8スレッドの開発サーバーでは、最大で500ほど生成され、並列&並行処理がされていた。ver1.5以降は、デフォでCPUに合わせて並列処理も行ってくれる。

go文による非同期の並列&並行処理はHTTPS通信のような処理は軽いけど待ち時間が長い処理には最適だと思う。

今のところGo言語で読んでる本は下記だけ。変に思えるGoの設計も、C言語との関わりなど説明があり納得できた。

スターティングGo言語

スターティングGo言語


コードもエラー処理、リトライ、バッファサイズの調整など、まだいろいろ手をいれる必要があるけど、TCPのlistenと、Googleの認証、Cloud PUB/SUBへのpublishを含めても1ファイルで100行程度で書けた。

package main

import (
	"log"
	"net"
	"os"
	"runtime"
	"strconv"

	"cloud.google.com/go/pubsub"
	"golang.org/x/net/context"
)

func main() {
	ctx := context.Background()
	projectID := os.Getenv("PROJECT_ID")
	listenHost := os.Getenv("LISTEN_HOST")
	listenPort := os.Getenv("LISTEN_PORT")
	topicName := os.Getenv("GOPUB_TOPIC_NAME")
	log.Printf("runtime.GOMAXPROCS: %v\nprojectID: %v\nlistenHost: %v\nlistenPort:%v\ntopicName:%v",
		runtime.GOMAXPROCS(0), projectID, listenHost, listenPort, topicName)
	// Creates a client.
	p_client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Printf("Failed to create client: %v", err)
		return
	}
	topic := createTopicIfNotExists(ctx, p_client)
	var listen net.Listener

	listen, err = net.Listen("tcp", listenHost+":"+listenPort)
	if err != nil {
		log.Printf("Failed to listen tcp %v", err)
		return
	}
	defer listen.Close()

	for {
		conn, e := listen.Accept()
		if e != nil {
			log.Printf("Failed to accept %v", e)
			return
		}
		connection_handler(conn, ctx, topic)
	}
}

func connection_handler(conn net.Conn, ctx context.Context, topic *pubsub.Topic) {
	defer conn.Close()

	bufferSize, _ := strconv.ParseUint(os.Getenv("GOPUB_BUFFER_SIZE"), 10, 64)
	buf := make([]byte, bufferSize)

	for {
		n, err := conn.Read(buf)
		if err != nil {
			return
		}
		go publish_handler(ctx, topic, buf[:n])
		conn.Write(buf[:n])
	}
}

func publish_handler(ctx context.Context, topic *pubsub.Topic, buf []byte) {
	buf_copy := make([]byte, len(buf))
	copy(buf_copy, buf)

	result := topic.Publish(ctx, &pubsub.Message{Data: buf_copy})
	_, err := result.Get(ctx)
	if err != nil {
		return
	}
}

func createTopicIfNotExists(ctx context.Context, c *pubsub.Client) *pubsub.Topic {
	// Create a topic to subscribe to.
	topicName := os.Getenv("GOPUB_TOPIC_NAME")
	t := c.Topic(topicName)
	ok, err := t.Exists(ctx)
	if err != nil {
		log.Print(err)
	}
	if ok {
		return t
	}

	t, err = c.CreateTopic(ctx, topicName)
	if err != nil {
		log.Printf("Failed to create the topic: %v", err)
	}
	return t
}

これもoceanusプロジェクト内に含める予定で、速度的に問題なければWEBアプリ部分から直接このGOPUBサーバーにデータを送る予定。

もちろん開発環境や、本番環境はすべてDockerを使用。

Googleの認証については別記事に書いた。
uyamazak.hatenablog.com


https://github.com/uyamazak/oceanus

Google Cloud PUB/SUBにデータを送ることができれば、データ処理基板であるCloud Dataflowや、まだアルファ版のCloud Functions からもリアルタイムでデータを受け取れるので、夢が広がります。