...

Source file src/cloud.google.com/go/pubsub/iterator.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"io"
    21  	"log"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	ipubsub "cloud.google.com/go/internal/pubsub"
    27  	vkit "cloud.google.com/go/pubsub/apiv1"
    28  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    29  	"cloud.google.com/go/pubsub/internal/distribution"
    30  	gax "github.com/googleapis/gax-go/v2"
    31  	"github.com/googleapis/gax-go/v2/apierror"
    32  	"google.golang.org/grpc"
    33  	"google.golang.org/grpc/codes"
    34  	"google.golang.org/grpc/status"
    35  	"google.golang.org/protobuf/encoding/protowire"
    36  )
    37  
    38  // Between message receipt and ack (that is, the time spent processing a message) we want to extend the message
    39  // deadline by way of modack. However, we don't want to extend the deadline right as soon as the deadline expires;
    40  // instead, we'd want to extend the deadline a little bit of time ahead. gracePeriod is that amount of time ahead
    41  // of the actual deadline.
    42  const gracePeriod = 5 * time.Second
    43  
    44  // ackIDBatchSize is the maximum number of ACK IDs to send in a single Ack/Modack RPC.
    45  // The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
    46  // acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164
    47  // bytes, thus we cannot send more than 524288/176 ~= 2979 ACK IDs in an Ack/ModAc
    48  
    49  // Accounting for some overhead, we should thus only send a maximum of 2500 ACK
    50  // IDs at a time.
    51  // This is a var such that it can be modified for tests.
    52  const ackIDBatchSize int = 2500
    53  
    54  // These are vars so tests can change them.
    55  var (
    56  	maxDurationPerLeaseExtension            = 10 * time.Minute
    57  	minDurationPerLeaseExtension            = 10 * time.Second
    58  	minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute
    59  
    60  	// The total amount of time to retry acks/modacks with exactly once delivery enabled subscriptions.
    61  	exactlyOnceDeliveryRetryDeadline = 600 * time.Second
    62  )
    63  
