1
18
19 package rls
20
21 import (
22 "context"
23 "fmt"
24 "time"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/balancer/rls/internal/adaptive"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/internal"
32 internalgrpclog "google.golang.org/grpc/internal/grpclog"
33 "google.golang.org/grpc/internal/pretty"
34 rlsgrpc "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
35 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
36 )
37
// newAdaptiveThrottler returns the throttler installed on every control channel
// created by newControlChannel. It is a package-level variable rather than a
// plain function, which allows it to be swapped out (presumably by tests —
// TODO confirm).
var newAdaptiveThrottler = func() adaptiveThrottler { return adaptive.New() }
39
// adaptiveThrottler is the throttling behavior the control channel depends on.
// The default implementation is the value returned by adaptive.New().
type adaptiveThrottler interface {
	// ShouldThrottle reports whether the next RLS request should be dropped
	// locally instead of being sent; lookup consults it before every request.
	ShouldThrottle() bool
	// RegisterBackendResponse records the outcome of a request with the
	// throttler. It is not called from this file.
	// NOTE(review): presumably invoked by the code that handles lookup
	// results — confirm against the callers.
	RegisterBackendResponse(throttled bool)
}
44
45
46
// controlChannel wraps the gRPC channel to the RLS server and the state needed
// to issue RouteLookup RPCs on it.
type controlChannel struct {
	// rpcTimeout is the deadline applied to every RouteLookup RPC issued by
	// lookup.
	rpcTimeout time.Duration

	// backToReadyFunc is invoked by the connectivity monitoring goroutine each
	// time the channel returns to READY after having been READY at least once
	// before.
	backToReadyFunc func()

	// throttler is consulted before each lookup to decide whether the request
	// should be dropped on the client side.
	throttler adaptiveThrottler

	cc     *grpc.ClientConn                // Channel to the RLS server.
	client rlsgrpc.RouteLookupServiceClient // RouteLookupService stub bound to cc.
	logger *internalgrpclog.PrefixLogger    // Logger prefixed with this channel's address.
}
62
63
64
65
66 func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
67 ctrlCh := &controlChannel{
68 rpcTimeout: rpcTimeout,
69 backToReadyFunc: backToReadyFunc,
70 throttler: newAdaptiveThrottler(),
71 }
72 ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
73
74 dopts, err := ctrlCh.dialOpts(bOpts, serviceConfig)
75 if err != nil {
76 return nil, err
77 }
78 ctrlCh.cc, err = grpc.Dial(rlsServerName, dopts...)
79 if err != nil {
80 return nil, err
81 }
82 ctrlCh.client = rlsgrpc.NewRouteLookupServiceClient(ctrlCh.cc)
83 ctrlCh.logger.Infof("Control channel created to RLS server at: %v", rlsServerName)
84
85 go ctrlCh.monitorConnectivityState()
86 return ctrlCh, nil
87 }
88
89
90 func (cc *controlChannel) dialOpts(bOpts balancer.BuildOptions, serviceConfig string) ([]grpc.DialOption, error) {
91
92
93
94
95
96 dopts := []grpc.DialOption{grpc.WithAuthority(bOpts.Authority)}
97 if bOpts.Dialer != nil {
98 dopts = append(dopts, grpc.WithContextDialer(bOpts.Dialer))
99 }
100
101
102
103 var credsOpt grpc.DialOption
104 switch {
105 case bOpts.DialCreds != nil:
106 credsOpt = grpc.WithTransportCredentials(bOpts.DialCreds.Clone())
107 case bOpts.CredsBundle != nil:
108
109
110
111
112 bundle, err := bOpts.CredsBundle.NewWithMode(internal.CredsBundleModeFallback)
113 if err != nil {
114 return nil, err
115 }
116 credsOpt = grpc.WithCredentialsBundle(bundle)
117 default:
118 cc.logger.Warningf("no credentials available, using Insecure")
119 credsOpt = grpc.WithTransportCredentials(insecure.NewCredentials())
120 }
121 dopts = append(dopts, credsOpt)
122
123
124
125
126 if serviceConfig != "" {
127 cc.logger.Infof("Disabling service config from the name resolver and instead using: %s", serviceConfig)
128 dopts = append(dopts, grpc.WithDisableServiceConfig(), grpc.WithDefaultServiceConfig(serviceConfig))
129 }
130
131 return dopts, nil
132 }
133
134 func (cc *controlChannel) monitorConnectivityState() {
135 cc.logger.Infof("Starting connectivity state monitoring goroutine")
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159 ctx := context.Background()
160
161 first := true
162 for {
163
164 for s := cc.cc.GetState(); s != connectivity.Ready; s = cc.cc.GetState() {
165 if s == connectivity.Shutdown {
166 return
167 }
168 cc.cc.WaitForStateChange(ctx, s)
169 }
170 cc.logger.Infof("Connectivity state is READY")
171
172 if !first {
173 cc.logger.Infof("Control channel back to READY")
174 cc.backToReadyFunc()
175 }
176 first = false
177
178
179 cc.cc.WaitForStateChange(ctx, connectivity.Ready)
180 if cc.cc.GetState() == connectivity.Shutdown {
181 return
182 }
183 cc.logger.Infof("Connectivity state is %s", cc.cc.GetState())
184 }
185 }
186
187 func (cc *controlChannel) close() {
188 cc.logger.Infof("Closing control channel")
189 cc.cc.Close()
190 }
191
// lookupCallback receives the outcome of a RouteLookup RPC issued by lookup:
// the targets and header data read from the response, and the error returned
// by the RPC.
type lookupCallback func(targets []string, headerData string, err error)
193
194
195
196
197
198
199
200 func (cc *controlChannel) lookup(reqKeys map[string]string, reason rlspb.RouteLookupRequest_Reason, staleHeaders string, cb lookupCallback) (throttled bool) {
201 if cc.throttler.ShouldThrottle() {
202 cc.logger.Infof("RLS request throttled by client-side adaptive throttling")
203 return true
204 }
205 go func() {
206 req := &rlspb.RouteLookupRequest{
207 TargetType: "grpc",
208 KeyMap: reqKeys,
209 Reason: reason,
210 StaleHeaderData: staleHeaders,
211 }
212 cc.logger.Infof("Sending RLS request %+v", pretty.ToJSON(req))
213
214 ctx, cancel := context.WithTimeout(context.Background(), cc.rpcTimeout)
215 defer cancel()
216 resp, err := cc.client.RouteLookup(ctx, req)
217 cb(resp.GetTargets(), resp.GetHeaderData(), err)
218 }()
219 return false
220 }
221
View as plain text