1 // Copyright 2020 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 14 package pubsub 15 16 import ( 17 "context" 18 "time" 19 ) 20 21 // AckHandler implements ack/nack handling. 22 type AckHandler interface { 23 // OnAck processes a message ack. 24 OnAck() 25 26 // OnNack processes a message nack. 27 OnNack() 28 29 // OnAckWithResult processes a message ack and returns 30 // a result that shows if it succeeded. 31 OnAckWithResult() *AckResult 32 33 // OnNackWithResult processes a message nack and returns 34 // a result that shows if it succeeded. 35 OnNackWithResult() *AckResult 36 } 37 38 // Message represents a Pub/Sub message. 39 type Message struct { 40 // ID identifies this message. This ID is assigned by the server and is 41 // populated for Messages obtained from a subscription. 42 // 43 // This field is read-only. 44 ID string 45 46 // Data is the actual data in the message. 47 Data []byte 48 49 // Attributes represents the key-value pairs the current message is 50 // labelled with. 51 Attributes map[string]string 52 53 // PublishTime is the time at which the message was published. This is 54 // populated by the server for Messages obtained from a subscription. 55 // 56 // This field is read-only. 57 PublishTime time.Time 58 59 // DeliveryAttempt is the number of times a message has been delivered. 60 // This is part of the dead lettering feature that forwards messages that 61 // fail to be processed (from nack/ack deadline timeout) to a dead letter topic. 62 // If dead lettering is enabled, this will be set on all attempts, starting 63 // with value 1. Otherwise, the value will be nil. 64 // This field is read-only. 65 DeliveryAttempt *int 66 67 // OrderingKey identifies related messages for which publish order should 68 // be respected. If empty string is used, message will be sent unordered. 69 OrderingKey string 70 71 // ackh handles Ack() or Nack(). 72 ackh AckHandler 73 } 74 75 // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback. 76 // It should not be called on any other Message value. 77 // If message acknowledgement fails, the Message will be redelivered. 78 // Client code must call Ack or Nack when finished for each received Message. 79 // Calls to Ack or Nack have no effect after the first call. 80 func (m *Message) Ack() { 81 if m.ackh != nil { 82 m.ackh.OnAck() 83 } 84 } 85 86 // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback. 87 // It should not be called on any other Message value. 88 // Nack will result in the Message being redelivered more quickly than if it were allowed to expire. 89 // Client code must call Ack or Nack when finished for each received Message. 90 // Calls to Ack or Nack have no effect after the first call. 91 func (m *Message) Nack() { 92 if m.ackh != nil { 93 m.ackh.OnNack() 94 } 95 } 96 97 // AcknowledgeStatus represents the status of an Ack or Nack request. 98 type AcknowledgeStatus int 99 100 const ( 101 // AcknowledgeStatusSuccess indicates the request was a success. 102 AcknowledgeStatusSuccess AcknowledgeStatus = iota 103 // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. 104 AcknowledgeStatusPermissionDenied 105 // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. 106 AcknowledgeStatusFailedPrecondition 107 // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. 108 AcknowledgeStatusInvalidAckID 109 // AcknowledgeStatusOther indicates another unknown error was returned. 110 AcknowledgeStatusOther 111 ) 112 113 // AckResult holds the result from a call to Ack or Nack. 114 type AckResult struct { 115 ready chan struct{} 116 res AcknowledgeStatus 117 err error 118 } 119 120 // Ready returns a channel that is closed when the result is ready. 121 // When the Ready channel is closed, Get is guaranteed not to block. 122 func (r *AckResult) Ready() <-chan struct{} { return r.ready } 123 124 // Get returns the status and/or error result of a Ack, Nack, or Modack call. 125 // Get blocks until the Ack/Nack completes or the context is done. 126 func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) { 127 // If the result is already ready, return it even if the context is done. 128 select { 129 case <-r.Ready(): 130 return r.res, r.err 131 default: 132 } 133 select { 134 case <-ctx.Done(): 135 // Explicitly return AcknowledgeStatusOther for context cancelled cases, 136 // since the default is success. 137 return AcknowledgeStatusOther, ctx.Err() 138 case <-r.Ready(): 139 return r.res, r.err 140 } 141 } 142 143 // NewAckResult creates a AckResult. 144 func NewAckResult() *AckResult { 145 return &AckResult{ 146 ready: make(chan struct{}), 147 } 148 } 149 150 // SetAckResult sets the ack response and error for a ack result and closes 151 // the Ready channel. Any call after the first for the same AckResult 152 // is a no-op. 153 func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) { 154 select { 155 case <-r.Ready(): 156 return 157 default: 158 r.res = res 159 r.err = err 160 close(r.ready) 161 } 162 } 163 164 // AckWithResult acknowledges a message in Pub/Sub and it will not be 165 // delivered to this subscription again. 166 // 167 // You should avoid acknowledging messages until you have 168 // *finished* processing them, so that in the event of a failure, 169 // you receive the message again. 170 // 171 // If exactly-once delivery is enabled on the subscription, the 172 // AckResult returned by this method tracks the state of acknowledgement 173 // operation. If the operation completes successfully, the message is 174 // guaranteed NOT to be re-delivered. Otherwise, the result will 175 // contain an error with more details about the failure and the 176 // message may be re-delivered. 177 // 178 // If exactly-once delivery is NOT enabled on the subscription, or 179 // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. 180 // Since acks in Cloud Pub/Sub are best effort when exactly-once 181 // delivery is disabled, the message may be re-delivered. Because 182 // re-deliveries are possible, you should ensure that your processing 183 // code is idempotent, as you may receive any given message more than 184 // once. 185 func (m *Message) AckWithResult() *AckResult { 186 if m.ackh != nil { 187 return m.ackh.OnAckWithResult() 188 } 189 // When the message was constructed directly rather passed in the callback in `sub.Receive`, 190 // ready the message with success so calling `AckResult.Get` doesn't panic. 191 return newSuccessAckResult() 192 } 193 194 // NackWithResult declines to acknowledge the message which indicates that 195 // the client will not or cannot process a Message. This will cause the message 196 // to be re-delivered to subscribers. Re-deliveries may take place immediately 197 // or after a delay. 198 // 199 // If exactly-once delivery is enabled on the subscription, the 200 // AckResult returned by this method tracks the state of nack 201 // operation. If the operation completes successfully, the result will 202 // contain AckResponse.Success. Otherwise, the result will contain an error 203 // with more details about the failure. 204 // 205 // If exactly-once delivery is NOT enabled on the subscription, or 206 // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success. 207 func (m *Message) NackWithResult() *AckResult { 208 if m.ackh != nil { 209 return m.ackh.OnNackWithResult() 210 } 211 // When the message was constructed directly rather passed in the callback in `sub.Receive`, 212 // ready the message with success so calling `AckResult.Get` doesn't panic. 213 return newSuccessAckResult() 214 } 215 216 // NewMessage creates a message with an AckHandler implementation, which should 217 // not be nil. 218 func NewMessage(ackh AckHandler) *Message { 219 return &Message{ackh: ackh} 220 } 221 222 // MessageAckHandler provides access to the internal field Message.ackh. 223 func MessageAckHandler(m *Message) AckHandler { 224 return m.ackh 225 } 226 227 func newSuccessAckResult() *AckResult { 228 ar := NewAckResult() 229 SetAckResult(ar, AcknowledgeStatusSuccess, nil) 230 return ar 231 } 232