...

Source file src/edge-infra.dev/pkg/lib/gcp/pubsub/pubsub.go

Documentation: edge-infra.dev/pkg/lib/gcp/pubsub

     1  package pubsub
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"cloud.google.com/go/pubsub"
    10  	"google.golang.org/api/option"
    11  )
    12  
    13  const schema = "projects/%s/schemas/%s"
    14  
    15  // Manager contains a Client and a Project ID to make requests against
    16  type Manager struct {
    17  	Client    *pubsub.Client
    18  	ProjectID string
    19  	topicMap  *sync.Map
    20  }
    21  
    22  // New creates a new Manager for a given project id, with the provided
    23  // googleAppCredsPath. if one isn't provided, it lets the google libraries
    24  // resolve auth.
    25  func New(ctx context.Context, projectID string) (Manager, error) {
    26  	client, err := pubsub.NewClient(ctx, projectID)
    27  	if err != nil {
    28  		return Manager{}, err
    29  	}
    30  
    31  	return Manager{
    32  		Client:    client,
    33  		ProjectID: projectID,
    34  		topicMap:  &sync.Map{},
    35  	}, nil
    36  }
    37  
    38  // NewWithOptions creates a new Manager for a given project id, with the provided options.
    39  func NewWithOptions(ctx context.Context, projectID string, opts ...option.ClientOption) (Manager, error) {
    40  	client, err := pubsub.NewClient(ctx, projectID, opts...)
    41  	if err != nil {
    42  		return Manager{}, err
    43  	}
    44  
    45  	return Manager{
    46  		Client:    client,
    47  		ProjectID: projectID,
    48  		topicMap:  &sync.Map{},
    49  	}, nil
    50  }
    51  
    52  // CreateSchema returns the correct schema format.
    53  func (p *Manager) CreateSchema(schemaID string) string {
    54  	return fmt.Sprintf(schema, p.ProjectID, schemaID)
    55  }
    56  
    57  // CreateTopic creates a pubsub topic.
    58  func (p Manager) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) {
    59  	return p.Client.CreateTopic(ctx, topicID)
    60  }
    61  
    62  // DeleteTopic deletes a pubsub topic
    63  // with the supplied topic id.
    64  func (p Manager) DeleteTopic(ctx context.Context, topicID string) error {
    65  	topic := p.Client.Topic(topicID)
    66  	return topic.Delete(ctx)
    67  }
    68  
    69  // Send publishes a pubsub message to the supplied
    70  // topic and applies attributes
    71  func (p Manager) Send(ctx context.Context, topicID string, message []byte, attributes map[string]string) error {
    72  	t, ok := p.topicMap.Load(topicID)
    73  
    74  	if !ok {
    75  		newTopic := p.Client.Topic(topicID)
    76  		t, _ = p.topicMap.LoadOrStore(topicID, newTopic)
    77  	}
    78  	topic := t.(*pubsub.Topic)
    79  	result := topic.Publish(ctx, &pubsub.Message{
    80  		Data:       message,
    81  		Attributes: attributes,
    82  	})
    83  	_, err := result.Get(ctx)
    84  	return err
    85  }
    86  
    87  // CreateSubscription creates a pubsub subscription with the provided subscription id
    88  // topic and acknowledgement deadline.
    89  func (p Manager) CreateSubscription(ctx context.Context, topicID, subscriptionID string, ackDeadline time.Duration, filter string) (*pubsub.Subscription, error) {
    90  	topic := p.Client.Topic(topicID)
    91  	if ackDeadline == 0 {
    92  		ackDeadline = 20 * time.Second
    93  	}
    94  	return p.Client.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
    95  		Topic:       topic,
    96  		AckDeadline: ackDeadline,
    97  		Filter:      filter,
    98  	})
    99  }
   100  

View as plain text