package msgsvc import ( "context" "fmt" "cloud.google.com/go/pubsub" ) // PublishResult type publishResult struct { r *pubsub.PublishResult } func (r publishResult) Get(ctx context.Context) (serverID string, err error) { return r.r.Get(ctx) } // Topic type topic struct { *pubsub.Topic } func (t topic) Publish(ctx context.Context, msg messageItfc) publishResultItfc { pubsubMsg := &pubsub.Message{ Data: msg.Data(), Attributes: msg.Attributes(), OrderingKey: msg.OrderingKey(), } return publishResult{t.Topic.Publish(ctx, pubsubMsg)} } func (t topic) SetOrdering(val bool) { t.Topic.EnableMessageOrdering = val } // Subscription type subscription struct { *pubsub.Subscription } func (s subscription) Receive(ctx context.Context, f func(ctx context.Context, msg messageItfc)) error { handler := func(pubsubCtx context.Context, pubsubMsg *pubsub.Message) { myMessage := message{pubsubMsg} f(pubsubCtx, myMessage) } return s.Subscription.Receive(ctx, handler) } // NewClient func newClient(ctx context.Context, projectID string) (*client, error) { clnt, err := pubsub.NewClient(ctx, projectID) if err != nil { return nil, err } return &client{clnt}, nil } type client struct { m *pubsub.Client } func (c client) SubscriptionInProject(id string, projectID string) subscriptionItfc { return subscription{c.m.SubscriptionInProject(id, projectID)} } func (c client) TopicInProject(id string, projectID string) topicItfc { return topic{c.m.TopicInProject(id, projectID)} } func (c client) CreateSubscription(ctx context.Context, subscriptionID string, cfg subscriptionCfg) (subscriptionItfc, error) { // Create a new client which is scoped to a specific project. The c.m client, // which is not project scoped cannot be used for creating a subscription, // as CreateSubscription doesn't seem to have a CreateSubscriptionInProject // variant cl, err := pubsub.NewClient(ctx, cfg.projectID) if err != nil { return nil, fmt.Errorf("error creating new client: %w", err) } top := c.m.TopicInProject(cfg.topicName, cfg.projectID) sub, err := cl.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{ Topic: top, RetentionDuration: cfg.retentionDuration, ExpirationPolicy: cfg.expirationPolicy, EnableMessageOrdering: true, Filter: cfg.filter, }) return subscription{sub}, err } func newMessage(data []byte, attributes map[string]string) messageItfc { return message{&pubsub.Message{ Data: data, Attributes: attributes, }} } type message struct { *pubsub.Message } func (m message) ID() string { return m.Message.ID } func (m message) Data() []byte { return m.Message.Data } func (m message) Attributes() map[string]string { return m.Message.Attributes } func (m message) OrderingKey() string { return m.Message.OrderingKey } func (m message) SetOrderingKey(orderingKey string) { m.Message.OrderingKey = orderingKey }