...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/retry.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"errors"
    19  	"io"
    20  	"math/rand"
    21  	"strings"
    22  	"sync"
    23  	"time"
    24  
    25  	"github.com/googleapis/gax-go/v2"
    26  	"google.golang.org/grpc/codes"
    27  	"google.golang.org/grpc/status"
    28  )
    29  
    30  var (
    31  	defaultRetryAttempts = 4
    32  )
    33  
    34  // This retry predicate is used for higher level retries, enqueing appends onto to a bidi
    35  // channel and evaluating whether an append should be retried (re-enqueued).
    36  func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
    37  	if err == nil {
    38  		return
    39  	}
    40  
    41  	s, ok := status.FromError(err)
    42  	// non-status based error conditions.
    43  	if !ok {
    44  		// EOF can happen in the case of connection close.
    45  		if errors.Is(err, io.EOF) {
    46  			shouldRetry = true
    47  			return
    48  		}
    49  		// All other non-status errors are treated as non-retryable (including context errors).
    50  		return
    51  	}
    52  	switch s.Code() {
    53  	case codes.Aborted,
    54  		codes.Canceled,
    55  		codes.DeadlineExceeded,
    56  		codes.FailedPrecondition,
    57  		codes.Internal,
    58  		codes.Unavailable:
    59  		shouldRetry = true
    60  		return
    61  	case codes.ResourceExhausted:
    62  		if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
    63  			// Note: internal b/246031522 opened to give this a structured error
    64  			// and avoid string parsing.  Should be a QuotaFailure or similar.
    65  			shouldRetry = true
    66  			return
    67  		}
    68  	}
    69  	return
    70  }
    71  
    72  // unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection.
    73  type unaryRetryer struct {
    74  	bo gax.Backoff
    75  }
    76  
    77  func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
    78  	shouldRetry, _ := retryPredicate(err)
    79  	return ur.bo.Pause(), shouldRetry
    80  }
    81  
    82  // statelessRetryer is used for backing off within a continuous process, like processing the responses
    83  // from the receive side of the bidi stream.  An individual item in that process has a notion of an attempt
    84  // count, and we use maximum retries as a way of evicting bad items.
    85  type statelessRetryer struct {
    86  	mu sync.Mutex // guards r
    87  	r  *rand.Rand
    88  
    89  	minBackoff       time.Duration
    90  	jitter           time.Duration
    91  	aggressiveFactor int
    92  	maxAttempts      int
    93  }
    94  
    95  func newStatelessRetryer() *statelessRetryer {
    96  	return &statelessRetryer{
    97  		r:           rand.New(rand.NewSource(time.Now().UnixNano())),
    98  		minBackoff:  50 * time.Millisecond,
    99  		jitter:      time.Second,
   100  		maxAttempts: defaultRetryAttempts,
   101  	}
   102  }
   103  
   104  func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
   105  	jitter := sr.jitter.Nanoseconds()
   106  	if jitter > 0 {
   107  		sr.mu.Lock()
   108  		jitter = sr.r.Int63n(jitter)
   109  		sr.mu.Unlock()
   110  	}
   111  	pause := sr.minBackoff.Nanoseconds() + jitter
   112  	if aggressiveBackoff {
   113  		pause = pause * int64(sr.aggressiveFactor)
   114  	}
   115  	return time.Duration(pause)
   116  }
   117  
   118  func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
   119  	if attemptCount >= sr.maxAttempts {
   120  		return 0, false
   121  	}
   122  	shouldRetry, aggressive := retryPredicate(err)
   123  	if shouldRetry {
   124  		return sr.pause(aggressive), true
   125  	}
   126  	return 0, false
   127  }
   128  
   129  // shouldReconnect is akin to a retry predicate, in that it evaluates whether we should force
   130  // our bidi stream to close/reopen based on the responses error.  Errors here signal that no
   131  // further appends will succeed.
   132  func shouldReconnect(err error) bool {
   133  
   134  	// io.EOF is the typical not connected signal.
   135  	if errors.Is(err, io.EOF) {
   136  		return true
   137  	}
   138  	// Backend responses that trigger reconnection on send.
   139  	reconnectCodes := []codes.Code{
   140  		codes.Aborted,
   141  		codes.Canceled,
   142  		codes.Unavailable,
   143  		codes.DeadlineExceeded,
   144  	}
   145  	if s, ok := status.FromError(err); ok {
   146  		for _, c := range reconnectCodes {
   147  			if s.Code() == c {
   148  				return true
   149  			}
   150  		}
   151  	}
   152  	return false
   153  }
   154  

View as plain text