...

Source file src/google.golang.org/grpc/internal/channelz/channelmap.go

Documentation: google.golang.org/grpc/internal/channelz

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package channelz
    20  
    21  import (
    22  	"fmt"
    23  	"sort"
    24  	"sync"
    25  	"time"
    26  )
    27  
    28  // entry represents a node in the channelz database.
    29  type entry interface {
    30  	// addChild adds a child e, whose channelz id is id to child list
    31  	addChild(id int64, e entry)
    32  	// deleteChild deletes a child with channelz id to be id from child list
    33  	deleteChild(id int64)
    34  	// triggerDelete tries to delete self from channelz database. However, if
    35  	// child list is not empty, then deletion from the database is on hold until
    36  	// the last child is deleted from database.
    37  	triggerDelete()
    38  	// deleteSelfIfReady check whether triggerDelete() has been called before,
    39  	// and whether child list is now empty. If both conditions are met, then
    40  	// delete self from database.
    41  	deleteSelfIfReady()
    42  	// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
    43  	getParentID() int64
    44  	Entity
    45  }
    46  
    47  // channelMap is the storage data structure for channelz.
    48  //
    49  // Methods of channelMap can be divided in two two categories with respect to
    50  // locking.
    51  //
    52  // 1. Methods acquire the global lock.
    53  // 2. Methods that can only be called when global lock is held.
    54  //
    55  // A second type of method need always to be called inside a first type of method.
    56  type channelMap struct {
    57  	mu               sync.RWMutex
    58  	topLevelChannels map[int64]struct{}
    59  	channels         map[int64]*Channel
    60  	subChannels      map[int64]*SubChannel
    61  	sockets          map[int64]*Socket
    62  	servers          map[int64]*Server
    63  }
    64  
    65  func newChannelMap() *channelMap {
    66  	return &channelMap{
    67  		topLevelChannels: make(map[int64]struct{}),
    68  		channels:         make(map[int64]*Channel),
    69  		subChannels:      make(map[int64]*SubChannel),
    70  		sockets:          make(map[int64]*Socket),
    71  		servers:          make(map[int64]*Server),
    72  	}
    73  }
    74  
