...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package httpproxy
16
17 import (
18 "math/rand"
19 "net/url"
20 "sync"
21 "time"
22
23 "go.uber.org/zap"
24 )
25
26
27
// defaultRefreshInterval is the threshold newDirector compares the configured
// refresh interval against: at or above this value, an empty endpoint list
// causes the next refresh to be attempted after one second instead.
// NOTE(review): the name suggests this mirrors a default configured elsewhere;
// confirm against the configuration code.
const defaultRefreshInterval = 30 * time.Second

// once guards the "endpoints found" log line emitted by newDirector's
// background loop so that it is written a single time per process.
var once sync.Once
31
// init seeds the global math/rand source from the current wall-clock time so
// that the endpoint shuffle performed in refresh differs between runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
35
36 func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
37 if lg == nil {
38 lg = zap.NewNop()
39 }
40 d := &director{
41 lg: lg,
42 uf: urlsFunc,
43 failureWait: failureWait,
44 }
45 d.refresh()
46 go func() {
47
48
49
50
51 for {
52 es := d.endpoints()
53 ri := refreshInterval
54 if ri >= defaultRefreshInterval {
55 if len(es) == 0 {
56 ri = time.Second
57 }
58 }
59 if len(es) > 0 {
60 once.Do(func() {
61 var sl []string
62 for _, e := range es {
63 sl = append(sl, e.URL.String())
64 }
65 lg.Info("endpoints found", zap.Strings("endpoints", sl))
66 })
67 }
68 time.Sleep(ri)
69 d.refresh()
70 }
71 }()
72 return d
73 }
74
75 type director struct {
76 sync.Mutex
77 lg *zap.Logger
78 ep []*endpoint
79 uf GetProxyURLs
80 failureWait time.Duration
81 }
82
83 func (d *director) refresh() {
84 urls := d.uf()
85 d.Lock()
86 defer d.Unlock()
87 var endpoints []*endpoint
88 for _, u := range urls {
89 uu, err := url.Parse(u)
90 if err != nil {
91 d.lg.Info("upstream URL invalid", zap.Error(err))
92 continue
93 }
94 endpoints = append(endpoints, newEndpoint(d.lg, *uu, d.failureWait))
95 }
96
97
98 for i := range endpoints {
99 j := rand.Intn(i + 1)
100 endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
101 }
102
103 d.ep = endpoints
104 }
105
106 func (d *director) endpoints() []*endpoint {
107 d.Lock()
108 defer d.Unlock()
109 filtered := make([]*endpoint, 0)
110 for _, ep := range d.ep {
111 if ep.Available {
112 filtered = append(filtered, ep)
113 }
114 }
115
116 return filtered
117 }
118
119 func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
120 ep := endpoint{
121 lg: lg,
122 URL: u,
123 Available: true,
124 failFunc: timedUnavailabilityFunc(failureWait),
125 }
126
127 return &ep
128 }
129
130 type endpoint struct {
131 sync.Mutex
132
133 lg *zap.Logger
134 URL url.URL
135 Available bool
136
137 failFunc func(ep *endpoint)
138 }
139
140 func (ep *endpoint) Failed() {
141 ep.Lock()
142 if !ep.Available {
143 ep.Unlock()
144 return
145 }
146
147 ep.Available = false
148 ep.Unlock()
149
150 if ep.lg != nil {
151 ep.lg.Info("marked endpoint unavailable", zap.String("endpoint", ep.URL.String()))
152 }
153
154 if ep.failFunc == nil {
155 if ep.lg != nil {
156 ep.lg.Info(
157 "no failFunc defined, endpoint will be unavailable forever",
158 zap.String("endpoint", ep.URL.String()),
159 )
160 }
161 return
162 }
163
164 ep.failFunc(ep)
165 }
166
167 func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
168 return func(ep *endpoint) {
169 time.AfterFunc(wait, func() {
170 ep.Available = true
171 if ep.lg != nil {
172 ep.lg.Info(
173 "marked endpoint available, to retest connectivity",
174 zap.String("endpoint", ep.URL.String()),
175 )
176 }
177 })
178 }
179 }
180
View as plain text