package chariot

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/pubsub"
)

// IPubSubReceiver is an interface that allows Chariot to mock the pubsub.Subscription.Receive function for testing.
type IPubSubReceiver interface {
	// Receive delivers messages to f until it returns; the error it returns
	// is defined by the implementation.
	Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error
}

// PubSubReceiverSubscriptionWrapper wraps a pubsub.Subscription object in order to satisfy the IPubSubReceiver interface.
type PubSubReceiverSubscriptionWrapper struct {
	// client is the pubsub client that produced subscription. It is kept so
	// that Close can release it, and is set to nil once closed.
	client *pubsub.Client
	// subscription is the handle that Receive and SetMaxOutstandingMessages
	// operate on.
	subscription *pubsub.Subscription
}

// AckDeadline is a package-level duration that defaults to one minute.
//
// NOTE(review): it is not referenced anywhere in this file; presumably it is
// consumed elsewhere in the package as the message acknowledgement deadline —
// confirm before changing it.
var AckDeadline = time.Minute

// NewGooglePubSubReceiver subscribes to a Google PubSub subscription.
//
// It creates a pubsub client for projectID, then verifies that subscriptionID
// exists. An error is returned if the client cannot be created, if the
// existence check fails, or if the subscription does not exist. On every
// error path after the client has been created, the client is closed before
// returning so that it is not leaked.
//
// The returned type satisfies the IPubSubReceiver interface. The caller owns
// the returned wrapper and should call Close when finished with it.
func NewGooglePubSubReceiver(projectID, subscriptionID string) (*PubSubReceiverSubscriptionWrapper, error) {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return nil, fmt.Errorf("Error creating pubsub client: %w", err)
	}

	sub := client.Subscription(subscriptionID)

	exists, err := sub.Exists(ctx)
	if err != nil {
		// The lookup failure is the error the caller needs to see; a failure
		// while releasing the now-useless client is deliberately dropped.
		_ = client.Close()
		return nil, fmt.Errorf("Error checking if subscription %q exists: %w", subscriptionID, err)
	}
	if !exists {
		// Same reasoning as above: release the client, report the real problem.
		_ = client.Close()
		return nil, fmt.Errorf("Subscription %q does not exist", subscriptionID)
	}

	return &PubSubReceiverSubscriptionWrapper{
		client:       client,
		subscription: sub,
	}, nil
}

// SetMaxOutstandingMessages sets ReceiveSettings.MaxOutstandingMessages on the
// wrapped subscription to n.
//
// NOTE(review): the pubsub package documentation asks that ReceiveSettings be
// configured before Receive is called — confirm callers do so.
func (wpsr *PubSubReceiverSubscriptionWrapper) SetMaxOutstandingMessages(n int) {
	wpsr.subscription.ReceiveSettings.MaxOutstandingMessages = n
}

