...

Source file src/cloud.google.com/go/pubsub/service.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"math"
    19  	"strings"
    20  	"time"
    21  
    22  	gax "github.com/googleapis/gax-go/v2"
    23  	"google.golang.org/grpc/codes"
    24  	"google.golang.org/grpc/status"
    25  )
    26  
    27  // maxPayload is the maximum number of bytes to devote to the
    28  // encoded AcknowledgementRequest / ModifyAckDeadline proto message.
    29  //
    30  // With gRPC there is no way for the client to know the server's max message size (it is
    31  // configurable on the server). We know from experience that it
    32  // it 512K.
    33  const (
    34  	maxPayload       = 512 * 1024
    35  	maxSendRecvBytes = 20 * 1024 * 1024 // 20M
    36  )
    37  
    38  func trunc32(i int64) int32 {
    39  	if i > math.MaxInt32 {
    40  		i = math.MaxInt32
    41  	}
    42  	return int32(i)
    43  }
    44  
    45  type defaultRetryer struct {
    46  	bo gax.Backoff
    47  }
    48  
    49  // Logic originally from
    50  // https://github.com/googleapis/java-pubsub/blob/main/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StatusUtil.java
    51  func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
    52  	s, ok := status.FromError(err)
    53  	if !ok { // includes io.EOF, normal stream close, which causes us to reopen
    54  		return r.bo.Pause(), true
    55  	}
    56  	switch s.Code() {
    57  	case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted:
    58  		return r.bo.Pause(), true
    59  	case codes.Unavailable:
    60  		c := strings.Contains(s.Message(), "Server shutdownNow invoked")
    61  		if !c {
    62  			return r.bo.Pause(), true
    63  		}
    64  		return 0, false
    65  	case codes.Unknown:
    66  		// Retry GOAWAY, see https://github.com/googleapis/google-cloud-go/issues/4257.
    67  		isGoaway := strings.Contains(s.Message(), "received prior goaway: code: NO_ERROR")
    68  		if isGoaway {
    69  			return r.bo.Pause(), true
    70  		}
    71  		return 0, false
    72  	default:
    73  		return 0, false
    74  	}
    75  }
    76  
    77  type streamingPullRetryer struct {
    78  	defaultRetryer gax.Retryer
    79  }
    80  
    81  // Does not retry ResourceExhausted. See: https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1166#issuecomment-443744705
    82  func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
    83  	s, ok := status.FromError(err)
    84  	if !ok { // call defaultRetryer so that its backoff can be used
    85  		return r.defaultRetryer.Retry(err)
    86  	}
    87  	switch s.Code() {
    88  	case codes.ResourceExhausted:
    89  		return 0, false
    90  	default:
    91  		return r.defaultRetryer.Retry(err)
    92  	}
    93  }
    94  
    95  type publishRetryer struct {
    96  	defaultRetryer gax.Retryer
    97  }
    98  
    99  func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
   100  	s, ok := status.FromError(err)
   101  	if !ok {
   102  		return r.defaultRetryer.Retry(err)
   103  	}
   104  	if s.Code() == codes.Internal && strings.Contains(s.Message(), "string field contains invalid UTF-8") {
   105  		return 0, false
   106  	}
   107  	return r.defaultRetryer.Retry(err)
   108  }
   109  
   110  var (
   111  	exactlyOnceDeliveryTemporaryRetryErrors = map[codes.Code]struct{}{
   112  		codes.DeadlineExceeded:  {},
   113  		codes.ResourceExhausted: {},
   114  		codes.Aborted:           {},
   115  		codes.Internal:          {},
   116  		codes.Unavailable:       {},
   117  	}
   118  )
   119  
   120  // contains checks if grpc code v is in t, a set of retryable error codes.
   121  func contains(v codes.Code, t map[codes.Code]struct{}) bool {
   122  	_, ok := t[v]
   123  	return ok
   124  }
   125  
   126  func newExactlyOnceBackoff() gax.Backoff {
   127  	return gax.Backoff{
   128  		Initial:    1 * time.Second,
   129  		Max:        64 * time.Second,
   130  		Multiplier: 2,
   131  	}
   132  }
   133  

View as plain text