// messageIterator pulls messages for one subscription — over a StreamingPull
// stream (ps), or with Pull RPCs when po.synchronous is set — and manages
// their leases. Acks, nacks and deadline extensions (modacks) are queued in
// the pending* maps and sent in batches by the sender goroutine.
type messageIterator struct {
	ctx        context.Context
	cancel     func() // the function that will cancel ctx; called in stop
	po         *pullOptions
	ps         *pullStream // nil when po.synchronous is set
	subc       *vkit.SubscriberClient
	subName    string
	kaTick     <-chan time.Time // keep-alive (deadline extensions)
	ackTicker  *time.Ticker     // message acks
	nackTicker *time.Ticker     // message nacks
	pingTicker *time.Ticker     // sends to the stream to keep it open
	failed     chan struct{}    // closed on stream error
	drained    chan struct{}    // closed when stopped && no more pending messages
	wg         sync.WaitGroup   // tracks the sender goroutine; waited on by stop and receive

	// mu guards keepAliveDeadlines, the pending* maps and err.
	mu sync.Mutex
	// ackTimeDist records message processing times; ackDeadline derives lease
	// extensions from its 99th percentile. It is accessed without holding mu
	// (see addToDistribution and ackDeadline), so distribution.D is presumably
	// safe for concurrent use — TODO confirm.
	ackTimeDist *distribution.D // dist uses seconds

	// keepAliveDeadlines is a map of id to expiration time. This map is used in conjunction with
	// subscription.ReceiveSettings.MaxExtension to record the maximum amount of time (the
	// deadline, more specifically) we're willing to extend a message's ack deadline. As each
	// message arrives, we'll record now+MaxExtension in this table; whenever we have a chance
	// to update ack deadlines (via modack), we'll consult this table and only include IDs
	// that are not beyond their deadline.
	keepAliveDeadlines map[string]time.Time
	pendingAcks        map[string]*AckResult // acks queued for the next ack flush
	pendingNacks       map[string]*AckResult // nacks queued for the next nack flush
	// ack IDs whose ack deadline is to be modified
	// ModAcks don't have AckResults but allows reuse of the SendModAck function.
	pendingModAcks map[string]*AckResult
	err            error // error from stream failure

	// eoMu guards enableExactlyOnceDelivery and sendNewAckDeadline.
	eoMu                      sync.RWMutex
	enableExactlyOnceDelivery bool
	// sendNewAckDeadline is set by recvMessages when the exactly-once setting
	// changes; the next pingStream then sends an updated stream ack deadline.
	sendNewAckDeadline bool

	orderingMu sync.RWMutex
	// enableOrdering determines if messages should be processed in order. This is populated
	// by the response in StreamingPull and can change mid Receive. Must be accessed
	// with the lock held.
	enableOrdering bool
}
   106  
   107  // newMessageIterator starts and returns a new messageIterator.
   108  // subName is the full name of the subscription to pull messages from.
   109  // Stop must be called on the messageIterator when it is no longer needed.
   110  // The iterator always uses the background context for acking messages and extending message deadlines.
   111  func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
   112  	var ps *pullStream
   113  	if !po.synchronous {
   114  		maxMessages := po.maxOutstandingMessages
   115  		maxBytes := po.maxOutstandingBytes
   116  		if po.useLegacyFlowControl {
   117  			maxMessages = 0
   118  			maxBytes = 0
   119  		}
   120  		ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
   121  	}
   122  	// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
   123  	// the first keepAlive halfway towards the minimum ack deadline.
   124  	keepAlivePeriod := minDurationPerLeaseExtension / 2
   125  
   126  	// Ack promptly so users don't lose work if client crashes.
   127  	ackTicker := time.NewTicker(100 * time.Millisecond)
   128  	nackTicker := time.NewTicker(100 * time.Millisecond)
   129  	pingTicker := time.NewTicker(30 * time.Second)
   130  	cctx, cancel := context.WithCancel(context.Background())
   131  	cctx = withSubscriptionKey(cctx, subName)
   132  	it := &messageIterator{
   133  		ctx:                cctx,
   134  		cancel:             cancel,
   135  		ps:                 ps,
   136  		po:                 po,
   137  		subc:               subc,
   138  		subName:            subName,
   139  		kaTick:             time.After(keepAlivePeriod),
   140  		ackTicker:          ackTicker,
   141  		nackTicker:         nackTicker,
   142  		pingTicker:         pingTicker,
   143  		failed:             make(chan struct{}),
   144  		drained:            make(chan struct{}),
   145  		ackTimeDist:        distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
   146  		keepAliveDeadlines: map[string]time.Time{},
   147  		pendingAcks:        map[string]*AckResult{},
   148  		pendingNacks:       map[string]*AckResult{},
   149  		pendingModAcks:     map[string]*AckResult{},
   150  	}
   151  	it.wg.Add(1)
   152  	go it.sender()
   153  	return it
   154  }
   155  
   156  // Subscription.receive will call stop on its messageIterator when finished with it.
   157  // Stop will block until Done has been called on all Messages that have been
   158  // returned by Next, or until the context with which the messageIterator was created
   159  // is cancelled or exceeds its deadline.
   160  func (it *messageIterator) stop() {
   161  	it.cancel()
   162  	it.mu.Lock()
   163  	it.checkDrained()
   164  	it.mu.Unlock()
   165  	it.wg.Wait()
   166  }
   167  
   168  // checkDrained closes the drained channel if the iterator has been stopped and all
   169  // pending messages have either been n/acked or expired.
   170  //
   171  // Called with the lock held.
   172  func (it *messageIterator) checkDrained() {
   173  	select {
   174  	case <-it.drained:
   175  		return
   176  	default:
   177  	}
   178  	select {
   179  	case <-it.ctx.Done():
   180  		if len(it.keepAliveDeadlines) == 0 {
   181  			close(it.drained)
   182  		}
   183  	default:
   184  	}
   185  }
   186  
   187  // Given a receiveTime, add the elapsed time to the iterator's ack distribution.
   188  // These values are bounded by the ModifyAckDeadline limits, which are
   189  // min/maxDurationPerLeaseExtension.
   190  func (it *messageIterator) addToDistribution(receiveTime time.Time) {
   191  	d := time.Since(receiveTime)
   192  	d = maxDuration(d, minDurationPerLeaseExtension)
   193  	d = minDuration(d, maxDurationPerLeaseExtension)
   194  	it.ackTimeDist.Record(int(d / time.Second))
   195  }
   196  
   197  // Called when a message is acked/nacked.
   198  func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTime time.Time) {
   199  	it.addToDistribution(receiveTime)
   200  	it.mu.Lock()
   201  	defer it.mu.Unlock()
   202  	delete(it.keepAliveDeadlines, ackID)
   203  	if ack {
   204  		it.pendingAcks[ackID] = r
   205  	} else {
   206  		it.pendingNacks[ackID] = r
   207  	}
   208  	it.checkDrained()
   209  }
   210  
   211  // fail is called when a stream method returns a permanent error.
   212  // fail returns it.err. This may be err, or it may be the error
   213  // set by an earlier call to fail.
   214  func (it *messageIterator) fail(err error) error {
   215  	it.mu.Lock()
   216  	defer it.mu.Unlock()
   217  	if it.err == nil {
   218  		it.err = err
   219  		close(it.failed)
   220  	}
   221  	return it.err
   222  }
   223  
