Pub/Sub
gRPC · port 8085
Configuration
Set the emulator host environment variable:
$ export PUBSUB_EMULATOR_HOST=localhost:8085 # or use: eval $(localgcp env)
The Go, Python, Java, and Node.js Pub/Sub client libraries all respect this env var.
Go SDK example
Create a topic, create a subscription, publish a message, and receive it:
package main import ( "context" "fmt" "log" "time" "cloud.google.com/go/pubsub" ) func main() { ctx := context.Background() // PUBSUB_EMULATOR_HOST=localhost:8085 must be set client, err := pubsub.NewClient(ctx, "my-project") if err != nil { log.Fatal(err) } defer client.Close() // Create a topic topic, err := client.CreateTopic(ctx, "my-topic") if err != nil { log.Fatal(err) } // Create a subscription sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{ Topic: topic, }) if err != nil { log.Fatal(err) } // Publish a message result := topic.Publish(ctx, &pubsub.Message{ Data: []byte("Hello, Pub/Sub!"), Attributes: map[string]string{"env": "local"}, }) id, err := result.Get(ctx) if err != nil { log.Fatal(err) } fmt.Printf("Published message ID: %s\n", id) // Receive messages ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) { fmt.Printf("Received: %s\n", msg.Data) msg.Ack() cancel() // stop after first message }) if err != nil && err != context.Canceled { log.Fatal(err) } }
Push subscriptions
Create a subscription with a push endpoint. localgcp will POST messages to your HTTP endpoint and auto-acknowledge on 2xx responses:
// Create a push subscription sub, err := client.CreateSubscription(ctx, "my-push-sub", pubsub.SubscriptionConfig{ Topic: topic, PushConfig: pubsub.PushConfig{ Endpoint: "http://localhost:9000/webhook", }, })
When a message is published to the topic, localgcp sends an HTTP POST to the configured endpoint with the message payload. If the endpoint returns a 2xx status, the message is acknowledged automatically.
Push/pull mutual exclusivity: A subscription is either push or pull, not both. If you create a subscription with a
PushConfig, it becomes a push subscription. Messages are delivered via HTTP POST and cannot be pulled via the Receive method.
Dead letter topics
Configure a dead letter topic to catch messages that fail delivery after a maximum number of attempts:
// Create a dead letter topic dlTopic, _ := client.CreateTopic(ctx, "my-dead-letters") // Create subscription with dead letter policy sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{ Topic: topic, DeadLetterPolicy: &pubsub.DeadLetterPolicy{ DeadLetterTopic: dlTopic.String(), MaxDeliveryAttempts: 5, }, })
After 5 failed delivery attempts (nack or deadline exceeded), the message is forwarded to the dead letter topic.
Features
- Topic management -- create, get, list, delete
- Subscription management -- create, get, list, delete
- Publish and pull -- including StreamingPull for official client libraries
- Push subscriptions -- HTTP POST delivery with auto-ack
- Dead letter topics -- forward after N failed deliveries
- Message acknowledgment -- ack and deadline modification
- Fan-out -- multiple subscriptions each get all messages
Not yet supported
- Exactly-once delivery
- Message ordering
- Schema validation
- Filtering