// Receive calls the `pubsub.Subscription.Receive` function, converts the pubsub.Message into an IPubSubMessage, then
// passes the IPubSubMessage into the provided function `f`.
func (wpsr *PubSubReceiverSubscriptionWrapper) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error { return wpsr.subscription.Receive(ctx, func(fctx context.Context, msg *pubsub.Message) { f(fctx, NewPubSubMessageFromMessage(msg)) }) } func (wpsr *PubSubReceiverSubscriptionWrapper) Close() error { if wpsr.client != nil { err := wpsr.client.Close() wpsr.client = nil return err } return nil } // AckNacker represents the pubsub.Message Ack and Nack function definitions as an interface. // // After calling either Ack or Nack, all subsequent calls to both Ack and Nack must have no effect. type AckNacker interface { // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. // It should not be called on any other Message value. If message acknowledgement fails, the // Message will be redelivered. Client code must call Ack or Nack when finished for each // received Message. Calls to Ack or Nack have no effect after the first call. Ack() // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. // It should not be called on any other Message value. Nack will result in the Message being redelivered more // quickly than if it were allowed to expire. Client code must call Ack or Nack when finished for each received // Message. Calls to Ack or Nack have no effect after the first call. Nack() } // IPubSubMessage is an interface that contains all fields and methods of the pubsub.Message struct as methods. // // IPubSubMessage is needed since the pubsub.Message type alias has an unexported internal constructor and a // private AckHandler field that needs to be overridden for mocking/testing purposes. type IPubSubMessage interface { AckNacker // ID identifies this message. This ID is assigned by the server and is // populated for Messages obtained from a subscription. ID() string // Data is the actual data in the message. 
Data() []byte // Attributes represents the key-value pairs the current message is // labelled with. Attributes() map[string]string // PublishTime is the time at which the message was published. This is // populated by the server for Messages obtained from a subscription. PublishTime() time.Time // DeliveryAttempt is the number of times a message has been delivered. // This is part of the dead lettering feature that forwards messages that // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. // If dead lettering is enabled, this will be set on all attempts, starting // with value 1. Otherwise, the value will be nil. DeliveryAttempt() *int // OrderingKey identifies related messages for which publish order should // be respected. If empty string is used, message will be sent unordered. OrderingKey() string } // PubSubMessage satisfies the IPubSubMessage interface so that tests can set custom Ack and Nack functions that are // private fields in the pubsub.Message struct. type PubSubMessage struct { // ackNack is used to Ack or Nack the PubSubMessage. After the Ack or // Nack function is called, the ackNack value is set to nil. ackNack AckNacker // id identifies this message. This ID is assigned by the server and is // populated for Messages obtained from a subscription. id string // data is the actual data in the message. data []byte // attributes represents the key-value pairs the current message is // labelled with. attributes map[string]string // publishTime is the time at which the message was published. This is // populated by the server for Messages obtained from a subscription. publishTime time.Time // deliveryAttempt is the number of times a message has been delivered. // This is part of the dead lettering feature that forwards messages that // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. // If dead lettering is enabled, this will be set on all attempts, starting // with value 1. 
Otherwise, the value will be nil. deliveryAttempt *int // orderingKey identifies related messages for which publish order should // be respected. If empty string is used, message will be sent unordered. orderingKey string } // NewPubSubMessageFromMessage wraps a pubsub.Message struct into a struct that satisfies the IPubSubMessage interface. func NewPubSubMessageFromMessage(psm *pubsub.Message) *PubSubMessage { return &PubSubMessage{ id: psm.ID, data: psm.Data, attributes: psm.Attributes, publishTime: psm.PublishTime, deliveryAttempt: psm.DeliveryAttempt, orderingKey: psm.OrderingKey, ackNack: psm, } } // Ack implements the AckNacker interface. // // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing. func (psm *PubSubMessage) Ack() { if psm.ackNack != nil { psm.ackNack.Ack() psm.ackNack = nil } } // Nack implements the AckNacker interface. // // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing. func (psm *PubSubMessage) Nack() { if psm.ackNack != nil { psm.ackNack.Nack() psm.ackNack = nil } } // ID identifies this message. This ID is assigned by the server and is // populated for Messages obtained from a subscription. func (psm *PubSubMessage) ID() string { return psm.id } // Data is the actual data in the message. func (psm *PubSubMessage) Data() []byte { return psm.data } // Attributes represents the key-value pairs the current message is // labelled with. func (psm *PubSubMessage) Attributes() map[string]string { return psm.attributes } // PublishTime is the time at which the message was published. This is // populated by the server for Messages obtained from a subscription. func (psm *PubSubMessage) PublishTime() time.Time { return psm.publishTime } // DeliveryAttempt is the number of times a message has been delivered. // This is part of the dead lettering feature that forwards messages that // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 
// If dead lettering is enabled, this will be set on all attempts, starting // with value 1. Otherwise, the value will be nil. func (psm *PubSubMessage) DeliveryAttempt() *int { return psm.deliveryAttempt } // OrderingKey identifies related messages for which publish order should // be respected. If empty string is used, message will be sent unordered. func (psm *PubSubMessage) OrderingKey() string { return psm.orderingKey }