// receive returns the next batch of messages, obtained from the stream's Recv
// (streaming mode) or from a Pull RPC requesting at most maxToPull messages
// (synchronous mode).
//
// It returns the iterator's recorded error if the stream already failed, and
// io.EOF (after waiting for the sender goroutine to exit) if stop was called.
// Any error from pulling or converting messages is fatal: it is recorded via
// fail and returned.
//
// Each received message is placed under lease management (keepAliveDeadlines,
// until now+maxExtension). A receipt modack is queued for it unless it is
// already pending a nack. Without exactly-once delivery the receipt modacks
// are sent asynchronously and all messages are returned. With exactly-once
// delivery receive waits for the modack results and returns only the
// messages whose modack succeeded, in their original order.
//
// NOTE(review): when no receipt modack is queued (no messages, or every
// message is already pending a nack) the result is (nil, nil) even though
// such messages were added to keepAliveDeadlines. Likewise, in exactly-once
// mode messages pending a nack are not returned but stay in
// keepAliveDeadlines. This can delay draining; confirm it is intended.
func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
	// A stream failure recorded by fail is permanent; keep reporting it.
	it.mu.Lock()
	ierr := it.err
	it.mu.Unlock()
	if ierr != nil {
		return nil, ierr
	}

	// Stop retrieving messages if the iterator's Stop method was called.
	// Wait for the sender goroutine to finish before reporting EOF.
	select {
	case <-it.ctx.Done():
		it.wg.Wait()
		return nil, io.EOF
	default:
	}

	var rmsgs []*pb.ReceivedMessage
	var err error
	if it.po.synchronous {
		rmsgs, err = it.pullMessages(maxToPull)
	} else {
		rmsgs, err = it.recvMessages()
	}
	// Any error here is fatal.
	if err != nil {
		return nil, it.fail(err)
	}
	recordStat(it.ctx, PullCount, int64(len(rmsgs)))
	now := time.Now()
	msgs, err := convertMessages(rmsgs, now, it.done)
	if err != nil {
		return nil, it.fail(err)
	}
	// We received some messages. Remember them so we can keep them alive. Also,
	// do a receipt mod-ack when streaming.
	// NOTE(review): nothing below checks po.synchronous, so the receipt mod-ack
	// is sent for Pull-based receipt too, not only when streaming.
	maxExt := time.Now().Add(it.po.maxExtension)
	// ackIDs collects the receipt modacks to send, keyed by ack ID.
	ackIDs := map[string]*AckResult{}
	// Snapshot the exactly-once setting once so the whole batch is handled
	// consistently even if a concurrent Recv toggles it.
	it.eoMu.RLock()
	exactlyOnceDelivery := it.enableExactlyOnceDelivery
	it.eoMu.RUnlock()
	it.mu.Lock()

	// pendingMessages maps ackID -> message, and is used
	// only when exactly once delivery is enabled.
	// At first, all messages are pending, and they
	// are removed if the modack call fails. All other
	// messages are returned to the client for processing.
	pendingMessages := make(map[string]*ipubsub.Message)
	for _, m := range msgs {
		ackID := msgAckID(m)
		addRecv(m.ID, ackID, now)
		it.keepAliveDeadlines[ackID] = maxExt
		// Don't change the mod-ack if the message is going to be nacked. This is
		// possible if there are retries.
		if _, ok := it.pendingNacks[ackID]; !ok {
			// Don't use the message's AckResult here since these are only for receipt modacks.
			// modack results are transparent to the user so these can automatically succeed unless
			// exactly once is enabled.
			// We can't use an empty AckResult here either since SetAckResult will try to
			// close the channel without checking if it exists.
			if !exactlyOnceDelivery {
				ackIDs[ackID] = newSuccessAckResult()
			} else {
				ackIDs[ackID] = ipubsub.NewAckResult()
				pendingMessages[ackID] = m
			}
		}
	}
	deadline := it.ackDeadline()
	it.mu.Unlock()

	if len(ackIDs) > 0 {
		// When exactly once delivery is not enabled, modacks are fire and forget.
		if !exactlyOnceDelivery {
			go func() {
				it.sendModAck(ackIDs, deadline, false)
			}()
			return msgs, nil
		}

		// If exactly once is enabled, we should wait until modack responses are successes
		// before attempting to process messages.
		// sendModAck completes the results of its first attempt and may start
		// background retries; ar.Get presumably blocks until each result is
		// set, including by those retries — confirm against ipubsub.AckResult.
		it.sendModAck(ackIDs, deadline, false)
		for ackID, ar := range ackIDs {
			ctx := context.Background()
			_, err := ar.Get(ctx)
			if err != nil {
				delete(pendingMessages, ackID)
				it.mu.Lock()
				// Remove the message from lease management if modack fails here.
				delete(it.keepAliveDeadlines, ackID)
				it.mu.Unlock()
			}
		}
		// Only return for processing messages that were successfully modack'ed.
		// Iterate over the original messages slice for ordering.
		v := make([]*ipubsub.Message, 0, len(pendingMessages))
		for _, m := range msgs {
			ackID := msgAckID(m)
			if _, ok := pendingMessages[ackID]; ok {
				v = append(v, m)
			}
		}
		return v, nil
	}
	return nil, nil
}
   334  
   335  // Get messages using the Pull RPC.
   336  // This may block indefinitely. It may also return zero messages, after some time waiting.
   337  func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
   338  	// Use it.ctx as the RPC context, so that if the iterator is stopped, the call
   339  	// will return immediately.
   340  	res, err := it.subc.Pull(it.ctx, &pb.PullRequest{
   341  		Subscription: it.subName,
   342  		MaxMessages:  maxToPull,
   343  	}, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
   344  	switch {
   345  	case err == context.Canceled:
   346  		return nil, nil
   347  	case status.Code(err) == codes.Canceled:
   348  		return nil, nil
   349  	case err != nil:
   350  		return nil, err
   351  	default:
   352  		return res.ReceivedMessages, nil
   353  	}
   354  }
   355  
   356  func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
   357  	res, err := it.ps.Recv()
   358  	if err != nil {
   359  		return nil, err
   360  	}
   361  
   362  	// If the new exactly once settings are different than the current settings, update it.
   363  	it.eoMu.RLock()
   364  	enableEOD := it.enableExactlyOnceDelivery
   365  	it.eoMu.RUnlock()
   366  
   367  	subProp := res.GetSubscriptionProperties()
   368  	if got := subProp.GetExactlyOnceDeliveryEnabled(); got != enableEOD {
   369  		it.eoMu.Lock()
   370  		it.sendNewAckDeadline = true
   371  		it.enableExactlyOnceDelivery = got
   372  		it.eoMu.Unlock()
   373  	}
   374  
   375  	// Also update the subscriber's ordering setting if stale.
   376  	it.orderingMu.RLock()
   377  	enableOrdering := it.enableOrdering
   378  	it.orderingMu.RUnlock()
   379  
   380  	if got := subProp.GetMessageOrderingEnabled(); got != enableOrdering {
   381  		it.orderingMu.Lock()
   382  		it.enableOrdering = got
   383  		it.orderingMu.Unlock()
   384  	}
   385  	return res.ReceivedMessages, nil
   386  }
   387  