// addServer registers server s under channelz id id. It takes the global
// write lock for the duration of the call.
func (c *channelMap) addServer(id int64, s *Server) {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Give the server a back-reference to the map that now stores it.
	s.cm = c
	c.servers[id] = s
}
    81  
    82  func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
    83  	c.mu.Lock()
    84  	defer c.mu.Unlock()
    85  	cn.trace.cm = c
    86  	c.channels[id] = cn
    87  	if isTopChannel {
    88  		c.topLevelChannels[id] = struct{}{}
    89  	} else if p := c.channels[pid]; p != nil {
    90  		p.addChild(id, cn)
    91  	} else {
    92  		logger.Infof("channel %d references invalid parent ID %d", id, pid)
    93  	}
    94  }
    95  
    96  func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
    97  	c.mu.Lock()
    98  	defer c.mu.Unlock()
    99  	sc.trace.cm = c
   100  	c.subChannels[id] = sc
   101  	if p := c.channels[pid]; p != nil {
   102  		p.addChild(id, sc)
   103  	} else {
   104  		logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
   105  	}
   106  }
   107  
   108  func (c *channelMap) addSocket(s *Socket) {
   109  	c.mu.Lock()
   110  	defer c.mu.Unlock()
   111  	s.cm = c
   112  	c.sockets[s.ID] = s
   113  	if s.Parent == nil {
   114  		logger.Infof("normal socket %d has no parent", s.ID)
   115  	}
   116  	s.Parent.(entry).addChild(s.ID, s)
   117  }
   118  
   119  // removeEntry triggers the removal of an entry, which may not indeed delete the
   120  // entry, if it has to wait on the deletion of its children and until no other
   121  // entity's channel trace references it.  It may lead to a chain of entry
   122  // deletion. For example, deleting the last socket of a gracefully shutting down
   123  // server will lead to the server being also deleted.
   124  func (c *channelMap) removeEntry(id int64) {
   125  	c.mu.Lock()
   126  	defer c.mu.Unlock()
   127  	c.findEntry(id).triggerDelete()
   128  }
   129  
   130  // tracedChannel represents tracing operations which are present on both
   131  // channels and subChannels.
   132  type tracedChannel interface {
   133  	getChannelTrace() *ChannelTrace
   134  	incrTraceRefCount()
   135  	decrTraceRefCount()
   136  	getRefName() string
   137  }
   138  
   139  // c.mu must be held by the caller
   140  func (c *channelMap) decrTraceRefCount(id int64) {
   141  	e := c.findEntry(id)
   142  	if v, ok := e.(tracedChannel); ok {
   143  		v.decrTraceRefCount()
   144  		e.deleteSelfIfReady()
   145  	}
   146  }
   147  
   148  // c.mu must be held by the caller.
   149  func (c *channelMap) findEntry(id int64) entry {
   150  	if v, ok := c.channels[id]; ok {
   151  		return v
   152  	}
   153  	if v, ok := c.subChannels[id]; ok {
   154  		return v
   155  	}
   156  	if v, ok := c.servers[id]; ok {
   157  		return v
   158  	}
   159  	if v, ok := c.sockets[id]; ok {
   160  		return v
   161  	}
   162  	return &dummyEntry{idNotFound: id}
   163  }
   164  
   165  // c.mu must be held by the caller
   166  //
   167  // deleteEntry deletes an entry from the channelMap. Before calling this method,
   168  // caller must check this entry is ready to be deleted, i.e removeEntry() has
   169  // been called on it, and no children still exist.
   170  func (c *channelMap) deleteEntry(id int64) entry {
   171  	if v, ok := c.sockets[id]; ok {
   172  		delete(c.sockets, id)
   173  		return v
   174  	}
   175  	if v, ok := c.subChannels[id]; ok {
   176  		delete(c.subChannels, id)
   177  		return v
   178  	}
   179  	if v, ok := c.channels[id]; ok {
   180  		delete(c.channels, id)
   181  		delete(c.topLevelChannels, id)
   182  		return v
   183  	}
   184  	if v, ok := c.servers[id]; ok {
   185  		delete(c.servers, id)
   186  		return v
   187  	}
   188  	return &dummyEntry{idNotFound: id}
   189  }
   190  
   191  func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
   192  	c.mu.Lock()
   193  	defer c.mu.Unlock()
   194  	child := c.findEntry(id)
   195  	childTC, ok := child.(tracedChannel)
   196  	if !ok {
   197  		return
   198  	}
   199  	childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
   200  	if desc.Parent != nil {
   201  		parent := c.findEntry(child.getParentID())
   202  		var chanType RefChannelType
   203  		switch child.(type) {
   204  		case *Channel:
   205  			chanType = RefChannel
   206  		case *SubChannel:
   207  			chanType = RefSubChannel
   208  		}
   209  		if parentTC, ok := parent.(tracedChannel); ok {
   210  			parentTC.getChannelTrace().append(&traceEvent{
   211  				Desc:      desc.Parent.Desc,
   212  				Severity:  desc.Parent.Severity,
   213  				Timestamp: time.Now(),
   214  				RefID:     id,
   215  				RefName:   childTC.getRefName(),
   216  				RefType:   chanType,
   217  			})
   218  			childTC.incrTraceRefCount()
   219  		}
   220  	}
   221  }
   222  
   223  type int64Slice []int64
   224  
   225  func (s int64Slice) Len() int           { return len(s) }
   226  func (s int64Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
   227  func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
   228  
   229  func copyMap(m map[int64]string) map[int64]string {
   230  	n := make(map[int64]string)
   231  	for k, v := range m {
   232  		n[k] = v
   233  	}
   234  	return n
   235  }
   236  
   237  func min(a, b int) int {
   238  	if a < b {
   239  		return a
   240  	}
   241  	return b
   242  }
   243  
   244  func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
   245  	if maxResults <= 0 {
   246  		maxResults = EntriesPerPage
   247  	}
   248  	c.mu.RLock()
   249  	defer c.mu.RUnlock()
   250  	l := int64(len(c.topLevelChannels))
   251  	ids := make([]int64, 0, l)
   252  
   253  	for k := range c.topLevelChannels {
   254  		ids = append(ids, k)
   255  	}
   256  	sort.Sort(int64Slice(ids))
   257  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
   258  	end := true
   259  	var t []*Channel
   260  	for _, v := range ids[idx:] {
   261  		if len(t) == maxResults {
   262  			end = false
   263  			break
   264  		}
   265  		if cn, ok := c.channels[v]; ok {
   266  			t = append(t, cn)
   267  		}
   268  	}
   269  	return t, end
   270  }
   271  
   272  func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
   273  	if maxResults <= 0 {
   274  		maxResults = EntriesPerPage
   275  	}
   276  	c.mu.RLock()
   277  	defer c.mu.RUnlock()
   278  	ids := make([]int64, 0, len(c.servers))
   279  	for k := range c.servers {
   280  		ids = append(ids, k)
   281  	}
   282  	sort.Sort(int64Slice(ids))
   283  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
   284  	end := true
   285  	var s []*Server
   286  	for _, v := range ids[idx:] {
   287  		if len(s) == maxResults {
   288  			end = false
   289  			break
   290  		}
   291  		if svr, ok := c.servers[v]; ok {
   292  			s = append(s, svr)
   293  		}
   294  	}
   295  	return s, end
   296  }
   297  
   298  func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
   299  	if maxResults <= 0 {
   300  		maxResults = EntriesPerPage
   301  	}
   302  	c.mu.RLock()
   303  	defer c.mu.RUnlock()
   304  	svr, ok := c.servers[id]
   305  	if !ok {
   306  		// server with id doesn't exist.
   307  		return nil, true
   308  	}
   309  	svrskts := svr.sockets
   310  	ids := make([]int64, 0, len(svrskts))
   311  	sks := make([]*Socket, 0, min(len(svrskts), maxResults))
   312  	for k := range svrskts {
   313  		ids = append(ids, k)
   314  	}
   315  	sort.Sort(int64Slice(ids))
   316  	idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
   317  	end := true
   318  	for _, v := range ids[idx:] {
   319  		if len(sks) == maxResults {
   320  			end = false
   321  			break
   322  		}
   323  		if ns, ok := c.sockets[v]; ok {
   324  			sks = append(sks, ns)
   325  		}
   326  	}
   327  	return sks, end
   328  }
   329  
   330  func (c *channelMap) getChannel(id int64) *Channel {
   331  	c.mu.RLock()
   332  	defer c.mu.RUnlock()
   333  	return c.channels[id]
   334  }
   335  
   336  func (c *channelMap) getSubChannel(id int64) *SubChannel {
   337  	c.mu.RLock()
   338  	defer c.mu.RUnlock()
   339  	return c.subChannels[id]
   340  }
   341  
   342  func (c *channelMap) getSocket(id int64) *Socket {
   343  	c.mu.RLock()
   344  	defer c.mu.RUnlock()
   345  	return c.sockets[id]
   346  }
   347  
   348  func (c *channelMap) getServer(id int64) *Server {
   349  	c.mu.RLock()
   350  	defer c.mu.RUnlock()
   351  	return c.servers[id]
   352  }
   353  
   354  type dummyEntry struct {
   355  	// dummyEntry is a fake entry to handle entry not found case.
   356  	idNotFound int64
   357  	Entity
   358  }
   359  
   360  func (d *dummyEntry) String() string {
   361  	return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
   362  }
   363  
   364  func (d *dummyEntry) ID() int64 { return d.idNotFound }
   365  
   366  func (d *dummyEntry) addChild(id int64, e entry) {
   367  	// Note: It is possible for a normal program to reach here under race
   368  	// condition.  For example, there could be a race between ClientConn.Close()
   369  	// info being propagated to addrConn and http2Client. ClientConn.Close()
   370  	// cancel the context and result in http2Client to error. The error info is
   371  	// then caught by transport monitor and before addrConn.tearDown() is called
   372  	// in side ClientConn.Close(). Therefore, the addrConn will create a new
   373  	// transport. And when registering the new transport in channelz, its parent
   374  	// addrConn could have already been torn down and deleted from channelz
   375  	// tracking, and thus reach the code here.
   376  	logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
   377  }
   378  
   379  func (d *dummyEntry) deleteChild(id int64) {
   380  	// It is possible for a normal program to reach here under race condition.
   381  	// Refer to the example described in addChild().
   382  	logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
   383  }
   384  
   385  func (d *dummyEntry) triggerDelete() {
   386  	logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
   387  }
   388  
   389  func (*dummyEntry) deleteSelfIfReady() {
   390  	// code should not reach here. deleteSelfIfReady is always called on an existing entry.
   391  }
   392  
   393  func (*dummyEntry) getParentID() int64 {
   394  	return 0
   395  }
   396  
   397  // Entity is implemented by all channelz types.
   398  type Entity interface {
   399  	isEntity()
   400  	fmt.Stringer
   401  	id() int64
   402  }
   403  

View as plain text