...

Source file src/cloud.google.com/go/pubsub/message.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"fmt"
    19  	"time"
    20  
    21  	ipubsub "cloud.google.com/go/internal/pubsub"
    22  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    23  )
    24  
    25  // Message represents a Pub/Sub message.
    26  //
    27  // Message can be passed to Topic.Publish for publishing.
    28  //
    29  // If received in the callback passed to Subscription.Receive, client code must
    30  // call Message.Ack or Message.Nack when finished processing the Message. Calls
    31  // to Ack or Nack have no effect after the first call.
    32  //
    33  // Ack indicates successful processing of a Message. If message acknowledgement
    34  // fails, the Message will be redelivered. Nack indicates that the client will
    35  // not or cannot process a Message. Nack will result in the Message being
    36  // redelivered more quickly than if it were allowed to expire.
    37  //
    38  // If using exactly once delivery, you should call Message.AckWithResult and
    39  // Message.NackWithResult instead. These methods will return an AckResult,
    40  // which tracks the state of acknowledgement operation. If the AckResult returns
    41  // successful, the message is guaranteed NOT to be re-delivered. Otherwise,
    42  // the AckResult will return an error with more details about the failure
    43  // and the message may be re-delivered.
    44  type Message = ipubsub.Message
    45  
    46  // msgAckHandler performs a safe cast of the message's ack handler to psAckHandler.
    47  func msgAckHandler(m *Message, eod bool) (*psAckHandler, bool) {
    48  	ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler)
    49  	ackh.exactlyOnceDelivery = eod
    50  	return ackh, ok
    51  }
    52  
    53  func msgAckID(m *Message) string {
    54  	if ackh, ok := msgAckHandler(m, false); ok {
    55  		return ackh.ackID
    56  	}
    57  	return ""
    58  }
    59  
    60  // The done method of the iterator that created a Message.
    61  type iterDoneFunc func(string, bool, *AckResult, time.Time)
    62  
    63  func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) {
    64  	msgs := make([]*Message, 0, len(rms))
    65  	for i, m := range rms {
    66  		msg, err := toMessage(m, receiveTime, doneFunc)
    67  		if err != nil {
    68  			return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
    69  		}
    70  		msgs = append(msgs, msg)
    71  	}
    72  	return msgs, nil
    73  }
    74  
    75  func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) (*Message, error) {
    76  	ackh := &psAckHandler{ackID: resp.AckId}
    77  	msg := ipubsub.NewMessage(ackh)
    78  	if resp.Message == nil {
    79  		return msg, nil
    80  	}
    81  
    82  	pubTime := resp.Message.PublishTime.AsTime()
    83  
    84  	var deliveryAttempt *int
    85  	if resp.DeliveryAttempt > 0 {
    86  		da := int(resp.DeliveryAttempt)
    87  		deliveryAttempt = &da
    88  	}
    89  
    90  	msg.Data = resp.Message.Data
    91  	msg.Attributes = resp.Message.Attributes
    92  	msg.ID = resp.Message.MessageId
    93  	msg.PublishTime = pubTime
    94  	msg.DeliveryAttempt = deliveryAttempt
    95  	msg.OrderingKey = resp.Message.OrderingKey
    96  	ackh.receiveTime = receiveTime
    97  	ackh.doneFunc = doneFunc
    98  	ackh.ackResult = ipubsub.NewAckResult()
    99  	return msg, nil
   100  }
   101  
   102  // AckResult holds the result from a call to Ack or Nack.
   103  //
   104  // Call Get to obtain the result of the Ack/NackWithResult call. Example:
   105  //
   106  //	// Get blocks until Ack/NackWithResult completes or ctx is done.
   107  //	ackStatus, err := r.Get(ctx)
   108  //	if err != nil {
   109  //	    // TODO: Handle error.
   110  //	}
   111  type AckResult = ipubsub.AckResult
   112  
   113  // AcknowledgeStatus represents the status of an Ack or Nack request.
   114  type AcknowledgeStatus = ipubsub.AcknowledgeStatus
   115  
   116  const (
   117  	// AcknowledgeStatusSuccess indicates the request was a success.
   118  	AcknowledgeStatusSuccess AcknowledgeStatus = iota
   119  	// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
   120  	AcknowledgeStatusPermissionDenied
   121  	// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
   122  	AcknowledgeStatusFailedPrecondition
   123  	// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
   124  	AcknowledgeStatusInvalidAckID
   125  	// AcknowledgeStatusOther indicates another unknown error was returned.
   126  	AcknowledgeStatusOther
   127  )
   128  
   129  // psAckHandler handles ack/nack for the pubsub package.
   130  type psAckHandler struct {
   131  	// ackID is the identifier to acknowledge this message.
   132  	ackID string
   133  
   134  	// receiveTime is the time the message was received by the client.
   135  	receiveTime time.Time
   136  
   137  	calledDone bool
   138  
   139  	// The done method of the iterator that created this Message.
   140  	doneFunc iterDoneFunc
   141  
   142  	// the ack result that will be returned for this ack handler
   143  	// if AckWithResult or NackWithResult is called.
   144  	ackResult *AckResult
   145  
   146  	// exactlyOnceDelivery determines if the message needs to be delivered
   147  	// exactly once.
   148  	exactlyOnceDelivery bool
   149  }
   150  
   151  func (ah *psAckHandler) OnAck() {
   152  	ah.done(true)
   153  }
   154  
   155  func (ah *psAckHandler) OnNack() {
   156  	ah.done(false)
   157  }
   158  
   159  func (ah *psAckHandler) OnAckWithResult() *AckResult {
   160  	// call done with true to indicate ack.
   161  	ah.done(true)
   162  	if !ah.exactlyOnceDelivery {
   163  		return newSuccessAckResult()
   164  	}
   165  	return ah.ackResult
   166  }
   167  
   168  func (ah *psAckHandler) OnNackWithResult() *AckResult {
   169  	// call done with false to indicate nack.
   170  	ah.done(false)
   171  	if !ah.exactlyOnceDelivery {
   172  		return newSuccessAckResult()
   173  	}
   174  	return ah.ackResult
   175  }
   176  
   177  func (ah *psAckHandler) done(ack bool) {
   178  	if ah.calledDone {
   179  		return
   180  	}
   181  	ah.calledDone = true
   182  	if ah.doneFunc != nil {
   183  		ah.doneFunc(ah.ackID, ack, ah.ackResult, ah.receiveTime)
   184  	}
   185  }
   186  
   187  // newSuccessAckResult returns an AckResult that resolves to success immediately.
   188  func newSuccessAckResult() *AckResult {
   189  	ar := ipubsub.NewAckResult()
   190  	ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil)
   191  	return ar
   192  }
   193  

View as plain text