GCP: Cloud Pub/Sub

CLI

トピックを作成する

gcloud pubsub topics create <TOPIC_NAME>

サブスクリプションを作成する

gcloud pubsub subscriptions create <SUBSCRIPTION_NAME> --topic <TOPIC_NAME>

トピックにメッセージを送信する

gcloud pubsub topics publish <TOPIC_NAME> --message 'Hello world.'

サブスクリプションの一覧を表示する

gcloud pubsub subscriptions list --format='table(name)'

Go でメッセージを Publish, Subscribe するアプリを作る

トピックを作成する

my-topicトピックを作成する。

gcloud pubsub topics create my-topic

サブスクリプションmy-subを作成する。

gcloud pubsub subscriptions create my-sub --topic my-topic

サービスアカウントを作成する

Pub/Sub パブリッシャー、Pub/Sub サブスクライバーの権限を持つサービスアカウントを作成する。

ACCOUNT='pubsub-quickstart'
PROJECT_ID='my-project'

gcloud beta iam service-accounts create ${ACCOUNT} \
    --description "pub/subのテスト用アカウント" \
    --display-name "pub/subテスト"

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member serviceAccount:${ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
  --role roles/pubsub.publisher

gcloud projects add-iam-policy-binding ${PROJECT_ID} \
  --member serviceAccount:${ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com \
  --role roles/pubsub.subscriber

サービスアカウントの鍵を生成する

~/key.jsonに生成された鍵が保存される。

gcloud iam service-accounts keys create ~/key.json \
  --iam-account ${ACCOUNT}@${PROJECT_ID}.iam.gserviceaccount.com

Go 言語用のクライアントライブラリをインストールする。

go get -u cloud.google.com/go/pubsub

Go のプロジェクトを作る。 ディレクトリ構成は次のようにする。

pub
  - main.go
sub
  - main.go

pub/main.goを書く。

package main

import (
 "context"
 "fmt"
 "log"

 "cloud.google.com/go/pubsub"
 "google.golang.org/api/option"
)

const (
 topic     string = "my-topic"                  // トピック名
 projectID string = "my-project"              // GCPのプロジェクト名
 credPath  string = "/home/user/key.json" // サービスアカウントの鍵(.json)のパス
)

func main() {
 ctx := context.Background()

 client := getPubsubClient(ctx, projectID, credPath)

 t := client.Topic(topic)
 result := t.Publish(ctx, &pubsub.Message{
  Data: []byte("test message."),
 })

 id, err := result.Get(ctx)
 if err != nil {
  log.Fatal("publish topic:", err)
  return
 }
 fmt.Printf("Published a message; msg ID: %v\n", id)
}

func getPubsubClient(ctx context.Context, projectID string, credPath string) (c *pubsub.Client) {
 c, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(credPath))
 if err != nil {
  log.Fatal("new client:", err)
 }
 return c
}

sub/main.goを書く。

package main

import (
 "context"
 "fmt"
 "log"
 "sync"

 "cloud.google.com/go/pubsub"
 "google.golang.org/api/option"
)

const (
 subscription string = "my-sub"                    // サブスクリプション名
 projectID    string = "my-project"              // GCPのプロジェクト名
 credPath     string = "/home/user/key.json" // サービスアカウントの鍵(.json)のパス
)

func getPubsubClient(ctx context.Context, projectID string, credPath string) (c *pubsub.Client) {
 c, err := pubsub.NewClient(ctx, projectID, option.WithCredentialsFile(credPath))
 if err != nil {
  log.Fatal("new client:", err)
 }
 return c
}

func main() {

 var mu sync.Mutex
 received := 0

 ctx := context.Background()
 client := getPubsubClient(ctx, projectID, credPath)

 sub := client.Subscription(subscription)
 cctx, cancel := context.WithCancel(ctx)

 err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
  msg.Ack()
  fmt.Printf("Got message: %q\n", string(msg.Data))
  mu.Lock()
  defer mu.Unlock()
  received++
  if received == 10 {
   cancel()
  }
 })
 if err != nil {
  log.Fatal("subscribe:", err)
  return
 }
}

2 つターミナルを開き、それぞれで次のコマンドを実行する。

  • go run sub/main.go
  • go run pub/main.go

パブリッシュしたメッセージがサブスクライブできていることが確認できる。