...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package worker
18
19 import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "io/ioutil"
25 "log"
26 "net"
27 "net/http"
28 "net/url"
29 "os"
30 "time"
31
32 "github.com/sassoftware/relic/internal/workerrpc"
33 )
34
// Backoff schedule used between retry attempts in doRetry.
const (
	// initialDelay is the wait before the first retry.
	initialDelay = time.Second
	// maxDelay is the ceiling the wait is clamped to after scaling.
	maxDelay = 30 * time.Second
	// scaleFactor is the multiplier applied to the wait after every retry.
	scaleFactor = 2.718
)
40
41 func (t *WorkerToken) doRetry(req *http.Request) (rresp *workerrpc.Response, err error) {
42 retries := t.tconf.Retries
43 if retries == 0 {
44 retries = defaultRetries
45 }
46 timeout := time.Duration(t.tconf.Timeout) * time.Second
47 if timeout == 0 {
48 timeout = defaultTimeout
49 }
50 baseCtx := req.Context()
51 delay := float32(initialDelay)
52 var last error
53 for i := 0; i < retries; i++ {
54 if i != 0 {
55 log.Printf("token error (attempt %d of %d): %s", i, retries, last)
56
57 ctx, cancel := context.WithTimeout(baseCtx, time.Duration(delay))
58 <-ctx.Done()
59 cancel()
60 if baseCtx.Err() != nil {
61
62 return nil, baseCtx.Err()
63 }
64 delay *= scaleFactor
65 if delay > float32(maxDelay) {
66 delay = float32(maxDelay)
67 }
68 }
69 if req.GetBody != nil {
70 req.Body, err = req.GetBody()
71 if err != nil {
72 return nil, err
73 }
74 }
75 ctx, cancel := context.WithTimeout(baseCtx, timeout)
76 defer cancel()
77 resp, err := http.DefaultClient.Do(req.WithContext(ctx))
78 if err != nil {
79 if baseCtx.Err() != nil {
80
81 return nil, baseCtx.Err()
82 }
83
84 if !errIsTemporary(err) {
85 return nil, err
86 }
87 last = err
88 } else if resp.StatusCode == http.StatusOK {
89
90 blob, err := ioutil.ReadAll(resp.Body)
91 resp.Body.Close()
92 if err != nil {
93 return nil, err
94 }
95 rresp = new(workerrpc.Response)
96 if err := json.Unmarshal(blob, rresp); err != nil {
97 return nil, err
98 }
99 if rresp.Err == "" {
100
101 return rresp, nil
102 }
103 last = errors.New(rresp.Err)
104 if !rresp.Retryable {
105 break
106 }
107 } else {
108
109 body, _ := ioutil.ReadAll(resp.Body)
110 resp.Body.Close()
111 last = fmt.Errorf("HTTP request failed: %s\nRequest: %s %s\n\n%s", resp.Status, req.Method, req.URL, string(body))
112 if !statusIsTemporary(resp.StatusCode) {
113 break
114 }
115 }
116 }
117 return nil, last
118 }
119
// temporary is satisfied by any error that exposes a Temporary() bool method.
type temporary interface {
	Temporary() bool
}

// errIsTemporary reports whether err should be treated as a transient failure.
//
// It returns true when err is exactly context.DeadlineExceeded, when err
// implements temporary and reports true, or when the underlying cause is an
// *os.SyscallError. The cause is found by peeling at most one *url.Error and
// then at most one *net.OpError off err, in that order.
func errIsTemporary(err error) bool {
	if err == context.DeadlineExceeded {
		return true
	}
	if tmp, ok := err.(temporary); ok && tmp.Temporary() {
		return true
	}

	// Strip the wrappers, if present, to reach the low-level cause.
	cause := err
	if wrapped, ok := cause.(*url.Error); ok {
		cause = wrapped.Err
	}
	if wrapped, ok := cause.(*net.OpError); ok {
		cause = wrapped.Err
	}

	// A failed system call counts as temporary.
	switch cause.(type) {
	case *os.SyscallError:
		return true
	default:
		return false
	}
}
144
// statusIsTemporary reports whether statusCode is one of the server-side
// failures treated as retryable: 500 Internal Server Error, 502 Bad Gateway,
// 503 Service Unavailable, 504 Gateway Timeout or 507 Insufficient Storage.
// Every other status yields false.
func statusIsTemporary(statusCode int) bool {
	return statusCode == http.StatusInternalServerError ||
		statusCode == http.StatusBadGateway ||
		statusCode == http.StatusServiceUnavailable ||
		statusCode == http.StatusGatewayTimeout ||
		statusCode == http.StatusInsufficientStorage
}
157
View as plain text