1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package grpctransport
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "net/http"
22
23 "cloud.google.com/go/auth"
24 "cloud.google.com/go/auth/credentials"
25 "cloud.google.com/go/auth/internal"
26 "cloud.google.com/go/auth/internal/transport"
27 "go.opencensus.io/plugin/ocgrpc"
28 "google.golang.org/grpc"
29 grpccreds "google.golang.org/grpc/credentials"
30 grpcinsecure "google.golang.org/grpc/credentials/insecure"
31 )
32
// Environment variable names and header keys used by this package.
const (
	// disableDirectPathEnvVar holds the environment variable name
	// GOOGLE_CLOUD_DISABLE_DIRECT_PATH.
	// NOTE(review): it is not read in the visible part of this file;
	// presumably it lets users opt out of DirectPath — confirm at the use site.
	disableDirectPathEnvVar = "GOOGLE_CLOUD_DISABLE_DIRECT_PATH"

	// enableDirectPathXdsEnvVar holds the environment variable name
	// GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS.
	// NOTE(review): it is not read in the visible part of this file;
	// presumably it opts in to DirectPath over xDS — confirm at the use site.
	enableDirectPathXdsEnvVar = "GOOGLE_CLOUD_ENABLE_DIRECT_PATH_XDS"

	// quotaProjectHeaderKey is the metadata key under which dial stores the
	// quota project ID reported by the credentials.
	quotaProjectHeaderKey = "X-Goog-User-Project"
)
42
43 var (
44
45 timeoutDialerOption grpc.DialOption
46 )
47
48
49 type Options struct {
50
51
52
53 DisableTelemetry bool
54
55
56
57 DisableAuthentication bool
58
59 Endpoint string
60
61
62 Metadata map[string]string
63
64
65 GRPCDialOpts []grpc.DialOption
66
67
68 PoolSize int
69
70
71 Credentials *auth.Credentials
72
73
74 DetectOpts *credentials.DetectOptions
75
76
77
78
79 UniverseDomain string
80
81
82
83 InternalOptions *InternalOptions
84 }
85
86
87
88 func (o *Options) client() *http.Client {
89 if o.DetectOpts != nil && o.DetectOpts.Client != nil {
90 return o.DetectOpts.Client
91 }
92 return nil
93 }
94
95 func (o *Options) validate() error {
96 if o == nil {
97 return errors.New("grpctransport: opts required to be non-nil")
98 }
99 if o.InternalOptions != nil && o.InternalOptions.SkipValidation {
100 return nil
101 }
102 hasCreds := o.Credentials != nil ||
103 (o.DetectOpts != nil && len(o.DetectOpts.CredentialsJSON) > 0) ||
104 (o.DetectOpts != nil && o.DetectOpts.CredentialsFile != "")
105 if o.DisableAuthentication && hasCreds {
106 return errors.New("grpctransport: DisableAuthentication is incompatible with options that set or detect credentials")
107 }
108 return nil
109 }
110
111 func (o *Options) resolveDetectOptions() *credentials.DetectOptions {
112 io := o.InternalOptions
113
114 do := transport.CloneDetectOptions(o.DetectOpts)
115
116
117 if (io != nil && io.EnableJWTWithScope) || do.Audience != "" {
118 do.UseSelfSignedJWT = true
119 }
120
121 if len(do.Scopes) == 0 && do.Audience == "" && io != nil && len(io.DefaultScopes) > 0 {
122 do.Scopes = make([]string, len(io.DefaultScopes))
123 copy(do.Scopes, io.DefaultScopes)
124 }
125 if len(do.Scopes) == 0 && do.Audience == "" && io != nil {
126 do.Audience = o.InternalOptions.DefaultAudience
127 }
128 return do
129 }
130
131
132
133
134
// InternalOptions carries library-internal configuration that complements
// Options.
type InternalOptions struct {
	// EnableNonDefaultSAForDirectPath is not read in the visible part of
	// this file.
	// NOTE(review): presumably it permits DirectPath with non-default
	// service accounts — confirm at the use site.
	EnableNonDefaultSAForDirectPath bool

	// EnableDirectPath is forwarded to the internal transport options.
	EnableDirectPath bool

	// EnableDirectPathXds is forwarded to the internal transport options.
	EnableDirectPathXds bool

	// EnableJWTWithScope turns on UseSelfSignedJWT in the resolved
	// credential detection options.
	EnableJWTWithScope bool

	// DefaultAudience becomes the detection Audience when the caller set
	// neither Scopes nor Audience and DefaultScopes is empty.
	DefaultAudience string

	// DefaultEndpointTemplate is forwarded to the internal transport
	// options.
	DefaultEndpointTemplate string

	// DefaultMTLSEndpoint is forwarded to the internal transport options.
	DefaultMTLSEndpoint string

	// DefaultScopes are copied into the detection options when the caller
	// set neither Scopes nor Audience.
	DefaultScopes []string

	// SkipValidation makes Options.validate return early without checking
	// for conflicting authentication settings.
	SkipValidation bool
}
161
162
163
164
165 func Dial(ctx context.Context, secure bool, opts *Options) (GRPCClientConnPool, error) {
166 if err := opts.validate(); err != nil {
167 return nil, err
168 }
169 if opts.PoolSize <= 1 {
170 conn, err := dial(ctx, secure, opts)
171 if err != nil {
172 return nil, err
173 }
174 return &singleConnPool{conn}, nil
175 }
176 pool := &roundRobinConnPool{}
177 for i := 0; i < opts.PoolSize; i++ {
178 conn, err := dial(ctx, secure, opts)
179 if err != nil {
180
181 defer pool.Close()
182 return nil, err
183 }
184 pool.conns = append(pool.conns, conn)
185 }
186 return pool, nil
187 }
188
189
190 func dial(ctx context.Context, secure bool, opts *Options) (*grpc.ClientConn, error) {
191 tOpts := &transport.Options{
192 Endpoint: opts.Endpoint,
193 Client: opts.client(),
194 UniverseDomain: opts.UniverseDomain,
195 }
196 if io := opts.InternalOptions; io != nil {
197 tOpts.DefaultEndpointTemplate = io.DefaultEndpointTemplate
198 tOpts.DefaultMTLSEndpoint = io.DefaultMTLSEndpoint
199 tOpts.EnableDirectPath = io.EnableDirectPath
200 tOpts.EnableDirectPathXds = io.EnableDirectPathXds
201 }
202 transportCreds, endpoint, err := transport.GetGRPCTransportCredsAndEndpoint(tOpts)
203 if err != nil {
204 return nil, err
205 }
206
207 if !secure {
208 transportCreds = grpcinsecure.NewCredentials()
209 }
210
211
212 grpcOpts := []grpc.DialOption{
213 grpc.WithTransportCredentials(transportCreds),
214 }
215
216
217 if !opts.DisableAuthentication {
218 metadata := opts.Metadata
219
220 var creds *auth.Credentials
221 if opts.Credentials != nil {
222 creds = opts.Credentials
223 } else {
224 var err error
225 creds, err = credentials.DetectDefault(opts.resolveDetectOptions())
226 if err != nil {
227 return nil, err
228 }
229 }
230
231 qp, err := creds.QuotaProjectID(ctx)
232 if err != nil {
233 return nil, err
234 }
235 if qp != "" {
236 if metadata == nil {
237 metadata = make(map[string]string, 1)
238 }
239 metadata[quotaProjectHeaderKey] = qp
240 }
241 grpcOpts = append(grpcOpts,
242 grpc.WithPerRPCCredentials(&grpcCredentialsProvider{
243 creds: creds,
244 metadata: metadata,
245 clientUniverseDomain: opts.UniverseDomain,
246 }),
247 )
248
249
250 grpcOpts, endpoint = configureDirectPath(grpcOpts, opts, endpoint, creds)
251 }
252
253
254
255
256 grpcOpts = addOCStatsHandler(grpcOpts, opts)
257 grpcOpts = append(grpcOpts, opts.GRPCDialOpts...)
258
259 return grpc.DialContext(ctx, endpoint, grpcOpts...)
260 }
261
262
263 type grpcCredentialsProvider struct {
264 creds *auth.Credentials
265
266 secure bool
267
268
269 metadata map[string]string
270 clientUniverseDomain string
271 }
272
273
274
275
276
277 func (c *grpcCredentialsProvider) getClientUniverseDomain() string {
278 if c.clientUniverseDomain == "" {
279 return internal.DefaultUniverseDomain
280 }
281 return c.clientUniverseDomain
282 }
283
284 func (c *grpcCredentialsProvider) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
285 credentialsUniverseDomain, err := c.creds.UniverseDomain(ctx)
286 if err != nil {
287 return nil, err
288 }
289 if err := transport.ValidateUniverseDomain(c.getClientUniverseDomain(), credentialsUniverseDomain); err != nil {
290 return nil, err
291 }
292 token, err := c.creds.Token(ctx)
293 if err != nil {
294 return nil, err
295 }
296 if c.secure {
297 ri, _ := grpccreds.RequestInfoFromContext(ctx)
298 if err = grpccreds.CheckSecurityLevel(ri.AuthInfo, grpccreds.PrivacyAndIntegrity); err != nil {
299 return nil, fmt.Errorf("unable to transfer credentials PerRPCCredentials: %v", err)
300 }
301 }
302 metadata := make(map[string]string, len(c.metadata)+1)
303 setAuthMetadata(token, metadata)
304 for k, v := range c.metadata {
305 metadata[k] = v
306 }
307 return metadata, nil
308 }
309
310
311
312 func setAuthMetadata(token *auth.Token, m map[string]string) {
313 typ := token.Type
314 if typ == "" {
315 typ = internal.TokenTypeBearer
316 }
317 m["authorization"] = typ + " " + token.Value
318 }
319
320 func (c *grpcCredentialsProvider) RequireTransportSecurity() bool {
321 return c.secure
322 }
323
324 func addOCStatsHandler(dialOpts []grpc.DialOption, opts *Options) []grpc.DialOption {
325 if opts.DisableTelemetry {
326 return dialOpts
327 }
328 return append(dialOpts, grpc.WithStatsHandler(&ocgrpc.ClientHandler{}))
329 }
330
View as plain text