...
1
18
19 package grpclb
20
21 import (
22 "sync"
23 "sync/atomic"
24
25 "google.golang.org/grpc/balancer"
26 lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/internal/grpcrand"
29 "google.golang.org/grpc/status"
30 )
31
32
33
34 type rpcStats struct {
35
36 numCallsStarted int64
37 numCallsFinished int64
38 numCallsFinishedWithClientFailedToSend int64
39 numCallsFinishedKnownReceived int64
40
41 mu sync.Mutex
42
43 numCallsDropped map[string]int64
44 }
45
46 func newRPCStats() *rpcStats {
47 return &rpcStats{
48 numCallsDropped: make(map[string]int64),
49 }
50 }
51
52 func isZeroStats(stats *lbpb.ClientStats) bool {
53 return len(stats.CallsFinishedWithDrop) == 0 &&
54 stats.NumCallsStarted == 0 &&
55 stats.NumCallsFinished == 0 &&
56 stats.NumCallsFinishedWithClientFailedToSend == 0 &&
57 stats.NumCallsFinishedKnownReceived == 0
58 }
59
60
61 func (s *rpcStats) toClientStats() *lbpb.ClientStats {
62 stats := &lbpb.ClientStats{
63 NumCallsStarted: atomic.SwapInt64(&s.numCallsStarted, 0),
64 NumCallsFinished: atomic.SwapInt64(&s.numCallsFinished, 0),
65 NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.numCallsFinishedWithClientFailedToSend, 0),
66 NumCallsFinishedKnownReceived: atomic.SwapInt64(&s.numCallsFinishedKnownReceived, 0),
67 }
68 s.mu.Lock()
69 dropped := s.numCallsDropped
70 s.numCallsDropped = make(map[string]int64)
71 s.mu.Unlock()
72 for token, count := range dropped {
73 stats.CallsFinishedWithDrop = append(stats.CallsFinishedWithDrop, &lbpb.ClientStatsPerToken{
74 LoadBalanceToken: token,
75 NumCalls: count,
76 })
77 }
78 return stats
79 }
80
81 func (s *rpcStats) drop(token string) {
82 atomic.AddInt64(&s.numCallsStarted, 1)
83 s.mu.Lock()
84 s.numCallsDropped[token]++
85 s.mu.Unlock()
86 atomic.AddInt64(&s.numCallsFinished, 1)
87 }
88
89 func (s *rpcStats) failedToSend() {
90 atomic.AddInt64(&s.numCallsStarted, 1)
91 atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, 1)
92 atomic.AddInt64(&s.numCallsFinished, 1)
93 }
94
95 func (s *rpcStats) knownReceived() {
96 atomic.AddInt64(&s.numCallsStarted, 1)
97 atomic.AddInt64(&s.numCallsFinishedKnownReceived, 1)
98 atomic.AddInt64(&s.numCallsFinished, 1)
99 }
100
101
102
103
104
105
106 type rrPicker struct {
107 mu sync.Mutex
108 subConns []balancer.SubConn
109 subConnsNext int
110 }
111
112 func newRRPicker(readySCs []balancer.SubConn) *rrPicker {
113 return &rrPicker{
114 subConns: readySCs,
115 subConnsNext: grpcrand.Intn(len(readySCs)),
116 }
117 }
118
119 func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
120 p.mu.Lock()
121 defer p.mu.Unlock()
122 sc := p.subConns[p.subConnsNext]
123 p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
124 return balancer.PickResult{SubConn: sc}, nil
125 }
126
127
128
129
130
131
132
133
134
135
136 type lbPicker struct {
137 mu sync.Mutex
138 serverList []*lbpb.Server
139 serverListNext int
140 subConns []balancer.SubConn
141 subConnsNext int
142
143 stats *rpcStats
144 }
145
146 func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *rpcStats) *lbPicker {
147 return &lbPicker{
148 serverList: serverList,
149 subConns: readySCs,
150 subConnsNext: grpcrand.Intn(len(readySCs)),
151 stats: stats,
152 }
153 }
154
155 func (p *lbPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
156 p.mu.Lock()
157 defer p.mu.Unlock()
158
159
160 s := p.serverList[p.serverListNext]
161 p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
162
163
164 if s.Drop {
165 p.stats.drop(s.LoadBalanceToken)
166 return balancer.PickResult{}, status.Errorf(codes.Unavailable, "request dropped by grpclb")
167 }
168
169
170 if len(p.subConns) <= 0 {
171 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
172 }
173
174
175 sc := p.subConns[p.subConnsNext]
176 p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
177 done := func(info balancer.DoneInfo) {
178 if !info.BytesSent {
179 p.stats.failedToSend()
180 } else if info.BytesReceived {
181 p.stats.knownReceived()
182 }
183 }
184 return balancer.PickResult{SubConn: sc, Done: done}, nil
185 }
186
187 func (p *lbPicker) updateReadySCs(readySCs []balancer.SubConn) {
188 p.mu.Lock()
189 defer p.mu.Unlock()
190
191 p.subConns = readySCs
192 p.subConnsNext = p.subConnsNext % len(readySCs)
193 }
194
View as plain text