1
16
17
18 package load
19
20 import (
21 "sync"
22 "sync/atomic"
23 "time"
24 )
25
// negativeOneUInt64 is a uint64 with every bit set, i.e. the two's-complement
// representation of -1. Passing it to atomic.AddUint64 wraps the counter
// around, which decrements it by exactly one.
const negativeOneUInt64 uint64 = 1<<64 - 1
27
28
29
30
31
32
// Store keeps load data (drops, per-locality RPC counts and server loads),
// organized by cluster name and then by service name.
//
// All Store methods take mu before touching clusters; each perClusterStore
// synchronizes its own counters (sync.Map, sync/atomic and its own mutex).
type Store struct {
	// mu protects the clusters map: the lookups and inserts performed by
	// PerCluster and the iteration performed by Stats.
	mu sync.Mutex
	// clusters maps cluster name -> service name -> per-cluster store.
	// Entries are created on demand by PerCluster; the code in this file
	// never removes them, and Stats only resets their counters.
	clusters map[string]map[string]*perClusterStore
}
47
48
49 func NewStore() *Store {
50 return &Store{
51 clusters: make(map[string]map[string]*perClusterStore),
52 }
53 }
54
55
56
57
58
59
60
61
62
63 func (s *Store) Stats(clusterNames []string) []*Data {
64 var ret []*Data
65 s.mu.Lock()
66 defer s.mu.Unlock()
67
68 if len(clusterNames) == 0 {
69 for _, c := range s.clusters {
70 ret = appendClusterStats(ret, c)
71 }
72 return ret
73 }
74
75 for _, n := range clusterNames {
76 if c, ok := s.clusters[n]; ok {
77 ret = appendClusterStats(ret, c)
78 }
79 }
80 return ret
81 }
82
83
84
85
86
87 func appendClusterStats(ret []*Data, cluster map[string]*perClusterStore) []*Data {
88 for _, d := range cluster {
89 data := d.stats()
90 if data == nil {
91
92 continue
93 }
94 ret = append(ret, data)
95 }
96 return ret
97 }
98
99
100
101 func (s *Store) PerCluster(clusterName, serviceName string) PerClusterReporter {
102 if s == nil {
103 return nil
104 }
105
106 s.mu.Lock()
107 defer s.mu.Unlock()
108 c, ok := s.clusters[clusterName]
109 if !ok {
110 c = make(map[string]*perClusterStore)
111 s.clusters[clusterName] = c
112 }
113
114 if p, ok := c[serviceName]; ok {
115 return p
116 }
117 p := &perClusterStore{
118 cluster: clusterName,
119 service: serviceName,
120 }
121 c[serviceName] = p
122 return p
123 }
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 type perClusterStore struct {
140 cluster, service string
141 drops sync.Map
142 localityRPCCount sync.Map
143
144 mu sync.Mutex
145 lastLoadReportAt time.Time
146 }
147
148
149
150
151
152 func (ls *perClusterStore) CallDropped(category string) {
153 if ls == nil {
154 return
155 }
156
157 p, ok := ls.drops.Load(category)
158 if !ok {
159 tp := new(uint64)
160 p, _ = ls.drops.LoadOrStore(category, tp)
161 }
162 atomic.AddUint64(p.(*uint64), 1)
163 }
164
165
166 func (ls *perClusterStore) CallStarted(locality string) {
167 if ls == nil {
168 return
169 }
170
171 p, ok := ls.localityRPCCount.Load(locality)
172 if !ok {
173 tp := newRPCCountData()
174 p, _ = ls.localityRPCCount.LoadOrStore(locality, tp)
175 }
176 p.(*rpcCountData).incrInProgress()
177 }
178
179
180
181 func (ls *perClusterStore) CallFinished(locality string, err error) {
182 if ls == nil {
183 return
184 }
185
186 p, ok := ls.localityRPCCount.Load(locality)
187 if !ok {
188
189
190 return
191 }
192 p.(*rpcCountData).decrInProgress()
193 if err == nil {
194 p.(*rpcCountData).incrSucceeded()
195 } else {
196 p.(*rpcCountData).incrErrored()
197 }
198 }
199
200
201
202 func (ls *perClusterStore) CallServerLoad(locality, name string, d float64) {
203 if ls == nil {
204 return
205 }
206
207 p, ok := ls.localityRPCCount.Load(locality)
208 if !ok {
209
210
211 return
212 }
213 p.(*rpcCountData).addServerLoad(name, d)
214 }
215
216
217
// Data is the load data collected for one (cluster, service) pair by a
// single stats pass.
type Data struct {
	// Cluster is the cluster name.
	Cluster string
	// Service is the service name.
	Service string
	// TotalDrops is the number of dropped calls across all categories,
	// including drops recorded with an empty category.
	TotalDrops uint64
	// Drops maps each non-empty drop category to its dropped-call count.
	Drops map[string]uint64
	// LocalityStats maps a locality to the request and server-load data
	// recorded for it.
	LocalityStats map[string]LocalityData
	// ReportInterval is the time elapsed since the previous stats pass for
	// this (cluster, service) pair.
	ReportInterval time.Duration
}

// LocalityData is the load data recorded for a single locality.
type LocalityData struct {
	// RequestStats holds the request counters for the locality.
	RequestStats RequestData
	// LoadStats maps a server-load metric name to its aggregated values.
	LoadStats map[string]ServerLoadData
}

