...

Source file src/edge-infra.dev/pkg/sds/emergencyaccess/msgsvc/pubsub.go

Documentation: edge-infra.dev/pkg/sds/emergencyaccess/msgsvc

     1  package msgsvc
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"cloud.google.com/go/pubsub"
     8  )
     9  
    10  // PublishResult
    11  type publishResult struct {
    12  	r *pubsub.PublishResult
    13  }
    14  
    15  func (r publishResult) Get(ctx context.Context) (serverID string, err error) {
    16  	return r.r.Get(ctx)
    17  }
    18  
    19  // Topic
    20  type topic struct {
    21  	*pubsub.Topic
    22  }
    23  
    24  func (t topic) Publish(ctx context.Context, msg messageItfc) publishResultItfc {
    25  	pubsubMsg := &pubsub.Message{
    26  		Data:        msg.Data(),
    27  		Attributes:  msg.Attributes(),
    28  		OrderingKey: msg.OrderingKey(),
    29  	}
    30  	return publishResult{t.Topic.Publish(ctx, pubsubMsg)}
    31  }
    32  
    33  func (t topic) SetOrdering(val bool) {
    34  	t.Topic.EnableMessageOrdering = val
    35  }
    36  
    37  // Subscription
    38  type subscription struct {
    39  	*pubsub.Subscription
    40  }
    41  
    42  func (s subscription) Receive(ctx context.Context, f func(ctx context.Context, msg messageItfc)) error {
    43  	handler := func(pubsubCtx context.Context, pubsubMsg *pubsub.Message) {
    44  		myMessage := message{pubsubMsg}
    45  		f(pubsubCtx, myMessage)
    46  	}
    47  	return s.Subscription.Receive(ctx, handler)
    48  }
    49  
    50  // NewClient
    51  func newClient(ctx context.Context, projectID string) (*client, error) {
    52  	clnt, err := pubsub.NewClient(ctx, projectID)
    53  	if err != nil {
    54  		return nil, err
    55  	}
    56  	return &client{clnt}, nil
    57  }
    58  
    59  type client struct {
    60  	m *pubsub.Client
    61  }
    62  
    63  func (c client) SubscriptionInProject(id string, projectID string) subscriptionItfc {
    64  	return subscription{c.m.SubscriptionInProject(id, projectID)}
    65  }
    66  
    67  func (c client) TopicInProject(id string, projectID string) topicItfc {
    68  	return topic{c.m.TopicInProject(id, projectID)}
    69  }
    70  
    71  func (c client) CreateSubscription(ctx context.Context, subscriptionID string, cfg subscriptionCfg) (subscriptionItfc, error) {
    72  	// Create a new client which is scoped to a specific project. The c.m client,
    73  	// which is not project scoped cannot be used for creating a subscription,
    74  	// as CreateSubscription doesn't seem to have a CreateSubscriptionInProject
    75  	// variant
    76  	cl, err := pubsub.NewClient(ctx, cfg.projectID)
    77  	if err != nil {
    78  		return nil, fmt.Errorf("error creating new client: %w", err)
    79  	}
    80  	top := c.m.TopicInProject(cfg.topicName, cfg.projectID)
    81  	sub, err := cl.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
    82  		Topic:                 top,
    83  		RetentionDuration:     cfg.retentionDuration,
    84  		ExpirationPolicy:      cfg.expirationPolicy,
    85  		EnableMessageOrdering: true,
    86  		Filter:                cfg.filter,
    87  	})
    88  	return subscription{sub}, err
    89  }
    90  
    91  func newMessage(data []byte, attributes map[string]string) messageItfc {
    92  	return message{&pubsub.Message{
    93  		Data:       data,
    94  		Attributes: attributes,
    95  	}}
    96  }
    97  
    98  type message struct {
    99  	*pubsub.Message
   100  }
   101  
   102  func (m message) ID() string {
   103  	return m.Message.ID
   104  }
   105  
   106  func (m message) Data() []byte {
   107  	return m.Message.Data
   108  }
   109  
   110  func (m message) Attributes() map[string]string {
   111  	return m.Message.Attributes
   112  }
   113  
   114  func (m message) OrderingKey() string {
   115  	return m.Message.OrderingKey
   116  }
   117  
   118  func (m message) SetOrderingKey(orderingKey string) {
   119  	m.Message.OrderingKey = orderingKey
   120  }
   121  

View as plain text