1
18
19 package channelz
20
21 import (
22 "fmt"
23 "sort"
24 "sync"
25 "time"
26 )
27
28
29 type entry interface {
30
31 addChild(id int64, e entry)
32
33 deleteChild(id int64)
34
35
36
37 triggerDelete()
38
39
40
41 deleteSelfIfReady()
42
43 getParentID() int64
44 Entity
45 }
46
47
48
49
50
51
52
53
54
55
56 type channelMap struct {
57 mu sync.RWMutex
58 topLevelChannels map[int64]struct{}
59 channels map[int64]*Channel
60 subChannels map[int64]*SubChannel
61 sockets map[int64]*Socket
62 servers map[int64]*Server
63 }
64
65 func newChannelMap() *channelMap {
66 return &channelMap{
67 topLevelChannels: make(map[int64]struct{}),
68 channels: make(map[int64]*Channel),
69 subChannels: make(map[int64]*SubChannel),
70 sockets: make(map[int64]*Socket),
71 servers: make(map[int64]*Server),
72 }
73 }
74
75 func (c *channelMap) addServer(id int64, s *Server) {
76 c.mu.Lock()
77 defer c.mu.Unlock()
78 s.cm = c
79 c.servers[id] = s
80 }
81
82 func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
83 c.mu.Lock()
84 defer c.mu.Unlock()
85 cn.trace.cm = c
86 c.channels[id] = cn
87 if isTopChannel {
88 c.topLevelChannels[id] = struct{}{}
89 } else if p := c.channels[pid]; p != nil {
90 p.addChild(id, cn)
91 } else {
92 logger.Infof("channel %d references invalid parent ID %d", id, pid)
93 }
94 }
95
96 func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
97 c.mu.Lock()
98 defer c.mu.Unlock()
99 sc.trace.cm = c
100 c.subChannels[id] = sc
101 if p := c.channels[pid]; p != nil {
102 p.addChild(id, sc)
103 } else {
104 logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
105 }
106 }
107
108 func (c *channelMap) addSocket(s *Socket) {
109 c.mu.Lock()
110 defer c.mu.Unlock()
111 s.cm = c
112 c.sockets[s.ID] = s
113 if s.Parent == nil {
114 logger.Infof("normal socket %d has no parent", s.ID)
115 }
116 s.Parent.(entry).addChild(s.ID, s)
117 }
118
119
120
121
122
123
124 func (c *channelMap) removeEntry(id int64) {
125 c.mu.Lock()
126 defer c.mu.Unlock()
127 c.findEntry(id).triggerDelete()
128 }
129
130
131
132 type tracedChannel interface {
133 getChannelTrace() *ChannelTrace
134 incrTraceRefCount()
135 decrTraceRefCount()
136 getRefName() string
137 }
138
139
140 func (c *channelMap) decrTraceRefCount(id int64) {
141 e := c.findEntry(id)
142 if v, ok := e.(tracedChannel); ok {
143 v.decrTraceRefCount()
144 e.deleteSelfIfReady()
145 }
146 }
147
148
149 func (c *channelMap) findEntry(id int64) entry {
150 if v, ok := c.channels[id]; ok {
151 return v
152 }
153 if v, ok := c.subChannels[id]; ok {
154 return v
155 }
156 if v, ok := c.servers[id]; ok {
157 return v
158 }
159 if v, ok := c.sockets[id]; ok {
160 return v
161 }
162 return &dummyEntry{idNotFound: id}
163 }
164
165
166
167
168
169
170 func (c *channelMap) deleteEntry(id int64) entry {
171 if v, ok := c.sockets[id]; ok {
172 delete(c.sockets, id)
173 return v
174 }
175 if v, ok := c.subChannels[id]; ok {
176 delete(c.subChannels, id)
177 return v
178 }
179 if v, ok := c.channels[id]; ok {
180 delete(c.channels, id)
181 delete(c.topLevelChannels, id)
182 return v
183 }
184 if v, ok := c.servers[id]; ok {
185 delete(c.servers, id)
186 return v
187 }
188 return &dummyEntry{idNotFound: id}
189 }
190
191 func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
192 c.mu.Lock()
193 defer c.mu.Unlock()
194 child := c.findEntry(id)
195 childTC, ok := child.(tracedChannel)
196 if !ok {
197 return
198 }
199 childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
200 if desc.Parent != nil {
201 parent := c.findEntry(child.getParentID())
202 var chanType RefChannelType
203 switch child.(type) {
204 case *Channel:
205 chanType = RefChannel
206 case *SubChannel:
207 chanType = RefSubChannel
208 }
209 if parentTC, ok := parent.(tracedChannel); ok {
210 parentTC.getChannelTrace().append(&traceEvent{
211 Desc: desc.Parent.Desc,
212 Severity: desc.Parent.Severity,
213 Timestamp: time.Now(),
214 RefID: id,
215 RefName: childTC.getRefName(),
216 RefType: chanType,
217 })
218 childTC.incrTraceRefCount()
219 }
220 }
221 }
222
223 type int64Slice []int64
224
225 func (s int64Slice) Len() int { return len(s) }
226 func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
227 func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
228
229 func copyMap(m map[int64]string) map[int64]string {
230 n := make(map[int64]string)
231 for k, v := range m {
232 n[k] = v
233 }
234 return n
235 }
236
237 func min(a, b int) int {
238 if a < b {
239 return a
240 }
241 return b
242 }
243
244 func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
245 if maxResults <= 0 {
246 maxResults = EntriesPerPage
247 }
248 c.mu.RLock()
249 defer c.mu.RUnlock()
250 l := int64(len(c.topLevelChannels))
251 ids := make([]int64, 0, l)
252
253 for k := range c.topLevelChannels {
254 ids = append(ids, k)
255 }
256 sort.Sort(int64Slice(ids))
257 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
258 end := true
259 var t []*Channel
260 for _, v := range ids[idx:] {
261 if len(t) == maxResults {
262 end = false
263 break
264 }
265 if cn, ok := c.channels[v]; ok {
266 t = append(t, cn)
267 }
268 }
269 return t, end
270 }
271
272 func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
273 if maxResults <= 0 {
274 maxResults = EntriesPerPage
275 }
276 c.mu.RLock()
277 defer c.mu.RUnlock()
278 ids := make([]int64, 0, len(c.servers))
279 for k := range c.servers {
280 ids = append(ids, k)
281 }
282 sort.Sort(int64Slice(ids))
283 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
284 end := true
285 var s []*Server
286 for _, v := range ids[idx:] {
287 if len(s) == maxResults {
288 end = false
289 break
290 }
291 if svr, ok := c.servers[v]; ok {
292 s = append(s, svr)
293 }
294 }
295 return s, end
296 }
297
298 func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
299 if maxResults <= 0 {
300 maxResults = EntriesPerPage
301 }
302 c.mu.RLock()
303 defer c.mu.RUnlock()
304 svr, ok := c.servers[id]
305 if !ok {
306
307 return nil, true
308 }
309 svrskts := svr.sockets
310 ids := make([]int64, 0, len(svrskts))
311 sks := make([]*Socket, 0, min(len(svrskts), maxResults))
312 for k := range svrskts {
313 ids = append(ids, k)
314 }
315 sort.Sort(int64Slice(ids))
316 idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
317 end := true
318 for _, v := range ids[idx:] {
319 if len(sks) == maxResults {
320 end = false
321 break
322 }
323 if ns, ok := c.sockets[v]; ok {
324 sks = append(sks, ns)
325 }
326 }
327 return sks, end
328 }
329
330 func (c *channelMap) getChannel(id int64) *Channel {
331 c.mu.RLock()
332 defer c.mu.RUnlock()
333 return c.channels[id]
334 }
335
336 func (c *channelMap) getSubChannel(id int64) *SubChannel {
337 c.mu.RLock()
338 defer c.mu.RUnlock()
339 return c.subChannels[id]
340 }
341
342 func (c *channelMap) getSocket(id int64) *Socket {
343 c.mu.RLock()
344 defer c.mu.RUnlock()
345 return c.sockets[id]
346 }
347
348 func (c *channelMap) getServer(id int64) *Server {
349 c.mu.RLock()
350 defer c.mu.RUnlock()
351 return c.servers[id]
352 }
353
354 type dummyEntry struct {
355
356 idNotFound int64
357 Entity
358 }
359
360 func (d *dummyEntry) String() string {
361 return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
362 }
363
364 func (d *dummyEntry) ID() int64 { return d.idNotFound }
365
366 func (d *dummyEntry) addChild(id int64, e entry) {
367
368
369
370
371
372
373
374
375
376 logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
377 }
378
379 func (d *dummyEntry) deleteChild(id int64) {
380
381
382 logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
383 }
384
385 func (d *dummyEntry) triggerDelete() {
386 logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
387 }
388
389 func (*dummyEntry) deleteSelfIfReady() {
390
391 }
392
393 func (*dummyEntry) getParentID() int64 {
394 return 0
395 }
396
397
398 type Entity interface {
399 isEntity()
400 fmt.Stringer
401 id() int64
402 }
403
View as plain text