...

Source file src/github.com/go-kit/kit/sd/lb/retry.go

Documentation: github.com/go-kit/kit/sd/lb

     1  package lb
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  	"time"
     8  
     9  	"github.com/go-kit/kit/endpoint"
    10  )
    11  
    12  // RetryError is an error wrapper that is used by the retry mechanism. All
    13  // errors returned by the retry mechanism via its endpoint will be RetryErrors.
    14  type RetryError struct {
    15  	RawErrors []error // all errors encountered from endpoints directly
    16  	Final     error   // the final, terminating error
    17  }
    18  
    19  func (e RetryError) Error() string {
    20  	var suffix string
    21  	if len(e.RawErrors) > 1 {
    22  		a := make([]string, len(e.RawErrors)-1)
    23  		for i := 0; i < len(e.RawErrors)-1; i++ { // last one is Final
    24  			a[i] = e.RawErrors[i].Error()
    25  		}
    26  		suffix = fmt.Sprintf(" (previously: %s)", strings.Join(a, "; "))
    27  	}
    28  	return fmt.Sprintf("%v%s", e.Final, suffix)
    29  }
    30  
// Callback is a function that is given the current attempt count and the error
// received from the underlying endpoint (or from the balancer, when it fails
// to supply an endpoint). It should return whether the Retry function should
// continue trying to get a working endpoint, and a custom error if desired.
// The replacement error may be nil, but a true/false is always expected.
//
// The attempt count n starts at 1 for the first failed attempt. A non-nil
// replacement takes the place of the received error as the final error when
// retrying stops on this attempt; the received error is still recorded among
// the raw errors.
type Callback func(n int, received error) (keepTrying bool, replacement error)
    38  
    39  // Retry wraps a service load balancer and returns an endpoint oriented load
    40  // balancer for the specified service method. Requests to the endpoint will be
    41  // automatically load balanced via the load balancer. Requests that return
    42  // errors will be retried until they succeed, up to max times, or until the
    43  // timeout is elapsed, whichever comes first.
    44  func Retry(max int, timeout time.Duration, b Balancer) endpoint.Endpoint {
    45  	return RetryWithCallback(timeout, b, maxRetries(max))
    46  }
    47  
    48  func maxRetries(max int) Callback {
    49  	return func(n int, err error) (keepTrying bool, replacement error) {
    50  		return n < max, nil
    51  	}
    52  }
    53  
    54  func alwaysRetry(int, error) (keepTrying bool, replacement error) {
    55  	return true, nil
    56  }
    57  
    58  // RetryWithCallback wraps a service load balancer and returns an endpoint
    59  // oriented load balancer for the specified service method. Requests to the
    60  // endpoint will be automatically load balanced via the load balancer. Requests
    61  // that return errors will be retried until they succeed, up to max times, until
    62  // the callback returns false, or until the timeout is elapsed, whichever comes
    63  // first.
    64  func RetryWithCallback(timeout time.Duration, b Balancer, cb Callback) endpoint.Endpoint {
    65  	if cb == nil {
    66  		cb = alwaysRetry
    67  	}
    68  	if b == nil {
    69  		panic("nil Balancer")
    70  	}
    71  
    72  	return func(ctx context.Context, request interface{}) (response interface{}, err error) {
    73  		var (
    74  			newctx, cancel = context.WithTimeout(ctx, timeout)
    75  			responses      = make(chan interface{}, 1)
    76  			errs           = make(chan error, 1)
    77  			final          RetryError
    78  		)
    79  		defer cancel()
    80  
    81  		for i := 1; ; i++ {
    82  			go func() {
    83  				e, err := b.Endpoint()
    84  				if err != nil {
    85  					errs <- err
    86  					return
    87  				}
    88  				response, err := e(newctx, request)
    89  				if err != nil {
    90  					errs <- err
    91  					return
    92  				}
    93  				responses <- response
    94  			}()
    95  
    96  			select {
    97  			case <-newctx.Done():
    98  				return nil, newctx.Err()
    99  
   100  			case response := <-responses:
   101  				return response, nil
   102  
   103  			case err := <-errs:
   104  				final.RawErrors = append(final.RawErrors, err)
   105  				keepTrying, replacement := cb(i, err)
   106  				if replacement != nil {
   107  					err = replacement
   108  				}
   109  				if !keepTrying {
   110  					final.Final = err
   111  					return nil, final
   112  				}
   113  				continue
   114  			}
   115  		}
   116  	}
   117  }
   118  

View as plain text