// RequestData holds request counters.
type RequestData struct {
	// Succeeded is the number of requests that finished without an error.
	Succeeded uint64
	// Errored is the number of requests that finished with an error.
	Errored uint64
	// InProgress is the number of requests that have started but not yet
	// finished.
	InProgress uint64
}

// ServerLoadData aggregates the values reported for one server-load metric.
type ServerLoadData struct {
	// Count is the number of values that were added.
	Count uint64
	// Sum is the sum of those values.
	Sum float64
}

// newData returns a Data for the given cluster and service, with Drops and
// LocalityStats allocated as empty, non-nil maps.
func newData(cluster, service string) *Data {
	d := new(Data)
	d.Cluster, d.Service = cluster, service
	d.Drops = map[string]uint64{}
	d.LocalityStats = map[string]LocalityData{}
	return d
}
269
270
271
272
273
274 func (ls *perClusterStore) stats() *Data {
275 if ls == nil {
276 return nil
277 }
278
279 sd := newData(ls.cluster, ls.service)
280 ls.drops.Range(func(key, val any) bool {
281 d := atomic.SwapUint64(val.(*uint64), 0)
282 if d == 0 {
283 return true
284 }
285 sd.TotalDrops += d
286 keyStr := key.(string)
287 if keyStr != "" {
288
289
290 sd.Drops[keyStr] = d
291 }
292 return true
293 })
294 ls.localityRPCCount.Range(func(key, val any) bool {
295 countData := val.(*rpcCountData)
296 succeeded := countData.loadAndClearSucceeded()
297 inProgress := countData.loadInProgress()
298 errored := countData.loadAndClearErrored()
299 if succeeded == 0 && inProgress == 0 && errored == 0 {
300 return true
301 }
302
303 ld := LocalityData{
304 RequestStats: RequestData{
305 Succeeded: succeeded,
306 Errored: errored,
307 InProgress: inProgress,
308 },
309 LoadStats: make(map[string]ServerLoadData),
310 }
311 countData.serverLoads.Range(func(key, val any) bool {
312 sum, count := val.(*rpcLoadData).loadAndClear()
313 if count == 0 {
314 return true
315 }
316 ld.LoadStats[key.(string)] = ServerLoadData{
317 Count: count,
318 Sum: sum,
319 }
320 return true
321 })
322 sd.LocalityStats[key.(string)] = ld
323 return true
324 })
325
326 ls.mu.Lock()
327 sd.ReportInterval = time.Since(ls.lastLoadReportAt)
328 ls.lastLoadReportAt = time.Now()
329 ls.mu.Unlock()
330
331 if sd.TotalDrops == 0 && len(sd.Drops) == 0 && len(sd.LocalityStats) == 0 {
332 return nil
333 }
334 return sd
335 }
336
337 type rpcCountData struct {
338
339 succeeded *uint64
340 errored *uint64
341 inProgress *uint64
342
343
344
345
346
347
348 serverLoads sync.Map
349 }
350
351 func newRPCCountData() *rpcCountData {
352 return &rpcCountData{
353 succeeded: new(uint64),
354 errored: new(uint64),
355 inProgress: new(uint64),
356 }
357 }
358
359 func (rcd *rpcCountData) incrSucceeded() {
360 atomic.AddUint64(rcd.succeeded, 1)
361 }
362
363 func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
364 return atomic.SwapUint64(rcd.succeeded, 0)
365 }
366
367 func (rcd *rpcCountData) incrErrored() {
368 atomic.AddUint64(rcd.errored, 1)
369 }
370
371 func (rcd *rpcCountData) loadAndClearErrored() uint64 {
372 return atomic.SwapUint64(rcd.errored, 0)
373 }
374
375 func (rcd *rpcCountData) incrInProgress() {
376 atomic.AddUint64(rcd.inProgress, 1)
377 }
378
379 func (rcd *rpcCountData) decrInProgress() {
380 atomic.AddUint64(rcd.inProgress, negativeOneUInt64)
381 }
382
383 func (rcd *rpcCountData) loadInProgress() uint64 {
384 return atomic.LoadUint64(rcd.inProgress)
385 }
386
387 func (rcd *rpcCountData) addServerLoad(name string, d float64) {
388 loads, ok := rcd.serverLoads.Load(name)
389 if !ok {
390 tl := newRPCLoadData()
391 loads, _ = rcd.serverLoads.LoadOrStore(name, tl)
392 }
393 loads.(*rpcLoadData).add(d)
394 }
395
396
397
398
399
400
// rpcLoadData is a mutex-guarded running sum and count of the values reported
// for one server-load metric.
type rpcLoadData struct {
	mu    sync.Mutex
	sum   float64
	count uint64
}

// newRPCLoadData returns an rpcLoadData with a zero sum and count.
func newRPCLoadData() *rpcLoadData {
	return new(rpcLoadData)
}

// add accumulates v into the running sum and increments the count.
func (l *rpcLoadData) add(v float64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.sum += v
	l.count++
}

// loadAndClear returns the accumulated sum and count and resets both to zero
// while holding the mutex, so no concurrent add is lost between the read and
// the reset.
func (l *rpcLoadData) loadAndClear() (s float64, c uint64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	s, c = l.sum, l.count
	l.sum, l.count = 0, 0
	return s, c
}
427
View as plain text