...
1
16
17 package rest
18
19 import (
20 "net/url"
21 "time"
22
23 "k8s.io/apimachinery/pkg/util/sets"
24 "k8s.io/client-go/util/flowcontrol"
25 "k8s.io/klog/v2"
26 )
27
28
29
30
31
32 var serverIsOverloadedSet = sets.NewInt(429)
33 var maxResponseCode = 499
34
// BackoffManager is the contract for client-side backoff handling. NoBackoff
// and URLBackoff in this file both satisfy it.
type BackoffManager interface {
	// UpdateBackoff reports the outcome of a request to actualUrl: the error
	// (if any) and the response code. What an implementation does with this is
	// up to it.
	UpdateBackoff(actualUrl *url.URL, err error, responseCode int)
	// CalculateBackoff returns the delay currently associated with actualUrl.
	CalculateBackoff(actualUrl *url.URL) time.Duration
	// Sleep pauses for the duration d.
	Sleep(d time.Duration)
}
40
41
42
// URLBackoff is a BackoffManager built on a flowcontrol.Backoff. Its methods
// key backoff entries by the host of the request URL.
type URLBackoff struct {
	// Backoff stores the per-key backoff entries and supplies the clock used
	// by UpdateBackoff and Sleep.
	Backoff *flowcontrol.Backoff
}
47
48
// NoBackoff is a BackoffManager that keeps no state and never asks the caller
// to wait.
type NoBackoff struct{}

// UpdateBackoff is a no-op: request outcomes are ignored.
func (n *NoBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {}

// CalculateBackoff always reports a zero delay, whatever actualUrl is.
func (n *NoBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
	var none time.Duration // zero value: no delay
	return none
}

// Sleep pauses the calling goroutine for d using time.Sleep.
func (n *NoBackoff) Sleep(d time.Duration) {
	time.Sleep(d)
}
63
64
65
66 func (b *URLBackoff) Disable() {
67 klog.V(4).Infof("Disabling backoff strategy")
68 b.Backoff = flowcontrol.NewBackOff(0*time.Second, 0*time.Second)
69 }
70
71
72
73 func (b *URLBackoff) baseUrlKey(rawurl *url.URL) string {
74
75
76
77 host, err := url.Parse(rawurl.String())
78 if err != nil {
79 klog.V(4).Infof("Error extracting url: %v", rawurl)
80 panic("bad url!")
81 }
82 return host.Host
83 }
84
85
86 func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
87
88 if responseCode > maxResponseCode || serverIsOverloadedSet.Has(responseCode) {
89 b.Backoff.Next(b.baseUrlKey(actualUrl), b.Backoff.Clock.Now())
90 return
91 } else if responseCode >= 300 || err != nil {
92 klog.V(4).Infof("Client is returning errors: code %v, error %v", responseCode, err)
93 }
94
95
96 b.Backoff.Reset(b.baseUrlKey(actualUrl))
97 }
98
99
100
101 func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
102 return b.Backoff.Get(b.baseUrlKey(actualUrl))
103 }
104
105 func (b *URLBackoff) Sleep(d time.Duration) {
106 b.Backoff.Clock.Sleep(d)
107 }
108
View as plain text