...

Source file src/cloud.google.com/go/internal/pubsub/message.go

Documentation: cloud.google.com/go/internal/pubsub

     1  // Copyright 2020 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
        // limitations under the License.
    13  
    14  package pubsub
    15  
    16  import (
    17  	"context"
    18  	"time"
    19  )
    20  
    21  // AckHandler implements ack/nack handling.
    22  type AckHandler interface {
    23  	// OnAck processes a message ack.
    24  	OnAck()
    25  
    26  	// OnNack processes a message nack.
    27  	OnNack()
    28  
    29  	// OnAckWithResult processes a message ack and returns
    30  	// a result that shows if it succeeded.
    31  	OnAckWithResult() *AckResult
    32  
    33  	// OnNackWithResult processes a message nack and returns
    34  	// a result that shows if it succeeded.
    35  	OnNackWithResult() *AckResult
    36  }
    37  
// Message represents a Pub/Sub message.
//
// Besides the exported payload and metadata fields, a Message carries an
// unexported AckHandler to which Ack, Nack, AckWithResult and NackWithResult
// forward.
type Message struct {
	// ID identifies this message. This ID is assigned by the server and is
	// populated for Messages obtained from a subscription.
	//
	// This field is read-only.
	ID string

	// Data is the actual data in the message.
	Data []byte

	// Attributes represents the key-value pairs the current message is
	// labelled with.
	Attributes map[string]string

	// PublishTime is the time at which the message was published. This is
	// populated by the server for Messages obtained from a subscription.
	//
	// This field is read-only.
	PublishTime time.Time

	// DeliveryAttempt is the number of times a message has been delivered.
	// This is part of the dead lettering feature that forwards messages that
	// fail to be processed (from nack/ack deadline timeout) to a dead letter topic.
	// If dead lettering is enabled, this will be set on all attempts, starting
	// with value 1. Otherwise, the value will be nil.
	//
	// This field is read-only.
	DeliveryAttempt *int

	// OrderingKey identifies related messages for which publish order should
	// be respected. If empty string is used, message will be sent unordered.
	OrderingKey string

	// ackh receives the Ack, Nack, AckWithResult and NackWithResult calls
	// made on this Message. Those methods check it for nil before use.
	ackh AckHandler
}
    74  
    75  // Ack indicates successful processing of a Message passed to the Subscriber.Receive callback.
    76  // It should not be called on any other Message value.
    77  // If message acknowledgement fails, the Message will be redelivered.
    78  // Client code must call Ack or Nack when finished for each received Message.
    79  // Calls to Ack or Nack have no effect after the first call.
    80  func (m *Message) Ack() {
    81  	if m.ackh != nil {
    82  		m.ackh.OnAck()
    83  	}
    84  }
    85  
    86  // Nack indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback.
    87  // It should not be called on any other Message value.
    88  // Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
    89  // Client code must call Ack or Nack when finished for each received Message.
    90  // Calls to Ack or Nack have no effect after the first call.
    91  func (m *Message) Nack() {
    92  	if m.ackh != nil {
    93  		m.ackh.OnNack()
    94  	}
    95  }
    96  
// AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus int

// The values are assigned consecutively with iota, so the declaration order
// below fixes each status's numeric value.
const (
	// AcknowledgeStatusSuccess indicates the request was a success.
	// It is the zero value of AcknowledgeStatus.
	AcknowledgeStatusSuccess AcknowledgeStatus = iota
	// AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions.
	AcknowledgeStatusPermissionDenied
	// AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error.
	AcknowledgeStatusFailedPrecondition
	// AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid.
	AcknowledgeStatusInvalidAckID
	// AcknowledgeStatusOther indicates another unknown error was returned.
	// AckResult.Get also returns it when the context ends before the result is ready.
	AcknowledgeStatusOther
)
   112  
