...
1
18
19 package channelz
20
21 import (
22 "fmt"
23 "sync/atomic"
24
25 "google.golang.org/grpc/connectivity"
26 )
27
28
29
30 type Channel struct {
31 Entity
32
33 ID int64
34
35 RefName string
36
37 closeCalled bool
38 nestedChans map[int64]string
39 subChans map[int64]string
40 Parent *Channel
41 trace *ChannelTrace
42
43
44 traceRefCount int32
45
46 ChannelMetrics ChannelMetrics
47 }
48
49
50
51 func (c *Channel) channelzIdentifier() {}
52
53 func (c *Channel) String() string {
54 if c.Parent == nil {
55 return fmt.Sprintf("Channel #%d", c.ID)
56 }
57 return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID)
58 }
59
60 func (c *Channel) id() int64 {
61 return c.ID
62 }
63
64 func (c *Channel) SubChans() map[int64]string {
65 db.mu.RLock()
66 defer db.mu.RUnlock()
67 return copyMap(c.subChans)
68 }
69
70 func (c *Channel) NestedChans() map[int64]string {
71 db.mu.RLock()
72 defer db.mu.RUnlock()
73 return copyMap(c.nestedChans)
74 }
75
76 func (c *Channel) Trace() *ChannelTrace {
77 db.mu.RLock()
78 defer db.mu.RUnlock()
79 return c.trace.copy()
80 }
81
82 type ChannelMetrics struct {
83
84 State atomic.Pointer[connectivity.State]
85
86 Target atomic.Pointer[string]
87
88 CallsStarted atomic.Int64
89
90 CallsSucceeded atomic.Int64
91
92 CallsFailed atomic.Int64
93
94 LastCallStartedTimestamp atomic.Int64
95 }
96
97
98 func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) {
99 c.State.Store(o.State.Load())
100 c.Target.Store(o.Target.Load())
101 c.CallsStarted.Store(o.CallsStarted.Load())
102 c.CallsSucceeded.Store(o.CallsSucceeded.Load())
103 c.CallsFailed.Store(o.CallsFailed.Load())
104 c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
105 }
106
107
108
109 func (c *ChannelMetrics) Equal(o any) bool {
110 oc, ok := o.(*ChannelMetrics)
111 if !ok {
112 return false
113 }
114 if (c.State.Load() == nil) != (oc.State.Load() == nil) {
115 return false
116 }
117 if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() {
118 return false
119 }
120 if (c.Target.Load() == nil) != (oc.Target.Load() == nil) {
121 return false
122 }
123 if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() {
124 return false
125 }
126 return c.CallsStarted.Load() == oc.CallsStarted.Load() &&
127 c.CallsFailed.Load() == oc.CallsFailed.Load() &&
128 c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() &&
129 c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load()
130 }
131
// strFromPointer dereferences s, returning the empty string when s is nil.
func strFromPointer(s *string) string {
	if s != nil {
		return *s
	}
	return ""
}
138
139 func (c *ChannelMetrics) String() string {
140 return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",
141 c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),
142 )
143 }
144
145 func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {
146 c := &ChannelMetrics{}
147 c.State.Store(&state)
148 c.Target.Store(&target)
149 c.CallsStarted.Store(started)
150 c.CallsSucceeded.Store(succeeded)
151 c.CallsFailed.Store(failed)
152 c.LastCallStartedTimestamp.Store(timestamp)
153 return c
154 }
155
156 func (c *Channel) addChild(id int64, e entry) {
157 switch v := e.(type) {
158 case *SubChannel:
159 c.subChans[id] = v.RefName
160 case *Channel:
161 c.nestedChans[id] = v.RefName
162 default:
163 logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
164 }
165 }
166
167 func (c *Channel) deleteChild(id int64) {
168 delete(c.subChans, id)
169 delete(c.nestedChans, id)
170 c.deleteSelfIfReady()
171 }
172
173 func (c *Channel) triggerDelete() {
174 c.closeCalled = true
175 c.deleteSelfIfReady()
176 }
177
178 func (c *Channel) getParentID() int64 {
179 if c.Parent == nil {
180 return -1
181 }
182 return c.Parent.ID
183 }
184
185
186
187
188
189
190
191
192 func (c *Channel) deleteSelfFromTree() (deleted bool) {
193 if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
194 return false
195 }
196
197 if c.Parent != nil {
198 c.Parent.deleteChild(c.ID)
199 }
200 return true
201 }
202
203
204
205
206
207
208
209
210
211
212
213
214
215 func (c *Channel) deleteSelfFromMap() (delete bool) {
216 return c.getTraceRefCount() == 0
217 }
218
219
220
221
222
223
224
225 func (c *Channel) deleteSelfIfReady() {
226 if !c.deleteSelfFromTree() {
227 return
228 }
229 if !c.deleteSelfFromMap() {
230 return
231 }
232 db.deleteEntry(c.ID)
233 c.trace.clear()
234 }
235
236 func (c *Channel) getChannelTrace() *ChannelTrace {
237 return c.trace
238 }
239
240 func (c *Channel) incrTraceRefCount() {
241 atomic.AddInt32(&c.traceRefCount, 1)
242 }
243
244 func (c *Channel) decrTraceRefCount() {
245 atomic.AddInt32(&c.traceRefCount, -1)
246 }
247
248 func (c *Channel) getTraceRefCount() int {
249 i := atomic.LoadInt32(&c.traceRefCount)
250 return int(i)
251 }
252
253 func (c *Channel) getRefName() string {
254 return c.RefName
255 }
256
View as plain text