...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/clientimpl_authority.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   */
    17  
    18  package xdsclient
    19  
    20  import (
    21  	"errors"
    22  	"fmt"
    23  
    24  	"google.golang.org/grpc/internal/grpclog"
    25  	"google.golang.org/grpc/internal/xds/bootstrap"
    26  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    27  )
    28  
    29  // findAuthority returns the authority for this name. If it doesn't already
    30  // exist, one will be created.
    31  //
    32  // Note that this doesn't always create new authority. authorities with the same
    33  // config but different names are shared.
    34  //
    35  // The returned unref function must be called when the caller is done using this
    36  // authority, without holding c.authorityMu.
    37  //
    38  // Caller must not hold c.authorityMu.
    39  func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) {
    40  	scheme, authority := n.Scheme, n.Authority
    41  
    42  	c.authorityMu.Lock()
    43  	defer c.authorityMu.Unlock()
    44  	if c.done.HasFired() {
    45  		return nil, nil, errors.New("the xds-client is closed")
    46  	}
    47  
    48  	config := c.config.XDSServer
    49  	if scheme == xdsresource.FederationScheme {
    50  		cfg, ok := c.config.Authorities[authority]
    51  		if !ok {
    52  			return nil, nil, fmt.Errorf("xds: failed to find authority %q", authority)
    53  		}
    54  		if cfg.XDSServer != nil {
    55  			config = cfg.XDSServer
    56  		}
    57  	}
    58  
    59  	a, err := c.newAuthorityLocked(config)
    60  	if err != nil {
    61  		return nil, nil, fmt.Errorf("xds: failed to connect to the control plane for authority %q: %v", authority, err)
    62  	}
    63  	// All returned authority from this function will be used by a watch,
    64  	// holding the ref here.
    65  	//
    66  	// Note that this must be done while c.authorityMu is held, to avoid the
    67  	// race that an authority is returned, but before the watch starts, the
    68  	// old last watch is canceled (in another goroutine), causing this
    69  	// authority to be removed, and then a watch will start on a removed
    70  	// authority.
    71  	//
    72  	// unref() will be done when the watch is canceled.
    73  	a.refLocked()
    74  	return a, func() { c.unrefAuthority(a) }, nil
    75  }
    76  
    77  // newAuthorityLocked creates a new authority for the given config.  If an
    78  // authority for the given config exists in the cache, it is returned instead of
    79  // creating a new one.
    80  //
    81  // The caller must take a reference of the returned authority before using, and
    82  // unref afterwards.
    83  //
    84  // caller must hold c.authorityMu
    85  func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *authority, retErr error) {
    86  	// First check if there's already an authority for this config. If found, it
    87  	// means this authority is used by other watches (could be the same
    88  	// authority name, or a different authority name but the same server
    89  	// config). Return it.
    90  	configStr := config.String()
    91  	if a, ok := c.authorities[configStr]; ok {
    92  		return a, nil
    93  	}
    94  	// Second check if there's an authority in the idle cache. If found, it
    95  	// means this authority was created, but moved to the idle cache because the
    96  	// watch was canceled. Move it from idle cache to the authority cache, and
    97  	// return.
    98  	if old, ok := c.idleAuthorities.Remove(configStr); ok {
    99  		oldA, _ := old.(*authority)
   100  		if oldA != nil {
   101  			c.authorities[configStr] = oldA
   102  			return oldA, nil
   103  		}
   104  	}
   105  
   106  	// Make a new authority since there's no existing authority for this config.
   107  	ret, err := newAuthority(authorityArgs{
   108  		serverCfg:          config,
   109  		bootstrapCfg:       c.config,
   110  		serializer:         c.serializer,
   111  		resourceTypeGetter: c.resourceTypes.get,
   112  		watchExpiryTimeout: c.watchExpiryTimeout,
   113  		logger:             grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI)),
   114  	})
   115  	if err != nil {
   116  		return nil, fmt.Errorf("creating new authority for config %q: %v", config.String(), err)
   117  	}
   118  	// Add it to the cache, so it will be reused.
   119  	c.authorities[configStr] = ret
   120  	return ret, nil
   121  }
   122  
   123  // unrefAuthority unrefs the authority. It also moves the authority to idle
   124  // cache if it's ref count is 0.
   125  //
   126  // This function doesn't need to called explicitly. It's called by the returned
   127  // unref from findAuthority().
   128  //
   129  // Caller must not hold c.authorityMu.
   130  func (c *clientImpl) unrefAuthority(a *authority) {
   131  	c.authorityMu.Lock()
   132  	defer c.authorityMu.Unlock()
   133  	if a.unrefLocked() > 0 {
   134  		return
   135  	}
   136  	configStr := a.serverCfg.String()
   137  	delete(c.authorities, configStr)
   138  	c.idleAuthorities.Add(configStr, a, func() {
   139  		a.close()
   140  	})
   141  }
   142  

View as plain text