1 package chariot 2 3 import ( 4 "context" 5 "fmt" 6 "time" 7 8 "cloud.google.com/go/pubsub" 9 ) 10 11 // IPubSubReceiver is an interface that allows Chariot to mock the pubsub.Subscription.Receive function for testing. 12 type IPubSubReceiver interface { 13 Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error 14 } 15 16 // PubSubReceiverSubscriptionWrapper wraps a pubsub.Subscription object in order to satisfy the IPubSubReceiver interface. 17 type PubSubReceiverSubscriptionWrapper struct { 18 client *pubsub.Client 19 subscription *pubsub.Subscription 20 } 21 22 var AckDeadline = time.Minute 23 24 // NewGooglePubSubReceiver subscribes to a Google PubSub subscription. 25 // 26 // The returned type satisfies the IPubSubReceiver interface. 27 func NewGooglePubSubReceiver(projectID, subscriptionID string) (*PubSubReceiverSubscriptionWrapper, error) { 28 ctx := context.Background() 29 client, err := pubsub.NewClient(ctx, projectID) 30 if err != nil { 31 return nil, fmt.Errorf("Error creating pubsub client: %w", err) 32 } 33 34 var sub = client.Subscription(subscriptionID) 35 36 if exists, err := sub.Exists(ctx); err != nil { 37 return nil, fmt.Errorf("Error checking if subscription %q exists: %w", subscriptionID, err) 38 } else if !exists { 39 return nil, fmt.Errorf("Subscription %q does not exist", subscriptionID) 40 } 41 42 return &PubSubReceiverSubscriptionWrapper{ 43 client: client, 44 subscription: sub, 45 }, nil 46 } 47 48 func (wpsr *PubSubReceiverSubscriptionWrapper) SetMaxOutstandingMessages(n int) { 49 wpsr.subscription.ReceiveSettings.MaxOutstandingMessages = n 50 } 51 52 // Receive calls the `pubsub.Subscription.Receive` function, converts the pubsub.Message into an IPubSubMessage, then 53 // passes the IPubSubMessage into the provided function `f`. 54 func (wpsr *PubSubReceiverSubscriptionWrapper) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error { 55 return wpsr.subscription.Receive(ctx, func(fctx context.Context, msg *pubsub.Message) { 56 f(fctx, NewPubSubMessageFromMessage(msg)) 57 }) 58 } 59 60 func (wpsr *PubSubReceiverSubscriptionWrapper) Close() error { 61 if wpsr.client != nil { 62 err := wpsr.client.Close() 63 wpsr.client = nil 64 return err 65 } 66 return nil 67 } 68 69 // AckNacker represents the pubsub.Message Ack and Nack function definitions as an interface. 70 // 71 // After calling either Ack or Nack, all subsequent calls to both Ack and Nack must have no effect. 72 type AckNacker interface { 73 // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. 74 // It should not be called on any other Message value. If message acknowledgement fails, the 75 // Message will be redelivered. Client code must call Ack or Nack when finished for each 76 // received Message. Calls to Ack or Nack have no effect after the first call. 77 Ack() 78 79 // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. 80 // It should not be called on any other Message value. Nack will result in the Message being redelivered more 81 // quickly than if it were allowed to expire. Client code must call Ack or Nack when finished for each received 82 // Message. Calls to Ack or Nack have no effect after the first call. 83 Nack() 84 } 85 86 // IPubSubMessage is an interface that contains all fields and methods of the pubsub.Message struct as methods. 87 // 88 // IPubSubMessage is needed since the pubsub.Message type alias has an unexported internal constructor and a 89 // private AckHandler field that needs to be overridden for mocking/testing purposes. 90 type IPubSubMessage interface { 91 AckNacker 92 93 // ID identifies this message. This ID is assigned by the server and is 94 // populated for Messages obtained from a subscription. 95 ID() string 96 97 // Data is the actual data in the message. 98 Data() []byte 99 100 // Attributes represents the key-value pairs the current message is 101 // labelled with. 102 Attributes() map[string]string 103 104 // PublishTime is the time at which the message was published. This is 105 // populated by the server for Messages obtained from a subscription. 106 PublishTime() time.Time 107 108 // DeliveryAttempt is the number of times a message has been delivered. 109 // This is part of the dead lettering feature that forwards messages that 110 // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 111 // If dead lettering is enabled, this will be set on all attempts, starting 112 // with value 1. Otherwise, the value will be nil. 113 DeliveryAttempt() *int 114 115 // OrderingKey identifies related messages for which publish order should 116 // be respected. If empty string is used, message will be sent unordered. 117 OrderingKey() string 118 } 119 120 // PubSubMessage satisfies the IPubSubMessage interface so that tests can set custom Ack and Nack functions that are 121 // private fields in the pubsub.Message struct. 122 type PubSubMessage struct { 123 // ackNack is used to Ack or Nack the PubSubMessage. After the Ack or 124 // Nack function is called, the ackNack value is set to nil. 125 ackNack AckNacker 126 127 // id identifies this message. This ID is assigned by the server and is 128 // populated for Messages obtained from a subscription. 129 id string 130 131 // data is the actual data in the message. 132 data []byte 133 134 // attributes represents the key-value pairs the current message is 135 // labelled with. 136 attributes map[string]string 137 138 // publishTime is the time at which the message was published. This is 139 // populated by the server for Messages obtained from a subscription. 140 publishTime time.Time 141 142 // deliveryAttempt is the number of times a message has been delivered. 143 // This is part of the dead lettering feature that forwards messages that 144 // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 145 // If dead lettering is enabled, this will be set on all attempts, starting 146 // with value 1. Otherwise, the value will be nil. 147 deliveryAttempt *int 148 149 // orderingKey identifies related messages for which publish order should 150 // be respected. If empty string is used, message will be sent unordered. 151 orderingKey string 152 } 153 154 // NewPubSubMessageFromMessage wraps a pubsub.Message struct into a struct that satisfies the IPubSubMessage interface. 155 func NewPubSubMessageFromMessage(psm *pubsub.Message) *PubSubMessage { 156 return &PubSubMessage{ 157 id: psm.ID, 158 data: psm.Data, 159 attributes: psm.Attributes, 160 publishTime: psm.PublishTime, 161 deliveryAttempt: psm.DeliveryAttempt, 162 orderingKey: psm.OrderingKey, 163 ackNack: psm, 164 } 165 } 166 167 // Ack implements the AckNacker interface. 168 // 169 // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing. 170 func (psm *PubSubMessage) Ack() { 171 if psm.ackNack != nil { 172 psm.ackNack.Ack() 173 psm.ackNack = nil 174 } 175 } 176 177 // Nack implements the AckNacker interface. 178 // 179 // To satisfy the AckNacker interface, subsequent calls to Ack/Nack do nothing. 180 func (psm *PubSubMessage) Nack() { 181 if psm.ackNack != nil { 182 psm.ackNack.Nack() 183 psm.ackNack = nil 184 } 185 } 186 187 // ID identifies this message. This ID is assigned by the server and is 188 // populated for Messages obtained from a subscription. 189 func (psm *PubSubMessage) ID() string { 190 return psm.id 191 } 192 193 // Data is the actual data in the message. 194 func (psm *PubSubMessage) Data() []byte { 195 return psm.data 196 } 197 198 // Attributes represents the key-value pairs the current message is 199 // labelled with. 200 func (psm *PubSubMessage) Attributes() map[string]string { 201 return psm.attributes 202 } 203 204 // PublishTime is the time at which the message was published. This is 205 // populated by the server for Messages obtained from a subscription. 206 func (psm *PubSubMessage) PublishTime() time.Time { 207 return psm.publishTime 208 } 209 210 // DeliveryAttempt is the number of times a message has been delivered. 211 // This is part of the dead lettering feature that forwards messages that 212 // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 213 // If dead lettering is enabled, this will be set on all attempts, starting 214 // with value 1. Otherwise, the value will be nil. 215 func (psm *PubSubMessage) DeliveryAttempt() *int { 216 return psm.deliveryAttempt 217 } 218 219 // OrderingKey identifies related messages for which publish order should 220 // be respected. If empty string is used, message will be sent unordered. 221 func (psm *PubSubMessage) OrderingKey() string { 222 return psm.orderingKey 223 } 224