package pubsub import ( "context" "fmt" "sync" "time" "cloud.google.com/go/pubsub" "google.golang.org/api/option" ) const schema = "projects/%s/schemas/%s" // Manager contains a Client and a Project ID to make requests against type Manager struct { Client *pubsub.Client ProjectID string topicMap *sync.Map } // New creates a new Manager for a given project id, with the provided // googleAppCredsPath. if one isn't provided, it lets the google libraries // resolve auth. func New(ctx context.Context, projectID string) (Manager, error) { client, err := pubsub.NewClient(ctx, projectID) if err != nil { return Manager{}, err } return Manager{ Client: client, ProjectID: projectID, topicMap: &sync.Map{}, }, nil } // NewWithOptions creates a new Manager for a given project id, with the provided options. func NewWithOptions(ctx context.Context, projectID string, opts ...option.ClientOption) (Manager, error) { client, err := pubsub.NewClient(ctx, projectID, opts...) if err != nil { return Manager{}, err } return Manager{ Client: client, ProjectID: projectID, topicMap: &sync.Map{}, }, nil } // CreateSchema returns the correct schema format. func (p *Manager) CreateSchema(schemaID string) string { return fmt.Sprintf(schema, p.ProjectID, schemaID) } // CreateTopic creates a pubsub topic. func (p Manager) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) { return p.Client.CreateTopic(ctx, topicID) } // DeleteTopic deletes a pubsub topic // with the supplied topic id. func (p Manager) DeleteTopic(ctx context.Context, topicID string) error { topic := p.Client.Topic(topicID) return topic.Delete(ctx) } // Send publishes a pubsub message to the supplied // topic and applies attributes func (p Manager) Send(ctx context.Context, topicID string, message []byte, attributes map[string]string) error { t, ok := p.topicMap.Load(topicID) if !ok { newTopic := p.Client.Topic(topicID) t, _ = p.topicMap.LoadOrStore(topicID, newTopic) } topic := t.(*pubsub.Topic) result := topic.Publish(ctx, &pubsub.Message{ Data: message, Attributes: attributes, }) _, err := result.Get(ctx) return err } // CreateSubscription creates a pubsub subscription with the provided subscription id // topic and acknowledgement deadline. func (p Manager) CreateSubscription(ctx context.Context, topicID, subscriptionID string, ackDeadline time.Duration, filter string) (*pubsub.Subscription, error) { topic := p.Client.Topic(topicID) if ackDeadline == 0 { ackDeadline = 20 * time.Second } return p.Client.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{ Topic: topic, AckDeadline: ackDeadline, Filter: filter, }) }