1 /* 2 * 3 * Copyright 2022 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package xdsclient 19 20 import ( 21 "errors" 22 "fmt" 23 24 "google.golang.org/grpc/internal/grpclog" 25 "google.golang.org/grpc/internal/xds/bootstrap" 26 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" 27 ) 28 29 // findAuthority returns the authority for this name. If it doesn't already 30 // exist, one will be created. 31 // 32 // Note that this doesn't always create new authority. authorities with the same 33 // config but different names are shared. 34 // 35 // The returned unref function must be called when the caller is done using this 36 // authority, without holding c.authorityMu. 37 // 38 // Caller must not hold c.authorityMu. 39 func (c *clientImpl) findAuthority(n *xdsresource.Name) (*authority, func(), error) { 40 scheme, authority := n.Scheme, n.Authority 41 42 c.authorityMu.Lock() 43 defer c.authorityMu.Unlock() 44 if c.done.HasFired() { 45 return nil, nil, errors.New("the xds-client is closed") 46 } 47 48 config := c.config.XDSServer 49 if scheme == xdsresource.FederationScheme { 50 cfg, ok := c.config.Authorities[authority] 51 if !ok { 52 return nil, nil, fmt.Errorf("xds: failed to find authority %q", authority) 53 } 54 if cfg.XDSServer != nil { 55 config = cfg.XDSServer 56 } 57 } 58 59 a, err := c.newAuthorityLocked(config) 60 if err != nil { 61 return nil, nil, fmt.Errorf("xds: failed to connect to the control plane for authority %q: %v", authority, err) 62 } 63 // All returned authority from this function will be used by a watch, 64 // holding the ref here. 65 // 66 // Note that this must be done while c.authorityMu is held, to avoid the 67 // race that an authority is returned, but before the watch starts, the 68 // old last watch is canceled (in another goroutine), causing this 69 // authority to be removed, and then a watch will start on a removed 70 // authority. 71 // 72 // unref() will be done when the watch is canceled. 73 a.refLocked() 74 return a, func() { c.unrefAuthority(a) }, nil 75 } 76 77 // newAuthorityLocked creates a new authority for the given config. If an 78 // authority for the given config exists in the cache, it is returned instead of 79 // creating a new one. 80 // 81 // The caller must take a reference of the returned authority before using, and 82 // unref afterwards. 83 // 84 // caller must hold c.authorityMu 85 func (c *clientImpl) newAuthorityLocked(config *bootstrap.ServerConfig) (_ *authority, retErr error) { 86 // First check if there's already an authority for this config. If found, it 87 // means this authority is used by other watches (could be the same 88 // authority name, or a different authority name but the same server 89 // config). Return it. 90 configStr := config.String() 91 if a, ok := c.authorities[configStr]; ok { 92 return a, nil 93 } 94 // Second check if there's an authority in the idle cache. If found, it 95 // means this authority was created, but moved to the idle cache because the 96 // watch was canceled. Move it from idle cache to the authority cache, and 97 // return. 98 if old, ok := c.idleAuthorities.Remove(configStr); ok { 99 oldA, _ := old.(*authority) 100 if oldA != nil { 101 c.authorities[configStr] = oldA 102 return oldA, nil 103 } 104 } 105 106 // Make a new authority since there's no existing authority for this config. 107 ret, err := newAuthority(authorityArgs{ 108 serverCfg: config, 109 bootstrapCfg: c.config, 110 serializer: c.serializer, 111 resourceTypeGetter: c.resourceTypes.get, 112 watchExpiryTimeout: c.watchExpiryTimeout, 113 logger: grpclog.NewPrefixLogger(logger, authorityPrefix(c, config.ServerURI)), 114 }) 115 if err != nil { 116 return nil, fmt.Errorf("creating new authority for config %q: %v", config.String(), err) 117 } 118 // Add it to the cache, so it will be reused. 119 c.authorities[configStr] = ret 120 return ret, nil 121 } 122 123 // unrefAuthority unrefs the authority. It also moves the authority to idle 124 // cache if it's ref count is 0. 125 // 126 // This function doesn't need to called explicitly. It's called by the returned 127 // unref from findAuthority(). 128 // 129 // Caller must not hold c.authorityMu. 130 func (c *clientImpl) unrefAuthority(a *authority) { 131 c.authorityMu.Lock() 132 defer c.authorityMu.Unlock() 133 if a.unrefLocked() > 0 { 134 return 135 } 136 configStr := a.serverCfg.String() 137 delete(c.authorities, configStr) 138 c.idleAuthorities.Add(configStr, a, func() { 139 a.close() 140 }) 141 } 142