...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package tcpproxy
16
17 import (
18 "fmt"
19 "io"
20 "math/rand"
21 "net"
22 "sync"
23 "time"
24
25 "go.uber.org/zap"
26 )
27
28 type remote struct {
29 mu sync.Mutex
30 srv *net.SRV
31 addr string
32 inactive bool
33 }
34
35 func (r *remote) inactivate() {
36 r.mu.Lock()
37 defer r.mu.Unlock()
38 r.inactive = true
39 }
40
41 func (r *remote) tryReactivate() error {
42 conn, err := net.Dial("tcp", r.addr)
43 if err != nil {
44 return err
45 }
46 conn.Close()
47 r.mu.Lock()
48 defer r.mu.Unlock()
49 r.inactive = false
50 return nil
51 }
52
53 func (r *remote) isActive() bool {
54 r.mu.Lock()
55 defer r.mu.Unlock()
56 return !r.inactive
57 }
58
59 type TCPProxy struct {
60 Logger *zap.Logger
61 Listener net.Listener
62 Endpoints []*net.SRV
63 MonitorInterval time.Duration
64
65 donec chan struct{}
66
67 mu sync.Mutex
68 remotes []*remote
69 pickCount int
70 }
71
72 func (tp *TCPProxy) Run() error {
73 tp.donec = make(chan struct{})
74 if tp.MonitorInterval == 0 {
75 tp.MonitorInterval = 5 * time.Minute
76 }
77 for _, srv := range tp.Endpoints {
78 addr := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
79 tp.remotes = append(tp.remotes, &remote{srv: srv, addr: addr})
80 }
81
82 eps := []string{}
83 for _, ep := range tp.Endpoints {
84 eps = append(eps, fmt.Sprintf("%s:%d", ep.Target, ep.Port))
85 }
86 if tp.Logger != nil {
87 tp.Logger.Info("ready to proxy client requests", zap.Strings("endpoints", eps))
88 }
89
90 go tp.runMonitor()
91 for {
92 in, err := tp.Listener.Accept()
93 if err != nil {
94 return err
95 }
96
97 go tp.serve(in)
98 }
99 }
100
101 func (tp *TCPProxy) pick() *remote {
102 var weighted []*remote
103 var unweighted []*remote
104
105 bestPr := uint16(65535)
106 w := 0
107
108 for _, r := range tp.remotes {
109 switch {
110 case !r.isActive():
111 case r.srv.Priority < bestPr:
112 bestPr = r.srv.Priority
113 w = 0
114 weighted = nil
115 unweighted = nil
116 fallthrough
117 case r.srv.Priority == bestPr:
118 if r.srv.Weight > 0 {
119 weighted = append(weighted, r)
120 w += int(r.srv.Weight)
121 } else {
122 unweighted = append(unweighted, r)
123 }
124 }
125 }
126 if weighted != nil {
127 if len(unweighted) > 0 && rand.Intn(100) == 1 {
128
129
130
131 r := unweighted[tp.pickCount%len(unweighted)]
132 tp.pickCount++
133 return r
134 }
135
136
137
138 choose := rand.Intn(w)
139 for i := 0; i < len(weighted); i++ {
140 choose -= int(weighted[i].srv.Weight)
141 if choose <= 0 {
142 return weighted[i]
143 }
144 }
145 }
146 if unweighted != nil {
147 for i := 0; i < len(tp.remotes); i++ {
148 picked := tp.remotes[tp.pickCount%len(tp.remotes)]
149 tp.pickCount++
150 if picked.isActive() {
151 return picked
152 }
153 }
154 }
155 return nil
156 }
157
158 func (tp *TCPProxy) serve(in net.Conn) {
159 var (
160 err error
161 out net.Conn
162 )
163
164 for {
165 tp.mu.Lock()
166 remote := tp.pick()
167 tp.mu.Unlock()
168 if remote == nil {
169 break
170 }
171
172 out, err = net.Dial("tcp", remote.addr)
173 if err == nil {
174 break
175 }
176 remote.inactivate()
177 if tp.Logger != nil {
178 tp.Logger.Warn("deactivated endpoint", zap.String("address", remote.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
179 }
180 }
181
182 if out == nil {
183 in.Close()
184 return
185 }
186
187 go func() {
188 io.Copy(in, out)
189 in.Close()
190 out.Close()
191 }()
192
193 io.Copy(out, in)
194 out.Close()
195 in.Close()
196 }
197
198 func (tp *TCPProxy) runMonitor() {
199 for {
200 select {
201 case <-time.After(tp.MonitorInterval):
202 tp.mu.Lock()
203 for _, rem := range tp.remotes {
204 if rem.isActive() {
205 continue
206 }
207 go func(r *remote) {
208 if err := r.tryReactivate(); err != nil {
209 if tp.Logger != nil {
210 tp.Logger.Warn("failed to activate endpoint (stay inactive for another interval)", zap.String("address", r.addr), zap.Duration("interval", tp.MonitorInterval), zap.Error(err))
211 }
212 } else {
213 if tp.Logger != nil {
214 tp.Logger.Info("activated", zap.String("address", r.addr))
215 }
216 }
217 }(rem)
218 }
219 tp.mu.Unlock()
220 case <-tp.donec:
221 return
222 }
223 }
224 }
225
226 func (tp *TCPProxy) Stop() {
227
228
229 tp.Listener.Close()
230 close(tp.donec)
231 }
232
View as plain text