1
2
3
4
5
6
7 package operation
8
9 import (
10 "context"
11 "errors"
12 "os"
13 "runtime"
14 "strconv"
15 "strings"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/internal/bsonutil"
19 "go.mongodb.org/mongo-driver/internal/driverutil"
20 "go.mongodb.org/mongo-driver/internal/handshake"
21 "go.mongodb.org/mongo-driver/mongo/address"
22 "go.mongodb.org/mongo-driver/mongo/description"
23 "go.mongodb.org/mongo-driver/version"
24 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
25 "go.mongodb.org/mongo-driver/x/mongo/driver"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
27 )
28
29
30
31
32
// maxClientMetadataSize is the size limit, in bytes, that handshakeCommand
// passes to encodeClientMetadata for the encoded "client" metadata document.
const maxClientMetadataSize = 512

// driverName is the value written to the "name" field of the "driver"
// sub-document of the client metadata.
const driverName = "mongo-go-driver"
36
37
// Hello builds and runs the "hello" (or legacy hello) command, either as a
// regular operation (Execute, StreamResponse) or as the connection handshake
// (GetHandshakeInformation). It is configured through its chainable setter
// methods, and the most recent server reply is exposed through Result.
type Hello struct {
	appname            string                       // application name placed in the handshake client metadata
	compressors        []string                     // entries of the handshake "compression" array
	saslSupportedMechs string                       // sent as "saslSupportedMechs" in the handshake when non-empty
	d                  driver.Deployment            // deployment used by Execute and StreamResponse
	clock              *session.ClusterClock        // handed to the underlying driver.Operation
	speculativeAuth    bsoncore.Document            // sent as "speculativeAuthenticate" in the handshake when non-nil
	topologyVersion    *description.TopologyVersion // sent as "topologyVersion" when non-nil
	maxAwaitTimeMS     *int64                       // sent as "maxAwaitTimeMS" when non-nil
	serverAPI          *driver.ServerAPIOptions     // handed to the operation; non-nil selects the "hello" command name
	loadBalanced       bool                         // adds "loadBalanced: true" and selects the "hello" command name

	res bsoncore.Document // most recent server response, stored by ProcessResponseFn
}
52
// Compile-time assertion that *Hello implements driver.Handshaker.
var _ driver.Handshaker = (*Hello)(nil)
54
55
56 func NewHello() *Hello { return &Hello{} }
57
58
// AppName sets the application name that the handshake reports in the client
// metadata document. It returns h so calls can be chained.
func (h *Hello) AppName(appname string) *Hello {
	h.appname = appname
	return h
}
63
64
65 func (h *Hello) ClusterClock(clock *session.ClusterClock) *Hello {
66 if h == nil {
67 h = new(Hello)
68 }
69
70 h.clock = clock
71 return h
72 }
73
74
// Compressors sets the compressor names that the handshake lists in its
// "compression" array. It returns h so calls can be chained.
func (h *Hello) Compressors(compressors []string) *Hello {
	h.compressors = compressors
	return h
}
79
80
81
// SASLSupportedMechs stores username as the value of the "saslSupportedMechs"
// field; the handshake command includes that field only when the value is
// non-empty. It returns h so calls can be chained.
func (h *Hello) SASLSupportedMechs(username string) *Hello {
	h.saslSupportedMechs = username
	return h
}
86
87
// Deployment sets the driver.Deployment the operation runs against. Execute
// returns an error if no deployment has been set. It returns h so calls can
// be chained.
func (h *Hello) Deployment(d driver.Deployment) *Hello {
	h.d = d
	return h
}
92
93
// SpeculativeAuthenticate sets the document that the handshake command sends
// as "speculativeAuthenticate"; a nil document omits the field. It returns h
// so calls can be chained.
func (h *Hello) SpeculativeAuthenticate(doc bsoncore.Document) *Hello {
	h.speculativeAuth = doc
	return h
}
98
99
// TopologyVersion sets the topology version whose ProcessID and Counter are
// sent in the command's "topologyVersion" document; nil omits the document.
// It returns h so calls can be chained.
func (h *Hello) TopologyVersion(tv *description.TopologyVersion) *Hello {
	h.topologyVersion = tv
	return h
}
104
105
// MaxAwaitTimeMS sets the value sent as "maxAwaitTimeMS" with the command.
// Once called, the field is always sent, even when awaitTime is zero. It
// returns h so calls can be chained.
func (h *Hello) MaxAwaitTimeMS(awaitTime int64) *Hello {
	// Store a pointer so that "unset" (nil) is distinguishable from zero.
	h.maxAwaitTimeMS = &awaitTime
	return h
}
110
111
// ServerAPI sets the server API options handed to the underlying operation.
// A non-nil value also makes the command use the "hello" name and disables
// the legacy-handshake flag on the operation. It returns h so calls can be
// chained.
func (h *Hello) ServerAPI(serverAPI *driver.ServerAPIOptions) *Hello {
	h.serverAPI = serverAPI
	return h
}
116
117
// LoadBalanced sets whether the command is sent in load-balanced mode. When
// true, the command uses the "hello" name, includes "loadBalanced: true", and
// the operation is not flagged as a legacy handshake. It returns h so calls
// can be chained.
func (h *Hello) LoadBalanced(lb bool) *Hello {
	h.loadBalanced = lb
	return h
}
122
123
124 func (h *Hello) Result(addr address.Address) description.Server {
125 return description.NewServer(addr, bson.Raw(h.res))
126 }
127
// dockerEnvPath is the file that getContainerEnvInfo stats to decide whether
// the process is running under Docker.
const dockerEnvPath = "/.dockerenv"
129
const (
	// runtimeNameDocker is the container runtime name reported in the client
	// metadata when Docker is detected.
	runtimeNameDocker = "docker"

	// orchestratorNameK8s is the container orchestrator name reported in the
	// client metadata when Kubernetes is detected.
	orchestratorNameK8s = "kubernetes"
)
137
138
139
140
141
142
143
144 func getFaasEnvName() string {
145 envVars := []string{
146 driverutil.EnvVarAWSExecutionEnv,
147 driverutil.EnvVarAWSLambdaRuntimeAPI,
148 driverutil.EnvVarFunctionsWorkerRuntime,
149 driverutil.EnvVarKService,
150 driverutil.EnvVarFunctionName,
151 driverutil.EnvVarVercel,
152 }
153
154
155
156 names := make(map[string]struct{})
157
158 for _, envVar := range envVars {
159 val := os.Getenv(envVar)
160 if val == "" {
161 continue
162 }
163
164 var name string
165
166 switch envVar {
167 case driverutil.EnvVarAWSExecutionEnv:
168 if !strings.HasPrefix(val, driverutil.AwsLambdaPrefix) {
169 continue
170 }
171
172 name = driverutil.EnvNameAWSLambda
173 case driverutil.EnvVarAWSLambdaRuntimeAPI:
174 name = driverutil.EnvNameAWSLambda
175 case driverutil.EnvVarFunctionsWorkerRuntime:
176 name = driverutil.EnvNameAzureFunc
177 case driverutil.EnvVarKService, driverutil.EnvVarFunctionName:
178 name = driverutil.EnvNameGCPFunc
179 case driverutil.EnvVarVercel:
180
181 delete(names, driverutil.EnvNameAWSLambda)
182
183 name = driverutil.EnvNameVercel
184 }
185
186 names[name] = struct{}{}
187 if len(names) > 1 {
188
189
190 names = nil
191
192 break
193 }
194 }
195
196 for name := range names {
197 return name
198 }
199
200 return ""
201 }
202
// containerInfo holds the container details detected by getContainerEnvInfo.
// Either field may be empty when the corresponding detection did not match.
type containerInfo struct {
	runtime      string // container runtime name, e.g. runtimeNameDocker
	orchestrator string // container orchestrator name, e.g. orchestratorNameK8s
}
207
208
209
210
211 func getContainerEnvInfo() *containerInfo {
212 var runtime, orchestrator string
213 if _, err := os.Stat(dockerEnvPath); !os.IsNotExist(err) {
214 runtime = runtimeNameDocker
215 }
216 if v := os.Getenv(driverutil.EnvVarK8s); v != "" {
217 orchestrator = orchestratorNameK8s
218 }
219 if runtime != "" || orchestrator != "" {
220 return &containerInfo{
221 runtime: runtime,
222 orchestrator: orchestrator,
223 }
224 }
225 return nil
226 }
227
228
229
230
231 func appendClientAppName(dst []byte, name string) ([]byte, error) {
232 if name == "" {
233 return dst, nil
234 }
235
236 var idx int32
237 idx, dst = bsoncore.AppendDocumentElementStart(dst, "application")
238
239 dst = bsoncore.AppendStringElement(dst, "name", name)
240
241 return bsoncore.AppendDocumentEnd(dst, idx)
242 }
243
244
245
246
247 func appendClientDriver(dst []byte) ([]byte, error) {
248 var idx int32
249 idx, dst = bsoncore.AppendDocumentElementStart(dst, "driver")
250
251 dst = bsoncore.AppendStringElement(dst, "name", driverName)
252 dst = bsoncore.AppendStringElement(dst, "version", version.Driver)
253
254 return bsoncore.AppendDocumentEnd(dst, idx)
255 }
256
257
258
259
260 func appendClientEnv(dst []byte, omitNonName, omitDoc bool) ([]byte, error) {
261 if omitDoc {
262 return dst, nil
263 }
264
265 name := getFaasEnvName()
266 container := getContainerEnvInfo()
267
268
269 if name == "" && container == nil {
270 return dst, nil
271 }
272
273 var idx int32
274
275 idx, dst = bsoncore.AppendDocumentElementStart(dst, "env")
276
277 if name != "" {
278 dst = bsoncore.AppendStringElement(dst, "name", name)
279 }
280
281 addMem := func(envVar string) []byte {
282 mem := os.Getenv(envVar)
283 if mem == "" {
284 return dst
285 }
286
287 memInt64, err := strconv.ParseInt(mem, 10, 32)
288 if err != nil {
289 return dst
290 }
291
292 memInt32 := int32(memInt64)
293
294 return bsoncore.AppendInt32Element(dst, "memory_mb", memInt32)
295 }
296
297 addRegion := func(envVar string) []byte {
298 region := os.Getenv(envVar)
299 if region == "" {
300 return dst
301 }
302
303 return bsoncore.AppendStringElement(dst, "region", region)
304 }
305
306 addTimeout := func(envVar string) []byte {
307 timeout := os.Getenv(envVar)
308 if timeout == "" {
309 return dst
310 }
311
312 timeoutInt64, err := strconv.ParseInt(timeout, 10, 32)
313 if err != nil {
314 return dst
315 }
316
317 timeoutInt32 := int32(timeoutInt64)
318 return bsoncore.AppendInt32Element(dst, "timeout_sec", timeoutInt32)
319 }
320
321 if !omitNonName {
322
323 switch name {
324 case driverutil.EnvNameAWSLambda:
325 dst = addMem(driverutil.EnvVarAWSLambdaFunctionMemorySize)
326 dst = addRegion(driverutil.EnvVarAWSRegion)
327 case driverutil.EnvNameGCPFunc:
328 dst = addMem(driverutil.EnvVarFunctionMemoryMB)
329 dst = addRegion(driverutil.EnvVarFunctionRegion)
330 dst = addTimeout(driverutil.EnvVarFunctionTimeoutSec)
331 case driverutil.EnvNameVercel:
332 dst = addRegion(driverutil.EnvVarVercelRegion)
333 }
334 }
335
336 if container != nil {
337 var idxCntnr int32
338 idxCntnr, dst = bsoncore.AppendDocumentElementStart(dst, "container")
339 if container.runtime != "" {
340 dst = bsoncore.AppendStringElement(dst, "runtime", container.runtime)
341 }
342 if container.orchestrator != "" {
343 dst = bsoncore.AppendStringElement(dst, "orchestrator", container.orchestrator)
344 }
345 var err error
346 dst, err = bsoncore.AppendDocumentEnd(dst, idxCntnr)
347 if err != nil {
348 return dst, err
349 }
350 }
351
352 return bsoncore.AppendDocumentEnd(dst, idx)
353 }
354
355
356
357
358 func appendClientOS(dst []byte, omitNonType bool) ([]byte, error) {
359 var idx int32
360
361 idx, dst = bsoncore.AppendDocumentElementStart(dst, "os")
362
363 dst = bsoncore.AppendStringElement(dst, "type", runtime.GOOS)
364 if !omitNonType {
365 dst = bsoncore.AppendStringElement(dst, "architecture", runtime.GOARCH)
366 }
367
368 return bsoncore.AppendDocumentEnd(dst, idx)
369 }
370
371
372
373
374 func appendClientPlatform(dst []byte) []byte {
375 return bsoncore.AppendStringElement(dst, "platform", runtime.Version())
376 }
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412 func encodeClientMetadata(appname string, maxLen int) ([]byte, error) {
413 dst := make([]byte, 0, maxLen)
414
415 omitEnvDoc := false
416 omitEnvNonName := false
417 omitOSNonType := false
418 omitEnvDocument := false
419 truncatePlatform := false
420
421 retry:
422 var idx int32
423 idx, dst = bsoncore.AppendDocumentStart(dst)
424
425 var err error
426 dst, err = appendClientAppName(dst, appname)
427 if err != nil {
428 return nil, err
429 }
430
431 dst, err = appendClientDriver(dst)
432 if err != nil {
433 return nil, err
434 }
435
436 dst, err = appendClientOS(dst, omitOSNonType)
437 if err != nil {
438 return nil, err
439 }
440
441 if !truncatePlatform {
442 dst = appendClientPlatform(dst)
443 }
444
445 if !omitEnvDocument {
446 dst, err = appendClientEnv(dst, omitEnvNonName, omitEnvDoc)
447 if err != nil {
448 return nil, err
449 }
450 }
451
452 dst, err = bsoncore.AppendDocumentEnd(dst, idx)
453 if err != nil {
454 return nil, err
455 }
456
457 if len(dst) > maxLen {
458
459
460
461
462
463
464
465 dst = dst[:0]
466
467 if !omitEnvNonName {
468 omitEnvNonName = true
469
470 goto retry
471 }
472
473 if !omitOSNonType {
474 omitOSNonType = true
475
476 goto retry
477 }
478
479 if !omitEnvDoc {
480 omitEnvDoc = true
481
482 goto retry
483 }
484
485 if !truncatePlatform {
486 truncatePlatform = true
487
488 goto retry
489 }
490
491
492
493 return nil, nil
494 }
495
496 return dst, nil
497 }
498
499
500 func (h *Hello) handshakeCommand(dst []byte, desc description.SelectedServer) ([]byte, error) {
501 dst, err := h.command(dst, desc)
502 if err != nil {
503 return dst, err
504 }
505
506 if h.saslSupportedMechs != "" {
507 dst = bsoncore.AppendStringElement(dst, "saslSupportedMechs", h.saslSupportedMechs)
508 }
509 if h.speculativeAuth != nil {
510 dst = bsoncore.AppendDocumentElement(dst, "speculativeAuthenticate", h.speculativeAuth)
511 }
512 var idx int32
513 idx, dst = bsoncore.AppendArrayElementStart(dst, "compression")
514 for i, compressor := range h.compressors {
515 dst = bsoncore.AppendStringElement(dst, strconv.Itoa(i), compressor)
516 }
517 dst, _ = bsoncore.AppendArrayEnd(dst, idx)
518
519 clientMetadata, _ := encodeClientMetadata(h.appname, maxClientMetadataSize)
520
521
522 if len(clientMetadata) > 0 {
523 dst = bsoncore.AppendDocumentElement(dst, "client", clientMetadata)
524 }
525
526 return dst, nil
527 }
528
529
530 func (h *Hello) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
531
532
533 if h.loadBalanced || h.serverAPI != nil || desc.Server.HelloOK {
534 dst = bsoncore.AppendInt32Element(dst, "hello", 1)
535 } else {
536 dst = bsoncore.AppendInt32Element(dst, handshake.LegacyHello, 1)
537 }
538 dst = bsoncore.AppendBooleanElement(dst, "helloOk", true)
539
540 if tv := h.topologyVersion; tv != nil {
541 var tvIdx int32
542
543 tvIdx, dst = bsoncore.AppendDocumentElementStart(dst, "topologyVersion")
544 dst = bsoncore.AppendObjectIDElement(dst, "processId", tv.ProcessID)
545 dst = bsoncore.AppendInt64Element(dst, "counter", tv.Counter)
546 dst, _ = bsoncore.AppendDocumentEnd(dst, tvIdx)
547 }
548 if h.maxAwaitTimeMS != nil {
549 dst = bsoncore.AppendInt64Element(dst, "maxAwaitTimeMS", *h.maxAwaitTimeMS)
550 }
551 if h.loadBalanced {
552
553
554 dst = bsoncore.AppendBooleanElement(dst, "loadBalanced", true)
555 }
556
557 return dst, nil
558 }
559
560
561 func (h *Hello) Execute(ctx context.Context) error {
562 if h.d == nil {
563 return errors.New("a Hello must have a Deployment set before Execute can be called")
564 }
565
566 return h.createOperation().Execute(ctx)
567 }
568
569
570 func (h *Hello) StreamResponse(ctx context.Context, conn driver.StreamerConnection) error {
571 return h.createOperation().ExecuteExhaust(ctx, conn)
572 }
573
574
575
576
577
578 func isLegacyHandshake(srvAPI *driver.ServerAPIOptions, loadbalanced bool) bool {
579 return srvAPI == nil && !loadbalanced
580 }
581
582 func (h *Hello) createOperation() driver.Operation {
583 op := driver.Operation{
584 Clock: h.clock,
585 CommandFn: h.command,
586 Database: "admin",
587 Deployment: h.d,
588 ProcessResponseFn: func(info driver.ResponseInfo) error {
589 h.res = info.ServerResponse
590 return nil
591 },
592 ServerAPI: h.serverAPI,
593 }
594
595 if isLegacyHandshake(h.serverAPI, h.loadBalanced) {
596 op.Legacy = driver.LegacyHandshake
597 }
598
599 return op
600 }
601
602
603
604 func (h *Hello) GetHandshakeInformation(ctx context.Context, _ address.Address, c driver.Connection) (driver.HandshakeInformation, error) {
605 deployment := driver.SingleConnectionDeployment{C: c}
606
607 op := driver.Operation{
608 Clock: h.clock,
609 CommandFn: h.handshakeCommand,
610 Deployment: deployment,
611 Database: "admin",
612 ProcessResponseFn: func(info driver.ResponseInfo) error {
613 h.res = info.ServerResponse
614 return nil
615 },
616 ServerAPI: h.serverAPI,
617 }
618
619 if isLegacyHandshake(h.serverAPI, h.loadBalanced) {
620 op.Legacy = driver.LegacyHandshake
621 }
622
623 if err := op.Execute(ctx); err != nil {
624 return driver.HandshakeInformation{}, err
625 }
626
627 info := driver.HandshakeInformation{
628 Description: h.Result(c.Address()),
629 }
630 if speculativeAuthenticate, ok := h.res.Lookup("speculativeAuthenticate").DocumentOK(); ok {
631 info.SpeculativeAuthenticate = speculativeAuthenticate
632 }
633 if serverConnectionID, ok := h.res.Lookup("connectionId").AsInt64OK(); ok {
634 info.ServerConnectionID = &serverConnectionID
635 }
636
637 var err error
638
639
640
641 if saslSupportedMechs, lookupErr := bson.Raw(h.res).LookupErr("saslSupportedMechs"); lookupErr == nil {
642 info.SaslSupportedMechs, err = bsonutil.StringSliceFromRawValue("saslSupportedMechs", saslSupportedMechs)
643 }
644 return info, err
645 }
646
647
648
// FinishHandshake is a no-op for Hello: it ignores its arguments and always
// returns nil.
func (h *Hello) FinishHandshake(context.Context, driver.Connection) error {
	return nil
}
652
View as plain text