Pub/Sub

gRPC · port 8085

Configuration

Set the emulator host environment variable:

$ export PUBSUB_EMULATOR_HOST=localhost:8085
# or use: eval "$(localgcp env)"

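The Go, Python, and Node.js Pub/Sub client libraries read this environment variable automatically. The Java client library does not: it must be pointed at the emulator explicitly, for example with a plaintext gRPC channel to the same host and port.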
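If the variable is unset, a client may try to reach real Pub/Sub instead of the emulator. The following is a minimal sketch of a fail-fast check that a program could run before creating a client. The helper name emulatorHost and the injected getenv parameter are hypothetical choices for this example, not part of localgcp or the client library.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"os"
)

// emulatorHost returns the host:port stored in PUBSUB_EMULATOR_HOST, or an
// error if the variable is unset or not in host:port form. getenv is injected
// (hypothetical design choice) so the check can be exercised without touching
// the real environment.
func emulatorHost(getenv func(string) string) (string, error) {
	addr := getenv("PUBSUB_EMULATOR_HOST")
	if addr == "" {
		return "", errors.New("PUBSUB_EMULATOR_HOST is not set")
	}
	if _, _, err := net.SplitHostPort(addr); err != nil {
		return "", fmt.Errorf("PUBSUB_EMULATOR_HOST=%q is not host:port: %w", addr, err)
	}
	return addr, nil
}

func main() {
	addr, err := emulatorHost(os.Getenv)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("using Pub/Sub emulator at", addr)
}
```

Calling this at startup keeps local test runs from silently talking to a real project.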

Go SDK example

Create a topic, create a subscription, publish a message, and receive it:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()

    // PUBSUB_EMULATOR_HOST=localhost:8085 must be set
    client, err := pubsub.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Create a topic
    topic, err := client.CreateTopic(ctx, "my-topic")
    if err != nil {
        log.Fatal(err)
    }

    // Create a subscription
    sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
        Topic: topic,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Publish a message
    result := topic.Publish(ctx, &pubsub.Message{
        Data: []byte("Hello, Pub/Sub!"),
        Attributes: map[string]string{"env": "local"},
    })
    id, err := result.Get(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Published message ID: %s\n", id)

    // Receive messages
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
        fmt.Printf("Received: %s\n", msg.Data)
        msg.Ack()
        cancel() // stop after first message
    })
    if err != nil && err != context.Canceled {
        log.Fatal(err)
    }
}

Push subscriptions

Create a subscription with a push endpoint so that localgcp delivers messages to your HTTP service instead of waiting for them to be pulled:

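// Create a push subscription (assumes client, ctx, and topic from the example above)
sub, err := client.CreateSubscription(ctx, "my-push-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    PushConfig: pubsub.PushConfig{
        // localgcp POSTs each message to this URL
        Endpoint: "http://localhost:9000/webhook",
    },
})
if err != nil {
    log.Fatal(err)
}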

When a message is published to the topic, localgcp sends an HTTP POST to the configured endpoint with the message payload. If the endpoint returns a 2xx status, the message is acknowledged automatically.
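The receiving side can be sketched as a small HTTP handler. This example assumes that localgcp sends the same JSON envelope as Google Cloud Pub/Sub push delivery (a message object with base64-encoded data, attributes, and messageId, plus a subscription field); that shape is an assumption here and should be checked against an actual request. The names pushEnvelope, decodePush, and handlePush are hypothetical.

```go
package main

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
)

// pushEnvelope mirrors the JSON body used by Google Cloud Pub/Sub push
// delivery. Assumption: localgcp posts the same shape.
type pushEnvelope struct {
	Message struct {
		Data       []byte            `json:"data"` // base64 string in JSON; encoding/json decodes it
		Attributes map[string]string `json:"attributes"`
		MessageID  string            `json:"messageId"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodePush parses one push request body into an envelope.
func decodePush(body []byte) (pushEnvelope, error) {
	var env pushEnvelope
	err := json.Unmarshal(body, &env)
	return env, err
}

// handlePush acknowledges a message by returning a 2xx status. Returning 400
// for a malformed body is a choice made by this sketch.
func handlePush(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "POST only", http.StatusMethodNotAllowed)
		return
	}
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "cannot read body", http.StatusBadRequest)
		return
	}
	env, err := decodePush(body)
	if err != nil {
		http.Error(w, "malformed push body", http.StatusBadRequest)
		return
	}
	log.Printf("received %s: %s (attributes: %v)",
		env.Message.MessageID, env.Message.Data, env.Message.Attributes)
	w.WriteHeader(http.StatusNoContent) // 204 is a 2xx, so the message is acked
}

func main() {
	// Matches the Endpoint used in the push subscription example.
	http.HandleFunc("/webhook", handlePush)
	log.Fatal(http.ListenAndServe("localhost:9000", nil))
}
```

Keeping the decoding in its own function makes the envelope handling easy to test without starting a server.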

Push/pull mutual exclusivity: A subscription is either push or pull, not both. If you create a subscription with a PushConfig, it becomes a push subscription. Messages are delivered via HTTP POST and cannot be pulled via the Receive method.

Dead letter topics

Configure a dead letter topic to catch messages that fail delivery after a maximum number of attempts:

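// Create a dead letter topic (assumes client, ctx, and topic from the example above)
dlTopic, err := client.CreateTopic(ctx, "my-dead-letters")
if err != nil {
    log.Fatal(err)
}

// Create subscription with dead letter policy
sub, err := client.CreateSubscription(ctx, "my-sub", pubsub.SubscriptionConfig{
    Topic: topic,
    DeadLetterPolicy: &pubsub.DeadLetterPolicy{
        // Full resource name: projects/<project>/topics/<topic>
        DeadLetterTopic:     dlTopic.String(),
        MaxDeliveryAttempts: 5,
    },
})
if err != nil {
    log.Fatal(err)
}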

After 5 failed delivery attempts (nack or deadline exceeded), the message is forwarded to the dead letter topic.
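The counting rule can be illustrated with a small in-memory model. This is not localgcp's implementation, only a sketch of the behavior described above: a handler that returns false stands in for a nack or an exceeded deadline, and the function name deliverWithDeadLetter and its parameters are hypothetical.

```go
package main

import "fmt"

// deliverWithDeadLetter models a dead letter policy. It calls handle until it
// returns true (ack) or has failed maxAttempts times, in which case msg is
// appended to deadLetters. It returns the number of delivery attempts made.
func deliverWithDeadLetter(
	msg string,
	maxAttempts int,
	handle func(msg string, attempt int) bool,
	deadLetters *[]string,
) int {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if handle(msg, attempt) {
			return attempt // acknowledged, nothing is dead-lettered
		}
	}
	// Every allowed attempt failed: forward to the dead letter destination.
	*deadLetters = append(*deadLetters, msg)
	return maxAttempts
}

func main() {
	var dead []string

	// A handler that always fails, like a subscriber that nacks every time.
	alwaysNack := func(string, int) bool { return false }
	attempts := deliverWithDeadLetter("poison", 5, alwaysNack, &dead)

	fmt.Println(attempts, dead)
}
```

A handler that succeeds on any attempt up to the fifth leaves the dead letter list untouched in this model.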

Features

Not yet supported