...

Source file src/edge-infra.dev/pkg/edge/chariot/pubsubpull.go

Documentation: edge-infra.dev/pkg/edge/chariot

     1  package chariot
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	"cloud.google.com/go/pubsub"
     9  )
    10  
    11  // IPubSubReceiver is an interface that allows Chariot to mock the pubsub.Subscription.Receive function for testing.
    12  type IPubSubReceiver interface {
    13  	Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error
    14  }
    15  
    16  // PubSubReceiverSubscriptionWrapper wraps a pubsub.Subscription object in order to satisfy the IPubSubReceiver interface.
    17  type PubSubReceiverSubscriptionWrapper struct {
    18  	client       *pubsub.Client
    19  	subscription *pubsub.Subscription
    20  }
    21  
    22  var AckDeadline = time.Minute
    23  
    24  // NewGooglePubSubReceiver subscribes to a Google PubSub subscription.
    25  //
    26  // The returned type satisfies the IPubSubReceiver interface.
    27  func NewGooglePubSubReceiver(projectID, subscriptionID string) (*PubSubReceiverSubscriptionWrapper, error) {
    28  	ctx := context.Background()
    29  	client, err := pubsub.NewClient(ctx, projectID)
    30  	if err != nil {
    31  		return nil, fmt.Errorf("Error creating pubsub client: %w", err)
    32  	}
    33  
    34  	var sub = client.Subscription(subscriptionID)
    35  
    36  	if exists, err := sub.Exists(ctx); err != nil {
    37  		return nil, fmt.Errorf("Error checking if subscription %q exists: %w", subscriptionID, err)
    38  	} else if !exists {
    39  		return nil, fmt.Errorf("Subscription %q does not exist", subscriptionID)
    40  	}
    41  
    42  	return &PubSubReceiverSubscriptionWrapper{
    43  		client:       client,
    44  		subscription: sub,
    45  	}, nil
    46  }
    47  
    48  func (wpsr *PubSubReceiverSubscriptionWrapper) SetMaxOutstandingMessages(n int) {
    49  	wpsr.subscription.ReceiveSettings.MaxOutstandingMessages = n
    50  }
    51  
    52  // Receive calls the `pubsub.Subscription.Receive` function, converts the pubsub.Message into an IPubSubMessage, then
    53  // passes the IPubSubMessage into the provided function `f`.
    54  func (wpsr *PubSubReceiverSubscriptionWrapper) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error {
    55  	return wpsr.subscription.Receive(ctx, func(fctx context.Context, msg *pubsub.Message) {
    56  		f(fctx, NewPubSubMessageFromMessage(msg))
    57  	})
    58  }
    59  
    60  func (wpsr *PubSubReceiverSubscriptionWrapper) Close() error {
    61  	if wpsr.client != nil {
    62  		err := wpsr.client.Close()
    63  		wpsr.client = nil
    64  		return err
    65  	}
    66  	return nil
    67  }
    68  
    69  // AckNacker represents the pubsub.Message Ack and Nack function definitions as an interface.
    70  //
    71  // After calling either Ack or Nack, all subsequent calls to both Ack and Nack must have no effect.
    72  type AckNacker interface {
    73  	// Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
    74  	// It should not be called on any other Message value. If message acknowledgement fails, the
    75  	// Message will be redelivered. Client code must call Ack or Nack when finished for each
    76  	// received Message. Calls to Ack or Nack have no effect after the first call.
    77  	Ack()
    78  
    79  	// Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
    80  	// It should not be called on any other Message value. Nack will result in the Message being redelivered more
    81  	// quickly than if it were allowed to expire. Client code must call Ack or Nack when finished for each received
    82  	// Message. Calls to Ack or Nack have no effect after the first call.
    83  	Nack()
    84  }
    85  
    86  // IPubSubMessage is an interface that contains all fields and methods of the pubsub.Message struct as methods.
    87  //
    88  // IPubSubMessage is needed since the pubsub.Message type alias has an unexported internal constructor and a
    89  // private AckHandler field that needs to be overridden for mocking/testing purposes.
    90  type IPubSubMessage interface {
    91  	AckNacker
    92  
    93  	// ID identifies this message. This ID is assigned by the server and is
    94  	// populated for Messages obtained from a subscription.
    95  	ID() string
    96  
    97  	// Data is the actual data in the message.
    98  	Data() []byte
    99  
   100  	// Attributes represents the key-value pairs the current message is
   101  	// labelled with.
   102  	Attributes() map[string]string
   103  
   104  	// PublishTime is the time at which the message was published. This is
   105  	// populated by the server for Messages obtained from a subscription.
   106  	PublishTime() time.Time
   107  
   108  	// DeliveryAttempt is the number of times a message has been delivered.
   109  	// This is part of the dead lettering feature that forwards messages that
   110  	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
   111  	// If dead lettering is enabled, this will be set on all attempts, starting
   112  	// with value 1. Otherwise, the value will be nil.
   113  	DeliveryAttempt() *int
   114  
   115  	// OrderingKey identifies related messages for which publish order should
   116  	// be respected. If empty string is used, message will be sent unordered.
   117  	OrderingKey() string
   118  }
   119  
   120  // PubSubMessage satisfies the IPubSubMessage interface so that tests can set custom Ack and Nack functions that are
   121  // private fields in the pubsub.Message struct.
   122  type PubSubMessage struct {
   123  	// ackNack is used to Ack or Nack the PubSubMessage. After the Ack or
   124  	// Nack function is called, the ackNack value is set to nil.
   125  	ackNack AckNacker
   126  
   127  	// id identifies this message. This ID is assigned by the server and is
   128  	// populated for Messages obtained from a subscription.
   129  	id string
   130  
   131  	// data is the actual data in the message.
   132  	data []byte
   133  
   134  	// attributes represents the key-value pairs the current message is
   135  	// labelled with.
   136  	attributes map[string]string
   137  
   138  	// publishTime is the time at which the message was published. This is
   139  	// populated by the server for Messages obtained from a subscription.
   140  	publishTime time.Time
   141  
   142  	// deliveryAttempt is the number of times a message has been delivered.
   143  	// This is part of the dead lettering feature that forwards messages that
   144  	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
   145  	// If dead lettering is enabled, this will be set on all attempts, starting
   146  	// with value 1. Otherwise, the value will be nil.
   147  	deliveryAttempt *int
   148  
   149  	// orderingKey identifies related messages for which publish order should
   150  	// be respected. If empty string is used, message will be sent unordered.
   151  	orderingKey string
   152  }
   153  
   154  // NewPubSubMessageFromMessage wraps a pubsub.Message struct into a struct that satisfies the IPubSubMessage interface.
   155  func NewPubSubMessageFromMessage(psm *pubsub.Message) *PubSubMessage {
   156  	return &PubSubMessage{
   157  		id:              psm.ID,
   158  		data:            psm.Data,
   159  		attributes:      psm.Attributes,
   160  		publishTime:     psm.PublishTime,
   161  		deliveryAttempt: psm.DeliveryAttempt,
   162  		orderingKey:     psm.OrderingKey,
   163  		ackNack:         psm,
   164  	}
   165  }
   166  
   167  // Ack implements the AckNacker interface.
   168  //
   169  // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing.
   170  func (psm *PubSubMessage) Ack() {
   171  	if psm.ackNack != nil {
   172  		psm.ackNack.Ack()
   173  		psm.ackNack = nil
   174  	}
   175  }
   176  
   177  // Nack implements the AckNacker interface.
   178  //
   179  // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing.
   180  func (psm *PubSubMessage) Nack() {
   181  	if psm.ackNack != nil {
   182  		psm.ackNack.Nack()
   183  		psm.ackNack = nil
   184  	}
   185  }
   186  
   187  // ID identifies this message. This ID is assigned by the server and is
   188  // populated for Messages obtained from a subscription.
   189  func (psm *PubSubMessage) ID() string {
   190  	return psm.id
   191  }
   192  
   193  // Data is the actual data in the message.
   194  func (psm *PubSubMessage) Data() []byte {
   195  	return psm.data
   196  }
   197  
   198  // Attributes represents the key-value pairs the current message is
   199  // labelled with.
   200  func (psm *PubSubMessage) Attributes() map[string]string {
   201  	return psm.attributes
   202  }
   203  
   204  // PublishTime is the time at which the message was published. This is
   205  // populated by the server for Messages obtained from a subscription.
   206  func (psm *PubSubMessage) PublishTime() time.Time {
   207  	return psm.publishTime
   208  }
   209  
   210  // DeliveryAttempt is the number of times a message has been delivered.
   211  // This is part of the dead lettering feature that forwards messages that
   212  // fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
   213  // If dead lettering is enabled, this will be set on all attempts, starting
   214  // with value 1. Otherwise, the value will be nil.
   215  func (psm *PubSubMessage) DeliveryAttempt() *int {
   216  	return psm.deliveryAttempt
   217  }
   218  
   219  // OrderingKey identifies related messages for which publish order should
   220  // be respected. If empty string is used, message will be sent unordered.
   221  func (psm *PubSubMessage) OrderingKey() string {
   222  	return psm.orderingKey
   223  }
   224  

View as plain text