// AckResult holds the result from a call to Ack or Nack.
//
// Create one with NewAckResult and resolve it with SetAckResult; read it
// through Ready and Get.
type AckResult struct {
	// ready is closed by SetAckResult once res and err have been stored.
	ready chan struct{}
	// res is the status recorded by SetAckResult.
	res AcknowledgeStatus
	// err is the error recorded by SetAckResult; it may be nil.
	err error
}
   119  
   120  // Ready returns a channel that is closed when the result is ready.
   121  // When the Ready channel is closed, Get is guaranteed not to block.
   122  func (r *AckResult) Ready() <-chan struct{} { return r.ready }
   123  
   124  // Get returns the status and/or error result of a Ack, Nack, or Modack call.
   125  // Get blocks until the Ack/Nack completes or the context is done.
   126  func (r *AckResult) Get(ctx context.Context) (res AcknowledgeStatus, err error) {
   127  	// If the result is already ready, return it even if the context is done.
   128  	select {
   129  	case <-r.Ready():
   130  		return r.res, r.err
   131  	default:
   132  	}
   133  	select {
   134  	case <-ctx.Done():
   135  		// Explicitly return AcknowledgeStatusOther for context cancelled cases,
   136  		// since the default is success.
   137  		return AcknowledgeStatusOther, ctx.Err()
   138  	case <-r.Ready():
   139  		return r.res, r.err
   140  	}
   141  }
   142  
   143  // NewAckResult creates a AckResult.
   144  func NewAckResult() *AckResult {
   145  	return &AckResult{
   146  		ready: make(chan struct{}),
   147  	}
   148  }
   149  
   150  // SetAckResult sets the ack response and error for a ack result and closes
   151  // the Ready channel. Any call after the first for the same AckResult
   152  // is a no-op.
   153  func SetAckResult(r *AckResult, res AcknowledgeStatus, err error) {
   154  	select {
   155  	case <-r.Ready():
   156  		return
   157  	default:
   158  		r.res = res
   159  		r.err = err
   160  		close(r.ready)
   161  	}
   162  }
   163  
   164  // AckWithResult acknowledges a message in Pub/Sub and it will not be
   165  // delivered to this subscription again.
   166  //
   167  // You should avoid acknowledging messages until you have
   168  // *finished* processing them, so that in the event of a failure,
   169  // you receive the message again.
   170  //
   171  // If exactly-once delivery is enabled on the subscription, the
   172  // AckResult returned by this method tracks the state of acknowledgement
   173  // operation. If the operation completes successfully, the message is
   174  // guaranteed NOT to be re-delivered. Otherwise, the result will
   175  // contain an error with more details about the failure and the
   176  // message may be re-delivered.
   177  //
   178  // If exactly-once delivery is NOT enabled on the subscription, or
   179  // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
   180  // Since acks in Cloud Pub/Sub are best effort when exactly-once
   181  // delivery is disabled, the message may be re-delivered. Because
   182  // re-deliveries are possible, you should ensure that your processing
   183  // code is idempotent, as you may receive any given message more than
   184  // once.
   185  func (m *Message) AckWithResult() *AckResult {
   186  	if m.ackh != nil {
   187  		return m.ackh.OnAckWithResult()
   188  	}
   189  	// When the message was constructed directly rather passed in the callback in `sub.Receive`,
   190  	// ready the message with success so calling `AckResult.Get` doesn't panic.
   191  	return newSuccessAckResult()
   192  }
   193  
   194  // NackWithResult declines to acknowledge the message which indicates that
   195  // the client will not or cannot process a Message. This will cause the message
   196  // to be re-delivered to subscribers. Re-deliveries may take place immediately
   197  // or after a delay.
   198  //
   199  // If exactly-once delivery is enabled on the subscription, the
   200  // AckResult returned by this method tracks the state of nack
   201  // operation. If the operation completes successfully, the result will
   202  // contain AckResponse.Success. Otherwise, the result will contain an error
   203  // with more details about the failure.
   204  //
   205  // If exactly-once delivery is NOT enabled on the subscription, or
   206  // if using Pub/Sub Lite, AckResult readies immediately with a AcknowledgeStatus.Success.
   207  func (m *Message) NackWithResult() *AckResult {
   208  	if m.ackh != nil {
   209  		return m.ackh.OnNackWithResult()
   210  	}
   211  	// When the message was constructed directly rather passed in the callback in `sub.Receive`,
   212  	// ready the message with success so calling `AckResult.Get` doesn't panic.
   213  	return newSuccessAckResult()
   214  }
   215  
   216  // NewMessage creates a message with an AckHandler implementation, which should
   217  // not be nil.
   218  func NewMessage(ackh AckHandler) *Message {
   219  	return &Message{ackh: ackh}
   220  }
   221  
   222  // MessageAckHandler provides access to the internal field Message.ackh.
   223  func MessageAckHandler(m *Message) AckHandler {
   224  	return m.ackh
   225  }
   226  
   227  func newSuccessAckResult() *AckResult {
   228  	ar := NewAckResult()
   229  	SetAckResult(ar, AcknowledgeStatusSuccess, nil)
   230  	return ar
   231  }
   232  

View as plain text