// sender runs in its own goroutine (started by newMessageIterator) and
// performs all periodic outbound work:
//   - it flushes pending acks and nacks on their tickers;
//   - it sends lease extensions (modacks) on each keep-alive tick;
//   - in streaming mode, it pings the stream.
//
// It returns immediately when the stream fails. Once the iterator is drained,
// it returns after a final ack/nack flush. On exit it closes the send side of
// the stream (if any), stops the tickers and marks it.wg done.
func (it *messageIterator) sender() {
	defer it.wg.Done()
	defer it.ackTicker.Stop()
	defer it.nackTicker.Stop()
	defer it.pingTicker.Stop()
	defer func() {
		if it.ps != nil {
			it.ps.CloseSend()
		}
	}()

	done := false
	for !done {
		sendAcks := false
		sendNacks := false
		sendModAcks := false
		sendPing := false

		// Recomputed each iteration: used to schedule the next keep-alive
		// and as the deadline for lease-extension modacks.
		dl := it.ackDeadline()

		// Every case except failed acquires it.mu; it is released below only
		// after the pending maps have been swapped out, so the RPCs run
		// without holding the lock.
		select {
		case <-it.failed:
			// Stream failed: nothing to do, so stop immediately.
			return

		case <-it.drained:
			// All outstanding messages have been marked done:
			// nothing left to do except make the final calls.
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
			sendNacks = (len(it.pendingNacks) > 0)
			// No point in sending modacks.
			done = true

		case <-it.kaTick:
			it.mu.Lock()
			it.handleKeepAlives()
			sendModAcks = (len(it.pendingModAcks) > 0)

			// Schedule the next keep-alive gracePeriod before the deadline.
			nextTick := dl - gracePeriod
			if nextTick <= 0 {
				// If the deadline is <= gracePeriod, let's tick again halfway to
				// the deadline.
				nextTick = dl / 2
			}
			it.kaTick = time.After(nextTick)

		case <-it.nackTicker.C:
			it.mu.Lock()
			sendNacks = (len(it.pendingNacks) > 0)

		case <-it.ackTicker.C:
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)

		case <-it.pingTicker.C:
			it.mu.Lock()
			// Ping only if we are processing messages via streaming.
			sendPing = !it.po.synchronous
		}
		// Lock is held here.
		// Take ownership of the batches to send and leave fresh maps behind.
		var acks, nacks, modAcks map[string]*AckResult
		if sendAcks {
			acks = it.pendingAcks
			it.pendingAcks = map[string]*AckResult{}
		}
		if sendNacks {
			nacks = it.pendingNacks
			it.pendingNacks = map[string]*AckResult{}
		}
		if sendModAcks {
			modAcks = it.pendingModAcks
			it.pendingModAcks = map[string]*AckResult{}
		}
		it.mu.Unlock()
		// Make Ack and ModAck RPCs.
		if sendAcks {
			it.sendAck(acks)
		}
		if sendNacks {
			// Nack indicated by modifying the deadline to zero.
			it.sendModAck(nacks, 0, false)
		}
		if sendModAcks {
			it.sendModAck(modAcks, dl, true)
		}
		if sendPing {
			it.pingStream()
		}
	}
}
   480  
   481  // handleKeepAlives modifies the pending request to include deadline extensions
   482  // for live messages. It also purges expired messages.
   483  //
   484  // Called with the lock held.
   485  func (it *messageIterator) handleKeepAlives() {
   486  	now := time.Now()
   487  	for id, expiry := range it.keepAliveDeadlines {
   488  		if expiry.Before(now) {
   489  			// This delete will not result in skipping any map items, as implied by
   490  			// the spec at https://golang.org/ref/spec#For_statements, "For
   491  			// statements with range clause", note 3, and stated explicitly at
   492  			// https://groups.google.com/forum/#!msg/golang-nuts/UciASUb03Js/pzSq5iVFAQAJ.
   493  			delete(it.keepAliveDeadlines, id)
   494  		} else {
   495  			// Use a success AckResult since we don't propagate ModAcks back to the user.
   496  			it.pendingModAcks[id] = newSuccessAckResult()
   497  		}
   498  	}
   499  	it.checkDrained()
   500  }
   501  
   502  // sendAck is used to confirm acknowledgement of a message. If exactly once delivery is
   503  // enabled, we'll retry these messages for a short duration in a goroutine.
   504  func (it *messageIterator) sendAck(m map[string]*AckResult) {
   505  	ackIDs := make([]string, 0, len(m))
   506  	for k := range m {
   507  		ackIDs = append(ackIDs, k)
   508  	}
   509  	it.eoMu.RLock()
   510  	exactlyOnceDelivery := it.enableExactlyOnceDelivery
   511  	it.eoMu.RUnlock()
   512  
   513  	var toSend []string
   514  	for len(ackIDs) > 0 {
   515  		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
   516  
   517  		recordStat(it.ctx, AckCount, int64(len(toSend)))
   518  		addAcks(toSend)
   519  		// Use context.Background() as the call's context, not it.ctx. We don't
   520  		// want to cancel this RPC when the iterator is stopped.
   521  		cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
   522  		defer cancel2()
   523  		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
   524  			Subscription: it.subName,
   525  			AckIds:       toSend,
   526  		})
   527  		if exactlyOnceDelivery {
   528  			resultsByAckID := make(map[string]*AckResult)
   529  			for _, ackID := range toSend {
   530  				resultsByAckID[ackID] = m[ackID]
   531  			}
   532  			st, md := extractMetadata(err)
   533  			_, toRetry := processResults(st, resultsByAckID, md)
   534  			if len(toRetry) > 0 {
   535  				// Retry acks in a separate goroutine.
   536  				go func() {
   537  					it.retryAcks(toRetry)
   538  				}()
   539  			}
   540  		}
   541  	}
   542  }
   543  
   544  // sendModAck is used to extend the lease of messages or nack them.
   545  // The receipt mod-ack amount is derived from a percentile distribution based
   546  // on the time it takes to process messages. The percentile chosen is the 99%th
   547  // percentile in order to capture the highest amount of time necessary without
   548  // considering 1% outliers. If the ModAck RPC fails and exactly once delivery is
   549  // enabled, we retry it in a separate goroutine for a short duration.
   550  func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
   551  	deadlineSec := int32(deadline / time.Second)
   552  	ackIDs := make([]string, 0, len(m))
   553  	for k := range m {
   554  		ackIDs = append(ackIDs, k)
   555  	}
   556  	it.eoMu.RLock()
   557  	exactlyOnceDelivery := it.enableExactlyOnceDelivery
   558  	it.eoMu.RUnlock()
   559  	var toSend []string
   560  	for len(ackIDs) > 0 {
   561  		toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
   562  		if deadline == 0 {
   563  			recordStat(it.ctx, NackCount, int64(len(toSend)))
   564  		} else {
   565  			recordStat(it.ctx, ModAckCount, int64(len(toSend)))
   566  		}
   567  		addModAcks(toSend, deadlineSec)
   568  		// Use context.Background() as the call's context, not it.ctx. We don't
   569  		// want to cancel this RPC when the iterator is stopped.
   570  		cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
   571  		defer cancel2()
   572  		err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
   573  			Subscription:       it.subName,
   574  			AckDeadlineSeconds: deadlineSec,
   575  			AckIds:             toSend,
   576  		})
   577  		if exactlyOnceDelivery {
   578  			resultsByAckID := make(map[string]*AckResult)
   579  			for _, ackID := range toSend {
   580  				resultsByAckID[ackID] = m[ackID]
   581  			}
   582  
   583  			st, md := extractMetadata(err)
   584  			_, toRetry := processResults(st, resultsByAckID, md)
   585  			if len(toRetry) > 0 {
   586  				// Retry modacks/nacks in a separate goroutine.
   587  				go func() {
   588  					it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
   589  				}()
   590  			}
   591  		}
   592  	}
   593  }
   594  
   595  // retryAcks retries the ack RPC with backoff. This must be called in a goroutine
   596  // in it.sendAck(), with a max of 2500 ackIDs.
   597  func (it *messageIterator) retryAcks(m map[string]*AckResult) {
   598  	ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
   599  	defer cancel()
   600  	bo := newExactlyOnceBackoff()
   601  	for {
   602  		if ctx.Err() != nil {
   603  			for _, r := range m {
   604  				ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
   605  			}
   606  			return
   607  		}
   608  		// Don't need to split map since this is the retry function and
   609  		// there is already a max of 2500 ackIDs here.
   610  		ackIDs := make([]string, 0, len(m))
   611  		for k := range m {
   612  			ackIDs = append(ackIDs, k)
   613  		}
   614  		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
   615  		defer cancel2()
   616  		err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
   617  			Subscription: it.subName,
   618  			AckIds:       ackIDs,
   619  		})
   620  		st, md := extractMetadata(err)
   621  		_, toRetry := processResults(st, m, md)
   622  		if len(toRetry) == 0 {
   623  			return
   624  		}
   625  		time.Sleep(bo.Pause())
   626  		m = toRetry
   627  	}
   628  }
   629  
   630  // retryModAcks retries the modack RPC with backoff. This must be called in a goroutine
   631  // in it.sendModAck(), with a max of 2500 ackIDs. Modacks are retried up to 3 times
   632  // since after that, the message will have expired. Nacks are retried up until the default
   633  // deadline of 10 minutes.
   634  func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
   635  	bo := newExactlyOnceBackoff()
   636  	retryCount := 0
   637  	ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
   638  	defer cancel()
   639  	for {
   640  		// If context is done, complete all AckResults with errors.
   641  		if ctx.Err() != nil {
   642  			for _, r := range m {
   643  				ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
   644  			}
   645  			return
   646  		}
   647  		// Only retry modack requests up to 3 times.
   648  		if deadlineSec != 0 && retryCount > 3 {
   649  			ackIDs := make([]string, 0, len(m))
   650  			for k, ar := range m {
   651  				ackIDs = append(ackIDs, k)
   652  				ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
   653  			}
   654  			if logOnInvalid {
   655  				log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
   656  			}
   657  			return
   658  		}
   659  		// Don't need to split map since this is the retry function and
   660  		// there is already a max of 2500 ackIDs here.
   661  		ackIDs := make([]string, 0, len(m))
   662  		for k := range m {
   663  			ackIDs = append(ackIDs, k)
   664  		}
   665  		cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
   666  		defer cancel2()
   667  		err := it.subc.ModifyAckDeadline(cctx2, &pb.ModifyAckDeadlineRequest{
   668  			Subscription:       it.subName,
   669  			AckIds:             ackIDs,
   670  			AckDeadlineSeconds: deadlineSec,
   671  		})
   672  		st, md := extractMetadata(err)
   673  		_, toRetry := processResults(st, m, md)
   674  		if len(toRetry) == 0 {
   675  			return
   676  		}
   677  		time.Sleep(bo.Pause())
   678  		m = toRetry
   679  		retryCount++
   680  	}
   681  }
   682  
   683  // Send a message to the stream to keep it open. The stream will close if there's no
   684  // traffic on it for a while. By keeping it open, we delay the start of the
   685  // expiration timer on messages that are buffered by gRPC or elsewhere in the
   686  // network. This matters if it takes a long time to process messages relative to the
   687  // default ack deadline, and if the messages are small enough so that many can fit
   688  // into the buffer.
   689  func (it *messageIterator) pingStream() {
   690  	spr := &pb.StreamingPullRequest{}
   691  	it.eoMu.RLock()
   692  	if it.sendNewAckDeadline {
   693  		spr.StreamAckDeadlineSeconds = int32(it.ackDeadline())
   694  		it.sendNewAckDeadline = false
   695  	}
   696  	it.eoMu.RUnlock()
   697  	it.ps.Send(spr)
   698  }
   699  
   700  // calcFieldSizeString returns the number of bytes string fields
   701  // will take up in an encoded proto message.
   702  func calcFieldSizeString(fields ...string) int {
   703  	overhead := 0
   704  	for _, field := range fields {
   705  		overhead += 1 + len(field) + protowire.SizeVarint(uint64(len(field)))
   706  	}
   707  	return overhead
   708  }
   709  
   710  // calcFieldSizeInt returns the number of bytes int fields
   711  // will take up in an encoded proto message.
   712  func calcFieldSizeInt(fields ...int) int {
   713  	overhead := 0
   714  	for _, field := range fields {
   715  		overhead += 1 + protowire.SizeVarint(uint64(field))
   716  	}
   717  	return overhead
   718  }
   719  
   720  // splitRequestIDs takes a slice of ackIDs and returns two slices such that the first
   721  // ackID slice can be used in a request where the payload does not exceed ackIDBatchSize.
   722  func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) {
   723  	if len(ids) < maxBatchSize {
   724  		return ids, []string{}
   725  	}
   726  	return ids[:maxBatchSize], ids[maxBatchSize:]
   727  }
   728  
   729  // The deadline to ack is derived from a percentile distribution based
   730  // on the time it takes to process messages. The percentile chosen is the 99%th
   731  // percentile - that is, processing times up to the 99%th longest processing
   732  // times should be safe. The highest 1% may expire. This number was chosen
   733  // as a way to cover most users' usecases without losing the value of
   734  // expiration.
   735  func (it *messageIterator) ackDeadline() time.Duration {
   736  	pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second
   737  	it.eoMu.RLock()
   738  	enableExactlyOnce := it.enableExactlyOnceDelivery
   739  	it.eoMu.RUnlock()
   740  	return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, enableExactlyOnce)
   741  }
   742  
   743  func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration {
   744  	// If the user explicitly sets a maxExtensionPeriod, respect it.
   745  	if maxExtension > 0 {
   746  		ackDeadline = minDuration(ackDeadline, maxExtension)
   747  	}
   748  
   749  	// If the user explicitly sets a minExtensionPeriod, respect it.
   750  	if minExtension > 0 {
   751  		ackDeadline = maxDuration(ackDeadline, minExtension)
   752  	} else if exactlyOnce {
   753  		// Higher minimum ack_deadline for subscriptions with
   754  		// exactly-once delivery enabled.
   755  		ackDeadline = maxDuration(ackDeadline, minDurationPerLeaseExtensionExactlyOnce)
   756  	} else if ackDeadline < minDurationPerLeaseExtension {
   757  		// Otherwise, lower bound is min ack extension. This is normally bounded
   758  		// when adding datapoints to the distribution, but this is needed for
   759  		// the initial few calls to ackDeadline.
   760  		ackDeadline = minDurationPerLeaseExtension
   761  	}
   762  
   763  	return ackDeadline
   764  }
   765  
   766  func minDuration(x, y time.Duration) time.Duration {
   767  	if x < y {
   768  		return x
   769  	}
   770  	return y
   771  }
   772  
   773  func maxDuration(x, y time.Duration) time.Duration {
   774  	if x > y {
   775  		return x
   776  	}
   777  	return y
   778  }
   779  
   780  const (
   781  	transientErrStringPrefix     = "TRANSIENT_"
   782  	permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID"
   783  )
   784  
   785  // extracts information from an API error for exactly once delivery's ack/modack err responses.
   786  func extractMetadata(err error) (*status.Status, map[string]string) {
   787  	apiErr, ok := apierror.FromError(err)
   788  	if ok {
   789  		return apiErr.GRPCStatus(), apiErr.Metadata()
   790  	}
   791  	return nil, nil
   792  }
   793  
   794  // processResults processes AckResults by referring to errorStatus and errorsByAckID.
   795  // The errors returned by the server in `errorStatus` or in `errorsByAckID`
   796  // are used to complete the AckResults in `ackResMap` (with a success
   797  // or error) or to return requests for further retries.
   798  // This function returns two maps of ackID to ack results, one for completed results and the other for ones to retry.
   799  // Logic is derived from python-pubsub: https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py#L161-L220
   800  func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]string) (map[string]*AckResult, map[string]*AckResult) {
   801  	completedResults := make(map[string]*AckResult)
   802  	retryResults := make(map[string]*AckResult)
   803  	for ackID, ar := range ackResMap {
   804  		// Handle special errors returned for ack/modack RPCs via the ErrorInfo
   805  		// sidecar metadata when exactly-once delivery is enabled.
   806  		if errAckID, ok := errorsByAckID[ackID]; ok {
   807  			if strings.HasPrefix(errAckID, transientErrStringPrefix) {
   808  				retryResults[ackID] = ar
   809  			} else {
   810  				if errAckID == permanentInvalidAckErrString {
   811  					ipubsub.SetAckResult(ar, AcknowledgeStatusInvalidAckID, errors.New(errAckID))
   812  				} else {
   813  					ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New(errAckID))
   814  				}
   815  				completedResults[ackID] = ar
   816  			}
   817  		} else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) {
   818  			retryResults[ackID] = ar
   819  		} else if errorStatus != nil {
   820  			// Other gRPC errors are not retried.
   821  			switch errorStatus.Code() {
   822  			case codes.PermissionDenied:
   823  				ipubsub.SetAckResult(ar, AcknowledgeStatusPermissionDenied, errorStatus.Err())
   824  			case codes.FailedPrecondition:
   825  				ipubsub.SetAckResult(ar, AcknowledgeStatusFailedPrecondition, errorStatus.Err())
   826  			default:
   827  				ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errorStatus.Err())
   828  			}
   829  			completedResults[ackID] = ar
   830  		} else if ar != nil {
   831  			// Since no error occurred, requests with AckResults are completed successfully.
   832  			ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil)
   833  			completedResults[ackID] = ar
   834  		} else {
   835  			// All other requests are considered completed.
   836  			completedResults[ackID] = ar
   837  		}
   838  	}
   839  	return completedResults, retryResults
   840  }
   841  

View as plain text