1 /* 2 * 3 * Copyright 2021 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package rls 20 21 import ( 22 "fmt" 23 "sync/atomic" 24 "unsafe" 25 26 "google.golang.org/grpc/balancer" 27 "google.golang.org/grpc/balancer/base" 28 "google.golang.org/grpc/connectivity" 29 internalgrpclog "google.golang.org/grpc/internal/grpclog" 30 ) 31 32 // childPolicyWrapper is a reference counted wrapper around a child policy. 33 // 34 // The LB policy maintains a map of these wrappers keyed by the target returned 35 // by RLS. When a target is seen for the first time, a child policy wrapper is 36 // created for it and the wrapper is added to the child policy map. Each entry 37 // in the data cache holds references to the corresponding child policy 38 // wrappers. The LB policy also holds a reference to the child policy wrapper 39 // for the default target specified in the LB Policy Configuration 40 // 41 // When a cache entry is evicted, it releases references to the child policy 42 // wrappers that it contains. When all references have been released, the 43 // wrapper is removed from the child policy map and is destroyed. 44 // 45 // The child policy wrapper also caches the connectivity state and most recent 46 // picker from the child policy. Once the child policy wrapper reports 47 // TRANSIENT_FAILURE, it will continue reporting that state until it goes READY; 48 // transitions from TRANSIENT_FAILURE to CONNECTING are ignored. 49 // 50 // Whenever a child policy wrapper changes its connectivity state, the LB policy 51 // returns a new picker to the channel, since the channel may need to re-process 52 // the picks for queued RPCs. 53 // 54 // It is not safe for concurrent access. 55 type childPolicyWrapper struct { 56 logger *internalgrpclog.PrefixLogger 57 target string // RLS target corresponding to this child policy. 58 refCnt int // Reference count. 59 60 // Balancer state reported by the child policy. The RLS LB policy maintains 61 // these child policies in a BalancerGroup. The state reported by the child 62 // policy is pushed to the state aggregator (which is also implemented by the 63 // RLS LB policy) and cached here. See handleChildPolicyStateUpdate() for 64 // details on how the state aggregation is performed. 65 // 66 // While this field is written to by the LB policy, it is read by the picker 67 // at Pick time. Making this an atomic to enable the picker to read this value 68 // without a mutex. 69 state unsafe.Pointer // *balancer.State 70 } 71 72 // newChildPolicyWrapper creates a child policy wrapper for the given target, 73 // and is initialized with one reference and starts off in CONNECTING state. 74 func newChildPolicyWrapper(target string) *childPolicyWrapper { 75 c := &childPolicyWrapper{ 76 target: target, 77 refCnt: 1, 78 state: unsafe.Pointer(&balancer.State{ 79 ConnectivityState: connectivity.Connecting, 80 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), 81 }), 82 } 83 c.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-child-policy-wrapper %s %p] ", c.target, c)) 84 c.logger.Infof("Created") 85 return c 86 } 87 88 // acquireRef increments the reference count on the child policy wrapper. 89 func (c *childPolicyWrapper) acquireRef() { 90 c.refCnt++ 91 } 92 93 // releaseRef decrements the reference count on the child policy wrapper. The 94 // return value indicates whether the released reference was the last one. 95 func (c *childPolicyWrapper) releaseRef() bool { 96 c.refCnt-- 97 return c.refCnt == 0 98 } 99 100 // lamify causes the child policy wrapper to return a picker which will always 101 // fail requests. This is used when the wrapper runs into errors when trying to 102 // build and parse the child policy configuration. 103 func (c *childPolicyWrapper) lamify(err error) { 104 c.logger.Warningf("Entering lame mode: %v", err) 105 atomic.StorePointer(&c.state, unsafe.Pointer(&balancer.State{ 106 ConnectivityState: connectivity.TransientFailure, 107 Picker: base.NewErrPicker(err), 108 })) 109 } 110