// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"fmt"
	"time"

	ipubsub "cloud.google.com/go/internal/pubsub"
	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
)

// Message represents a Pub/Sub message. It is an alias for the Message type
// defined in the internal pubsub package.
//
// Message can be passed to Topic.Publish for publishing.
//
// If received in the callback passed to Subscription.Receive, client code must
// call Message.Ack or Message.Nack when finished processing the Message. Calls
// to Ack or Nack have no effect after the first call.
//
// Ack indicates successful processing of a Message. If message acknowledgement
// fails, the Message will be redelivered. Nack indicates that the client will
// not or cannot process a Message. Nack will result in the Message being
// redelivered more quickly than if it were allowed to expire.
//
// If using exactly once delivery, you should call Message.AckWithResult and
// Message.NackWithResult instead. These methods will return an AckResult,
// which tracks the state of the acknowledgement operation. If the AckResult
// returns successful, the message is guaranteed NOT to be re-delivered.
// Otherwise, the AckResult will return an error with more details about the
// failure and the message may be re-delivered.
type Message = ipubsub.Message

// msgAckHandler performs a safe cast of the message's ack handler to psAckHandler.
func msgAckHandler(m *Message, eod bool) (*psAckHandler, bool) { ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler) ackh.exactlyOnceDelivery = eod return ackh, ok } func msgAckID(m *Message) string { if ackh, ok := msgAckHandler(m, false); ok { return ackh.ackID } return "" } // The done method of the iterator that created a Message. type iterDoneFunc func(string, bool, *AckResult, time.Time) func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) { msgs := make([]*Message, 0, len(rms)) for i, m := range rms { msg, err := toMessage(m, receiveTime, doneFunc) if err != nil { return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m) } msgs = append(msgs, msg) } return msgs, nil } func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) (*Message, error) { ackh := &psAckHandler{ackID: resp.AckId} msg := ipubsub.NewMessage(ackh) if resp.Message == nil { return msg, nil } pubTime := resp.Message.PublishTime.AsTime() var deliveryAttempt *int if resp.DeliveryAttempt > 0 { da := int(resp.DeliveryAttempt) deliveryAttempt = &da } msg.Data = resp.Message.Data msg.Attributes = resp.Message.Attributes msg.ID = resp.Message.MessageId msg.PublishTime = pubTime msg.DeliveryAttempt = deliveryAttempt msg.OrderingKey = resp.Message.OrderingKey ackh.receiveTime = receiveTime ackh.doneFunc = doneFunc ackh.ackResult = ipubsub.NewAckResult() return msg, nil } // AckResult holds the result from a call to Ack or Nack. // // Call Get to obtain the result of the Ack/NackWithResult call. Example: // // // Get blocks until Ack/NackWithResult completes or ctx is done. // ackStatus, err := r.Get(ctx) // if err != nil { // // TODO: Handle error. // } type AckResult = ipubsub.AckResult // AcknowledgeStatus represents the status of an Ack or Nack request. 
type AcknowledgeStatus = ipubsub.AcknowledgeStatus const ( // AcknowledgeStatusSuccess indicates the request was a success. AcknowledgeStatusSuccess AcknowledgeStatus = iota // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. AcknowledgeStatusPermissionDenied // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. AcknowledgeStatusFailedPrecondition // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. AcknowledgeStatusInvalidAckID // AcknowledgeStatusOther indicates another unknown error was returned. AcknowledgeStatusOther ) // psAckHandler handles ack/nack for the pubsub package. type psAckHandler struct { // ackID is the identifier to acknowledge this message. ackID string // receiveTime is the time the message was received by the client. receiveTime time.Time calledDone bool // The done method of the iterator that created this Message. doneFunc iterDoneFunc // the ack result that will be returned for this ack handler // if AckWithResult or NackWithResult is called. ackResult *AckResult // exactlyOnceDelivery determines if the message needs to be delivered // exactly once. exactlyOnceDelivery bool } func (ah *psAckHandler) OnAck() { ah.done(true) } func (ah *psAckHandler) OnNack() { ah.done(false) } func (ah *psAckHandler) OnAckWithResult() *AckResult { // call done with true to indicate ack. ah.done(true) if !ah.exactlyOnceDelivery { return newSuccessAckResult() } return ah.ackResult } func (ah *psAckHandler) OnNackWithResult() *AckResult { // call done with false to indicate nack. ah.done(false) if !ah.exactlyOnceDelivery { return newSuccessAckResult() } return ah.ackResult } func (ah *psAckHandler) done(ack bool) { if ah.calledDone { return } ah.calledDone = true if ah.doneFunc != nil { ah.doneFunc(ah.ackID, ack, ah.ackResult, ah.receiveTime) } } // newSuccessAckResult returns an AckResult that resolves to success immediately. 
func newSuccessAckResult() *AckResult { ar := ipubsub.NewAckResult() ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil) return ar }