...
1
18
19 package xdsclient
20
21 import (
22 "fmt"
23 "sync"
24 "sync/atomic"
25 )
26
27 type clusterNameAndServiceName struct {
28 clusterName, edsServiceName string
29 }
30
31 type clusterRequestsCounter struct {
32 mu sync.Mutex
33 clusters map[clusterNameAndServiceName]*ClusterRequestsCounter
34 }
35
36 var src = &clusterRequestsCounter{
37 clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
38 }
39
40
41
42 type ClusterRequestsCounter struct {
43 ClusterName string
44 EDSServiceName string
45 numRequests uint32
46 }
47
48
49
50 func GetClusterRequestsCounter(clusterName, edsServiceName string) *ClusterRequestsCounter {
51 src.mu.Lock()
52 defer src.mu.Unlock()
53 k := clusterNameAndServiceName{
54 clusterName: clusterName,
55 edsServiceName: edsServiceName,
56 }
57 c, ok := src.clusters[k]
58 if !ok {
59 c = &ClusterRequestsCounter{ClusterName: clusterName}
60 src.clusters[k] = c
61 }
62 return c
63 }
64
65
66
67 func (c *ClusterRequestsCounter) StartRequest(max uint32) error {
68
69
70
71
72 if atomic.LoadUint32(&c.numRequests) >= max {
73 return fmt.Errorf("max requests %v exceeded on service %v", max, c.ClusterName)
74 }
75 atomic.AddUint32(&c.numRequests, 1)
76 return nil
77 }
78
79
80
81 func (c *ClusterRequestsCounter) EndRequest() {
82 atomic.AddUint32(&c.numRequests, ^uint32(0))
83 }
84
85
86
87 func ClearCounterForTesting(clusterName, edsServiceName string) {
88 src.mu.Lock()
89 defer src.mu.Unlock()
90 k := clusterNameAndServiceName{
91 clusterName: clusterName,
92 edsServiceName: edsServiceName,
93 }
94 c, ok := src.clusters[k]
95 if !ok {
96 return
97 }
98 c.numRequests = 0
99 }
100
101
102
103 func ClearAllCountersForTesting() {
104 src.mu.Lock()
105 defer src.mu.Unlock()
106 src.clusters = make(map[clusterNameAndServiceName]*ClusterRequestsCounter)
107 }
108
View as plain text