1
18
19 package grpc
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net/url"
27 "strings"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/base"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/connectivity"
36 "google.golang.org/grpc/internal"
37 "google.golang.org/grpc/internal/channelz"
38 "google.golang.org/grpc/internal/grpcsync"
39 "google.golang.org/grpc/internal/idle"
40 iresolver "google.golang.org/grpc/internal/resolver"
41 "google.golang.org/grpc/internal/transport"
42 "google.golang.org/grpc/keepalive"
43 "google.golang.org/grpc/resolver"
44 "google.golang.org/grpc/serviceconfig"
45 "google.golang.org/grpc/status"
46
47 _ "google.golang.org/grpc/balancer/roundrobin"
48 _ "google.golang.org/grpc/internal/resolver/passthrough"
49 _ "google.golang.org/grpc/internal/resolver/unix"
50 _ "google.golang.org/grpc/resolver/dns"
51 )
52
53 const (
54
55 minConnectTimeout = 20 * time.Second
56 )
57
58 var (
59
60
61
62
63
64 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
65
66 errConnDrain = errors.New("grpc: the connection is drained")
67
68 errConnClosing = errors.New("grpc: the connection is closing")
69
70
71 errConnIdling = errors.New("grpc: the connection is closing due to channel idleness")
72
73
74 invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"
75 )
76
77
78 var (
79
80
81
82 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithTransportCredentials(insecure.NewCredentials()) explicitly or set credentials)")
83
84
85 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
86
87
88 errNoTransportCredsInBundle = errors.New("grpc: credentials.Bundle must return non-nil transport credentials")
89
90
91
92 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
93 )
94
95 const (
96 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
97 defaultClientMaxSendMessageSize = math.MaxInt32
98
99 defaultWriteBufSize = 32 * 1024
100 defaultReadBufSize = 32 * 1024
101 )
102
103 type defaultConfigSelector struct {
104 sc *ServiceConfig
105 }
106
107 func (dcs *defaultConfigSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RPCConfig, error) {
108 return &iresolver.RPCConfig{
109 Context: rpcInfo.Context,
110 MethodConfig: getMethodConfig(dcs.sc, rpcInfo.Method),
111 }, nil
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125
126 func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
127 cc := &ClientConn{
128 target: target,
129 conns: make(map[*addrConn]struct{}),
130 dopts: defaultDialOptions(),
131 }
132
133 cc.retryThrottler.Store((*retryThrottler)(nil))
134 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
135 cc.ctx, cc.cancel = context.WithCancel(context.Background())
136
137
138 disableGlobalOpts := false
139 for _, opt := range opts {
140 if _, ok := opt.(*disableGlobalDialOptions); ok {
141 disableGlobalOpts = true
142 break
143 }
144 }
145
146 if !disableGlobalOpts {
147 for _, opt := range globalDialOptions {
148 opt.apply(&cc.dopts)
149 }
150 }
151
152 for _, opt := range opts {
153 opt.apply(&cc.dopts)
154 }
155 chainUnaryClientInterceptors(cc)
156 chainStreamClientInterceptors(cc)
157
158 if err := cc.validateTransportCredentials(); err != nil {
159 return nil, err
160 }
161
162 if cc.dopts.defaultServiceConfigRawJSON != nil {
163 scpr := parseServiceConfig(*cc.dopts.defaultServiceConfigRawJSON)
164 if scpr.Err != nil {
165 return nil, fmt.Errorf("%s: %v", invalidDefaultServiceConfigErrPrefix, scpr.Err)
166 }
167 cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
168 }
169 cc.mkp = cc.dopts.copts.KeepaliveParams
170
171
172 cc.channelzRegistration(target)
173
174
175
176
177
178
179
180
181 if err := cc.parseTargetAndFindResolver(); err != nil {
182 channelz.RemoveEntry(cc.channelz.ID)
183 return nil, err
184 }
185 if err = cc.determineAuthority(); err != nil {
186 channelz.RemoveEntry(cc.channelz.ID)
187 return nil, err
188 }
189
190 cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
191 cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
192
193 cc.initIdleStateLocked()
194 cc.idlenessMgr = idle.NewManager((*idler)(cc), cc.dopts.idleTimeout)
195 return cc, nil
196 }
197
198
199
200
201 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
202 return DialContext(context.Background(), target, opts...)
203 }
204
205
206
207
208
209
210
211
212
213
214
215
216 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
217
218
219 opts = append([]DialOption{withDefaultScheme("passthrough")}, opts...)
220 cc, err := NewClient(target, opts...)
221 if err != nil {
222 return nil, err
223 }
224
225
226
227
228 defer func() {
229 if err != nil {
230 cc.Close()
231 }
232 }()
233
234
235 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
236 return nil, err
237 }
238
239
240 if !cc.dopts.block {
241 return cc, nil
242 }
243
244 if cc.dopts.timeout > 0 {
245 var cancel context.CancelFunc
246 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
247 defer cancel()
248 }
249 defer func() {
250 select {
251 case <-ctx.Done():
252 switch {
253 case ctx.Err() == err:
254 conn = nil
255 case err == nil || !cc.dopts.returnLastError:
256 conn, err = nil, ctx.Err()
257 default:
258 conn, err = nil, fmt.Errorf("%v: %v", ctx.Err(), err)
259 }
260 default:
261 }
262 }()
263
264
265 for {
266 s := cc.GetState()
267 if s == connectivity.Idle {
268 cc.Connect()
269 }
270 if s == connectivity.Ready {
271 return cc, nil
272 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
273 if err = cc.connectionError(); err != nil {
274 terr, ok := err.(interface {
275 Temporary() bool
276 })
277 if ok && !terr.Temporary() {
278 return nil, err
279 }
280 }
281 }
282 if !cc.WaitForStateChange(ctx, s) {
283
284 if err = cc.connectionError(); err != nil && cc.dopts.returnLastError {
285 return nil, err
286 }
287 return nil, ctx.Err()
288 }
289 }
290 }
291
292
293
294 func (cc *ClientConn) addTraceEvent(msg string) {
295 ted := &channelz.TraceEvent{
296 Desc: fmt.Sprintf("Channel %s", msg),
297 Severity: channelz.CtInfo,
298 }
299 if cc.dopts.channelzParent != nil {
300 ted.Parent = &channelz.TraceEvent{
301 Desc: fmt.Sprintf("Nested channel(id:%d) %s", cc.channelz.ID, msg),
302 Severity: channelz.CtInfo,
303 }
304 }
305 channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
306 }
307
308 type idler ClientConn
309
310 func (i *idler) EnterIdleMode() {
311 (*ClientConn)(i).enterIdleMode()
312 }
313
314 func (i *idler) ExitIdleMode() error {
315 return (*ClientConn)(i).exitIdleMode()
316 }
317
318
319
320
321 func (cc *ClientConn) exitIdleMode() (err error) {
322 cc.mu.Lock()
323 if cc.conns == nil {
324 cc.mu.Unlock()
325 return errConnClosing
326 }
327 cc.mu.Unlock()
328
329
330
331
332 if err := cc.resolverWrapper.start(); err != nil {
333 return err
334 }
335
336 cc.addTraceEvent("exiting idle mode")
337 return nil
338 }
339
340
341 func (cc *ClientConn) initIdleStateLocked() {
342 cc.resolverWrapper = newCCResolverWrapper(cc)
343 cc.balancerWrapper = newCCBalancerWrapper(cc)
344 cc.firstResolveEvent = grpcsync.NewEvent()
345
346
347
348 cc.conns = make(map[*addrConn]struct{})
349 }
350
351
352
353
354 func (cc *ClientConn) enterIdleMode() {
355 cc.mu.Lock()
356
357 if cc.conns == nil {
358 cc.mu.Unlock()
359 return
360 }
361
362 conns := cc.conns
363
364 rWrapper := cc.resolverWrapper
365 rWrapper.close()
366 cc.pickerWrapper.reset()
367 bWrapper := cc.balancerWrapper
368 bWrapper.close()
369 cc.csMgr.updateState(connectivity.Idle)
370 cc.addTraceEvent("entering idle mode")
371
372 cc.initIdleStateLocked()
373
374 cc.mu.Unlock()
375
376
377 <-rWrapper.serializer.Done()
378 <-bWrapper.serializer.Done()
379
380
381 for ac := range conns {
382 ac.tearDown(errConnIdling)
383 }
384 }
385
386
387
388
389
390
391
392
393
394
395
396
397 func (cc *ClientConn) validateTransportCredentials() error {
398 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
399 return errNoTransportSecurity
400 }
401 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
402 return errTransportCredsAndBundle
403 }
404 if cc.dopts.copts.CredsBundle != nil && cc.dopts.copts.CredsBundle.TransportCredentials() == nil {
405 return errNoTransportCredsInBundle
406 }
407 transportCreds := cc.dopts.copts.TransportCredentials
408 if transportCreds == nil {
409 transportCreds = cc.dopts.copts.CredsBundle.TransportCredentials()
410 }
411 if transportCreds.Info().SecurityProtocol == "insecure" {
412 for _, cd := range cc.dopts.copts.PerRPCCredentials {
413 if cd.RequireTransportSecurity() {
414 return errTransportCredentialsMissing
415 }
416 }
417 }
418 return nil
419 }
420
421
422
423
424
425
426
427
428 func (cc *ClientConn) channelzRegistration(target string) {
429 parentChannel, _ := cc.dopts.channelzParent.(*channelz.Channel)
430 cc.channelz = channelz.RegisterChannel(parentChannel, target)
431 cc.addTraceEvent("created")
432 }
433
434
435 func chainUnaryClientInterceptors(cc *ClientConn) {
436 interceptors := cc.dopts.chainUnaryInts
437
438
439 if cc.dopts.unaryInt != nil {
440 interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
441 }
442 var chainedInt UnaryClientInterceptor
443 if len(interceptors) == 0 {
444 chainedInt = nil
445 } else if len(interceptors) == 1 {
446 chainedInt = interceptors[0]
447 } else {
448 chainedInt = func(ctx context.Context, method string, req, reply any, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
449 return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
450 }
451 }
452 cc.dopts.unaryInt = chainedInt
453 }
454
455
456 func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
457 if curr == len(interceptors)-1 {
458 return finalInvoker
459 }
460 return func(ctx context.Context, method string, req, reply any, cc *ClientConn, opts ...CallOption) error {
461 return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
462 }
463 }
464
465
466 func chainStreamClientInterceptors(cc *ClientConn) {
467 interceptors := cc.dopts.chainStreamInts
468
469
470 if cc.dopts.streamInt != nil {
471 interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
472 }
473 var chainedInt StreamClientInterceptor
474 if len(interceptors) == 0 {
475 chainedInt = nil
476 } else if len(interceptors) == 1 {
477 chainedInt = interceptors[0]
478 } else {
479 chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
480 return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
481 }
482 }
483 cc.dopts.streamInt = chainedInt
484 }
485
486
487 func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
488 if curr == len(interceptors)-1 {
489 return finalStreamer
490 }
491 return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
492 return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
493 }
494 }
495
496
497
498 func newConnectivityStateManager(ctx context.Context, channel *channelz.Channel) *connectivityStateManager {
499 return &connectivityStateManager{
500 channelz: channel,
501 pubSub: grpcsync.NewPubSub(ctx),
502 }
503 }
504
505
506
507
508
509
510
511 type connectivityStateManager struct {
512 mu sync.Mutex
513 state connectivity.State
514 notifyChan chan struct{}
515 channelz *channelz.Channel
516 pubSub *grpcsync.PubSub
517 }
518
519
520
521
522 func (csm *connectivityStateManager) updateState(state connectivity.State) {
523 csm.mu.Lock()
524 defer csm.mu.Unlock()
525 if csm.state == connectivity.Shutdown {
526 return
527 }
528 if csm.state == state {
529 return
530 }
531 csm.state = state
532 csm.channelz.ChannelMetrics.State.Store(&state)
533 csm.pubSub.Publish(state)
534
535 channelz.Infof(logger, csm.channelz, "Channel Connectivity change to %v", state)
536 if csm.notifyChan != nil {
537
538 close(csm.notifyChan)
539 csm.notifyChan = nil
540 }
541 }
542
543 func (csm *connectivityStateManager) getState() connectivity.State {
544 csm.mu.Lock()
545 defer csm.mu.Unlock()
546 return csm.state
547 }
548
549 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
550 csm.mu.Lock()
551 defer csm.mu.Unlock()
552 if csm.notifyChan == nil {
553 csm.notifyChan = make(chan struct{})
554 }
555 return csm.notifyChan
556 }
557
558
559
560
561 type ClientConnInterface interface {
562
563
564 Invoke(ctx context.Context, method string, args any, reply any, opts ...CallOption) error
565
566 NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error)
567 }
568
569
570 var _ ClientConnInterface = (*ClientConn)(nil)
571
572
573
574
575
576
577
578
579
580
581
582
583
584 type ClientConn struct {
585 ctx context.Context
586 cancel context.CancelFunc
587
588
589 target string
590 parsedTarget resolver.Target
591 authority string
592 dopts dialOptions
593 channelz *channelz.Channel
594 resolverBuilder resolver.Builder
595 idlenessMgr *idle.Manager
596
597
598
599 csMgr *connectivityStateManager
600 pickerWrapper *pickerWrapper
601 safeConfigSelector iresolver.SafeConfigSelector
602 retryThrottler atomic.Value
603
604
605
606 mu sync.RWMutex
607 resolverWrapper *ccResolverWrapper
608 balancerWrapper *ccBalancerWrapper
609 sc *ServiceConfig
610 conns map[*addrConn]struct{}
611 mkp keepalive.ClientParameters
612
613
614
615
616
617 firstResolveEvent *grpcsync.Event
618
619 lceMu sync.Mutex
620 lastConnectionError error
621 }
622
623
624
625
626
627
628
629
630 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
631 ch := cc.csMgr.getNotifyChan()
632 if cc.csMgr.getState() != sourceState {
633 return true
634 }
635 select {
636 case <-ctx.Done():
637 return false
638 case <-ch:
639 return true
640 }
641 }
642
643
644
645
646
647
648
649 func (cc *ClientConn) GetState() connectivity.State {
650 return cc.csMgr.getState()
651 }
652
653
654
655
656
657
658
659
660
661 func (cc *ClientConn) Connect() {
662 if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
663 cc.addTraceEvent(err.Error())
664 return
665 }
666
667
668 cc.mu.Lock()
669 cc.balancerWrapper.exitIdle()
670 cc.mu.Unlock()
671 }
672
673
674
675
676 func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
677
678
679 if cc.firstResolveEvent.HasFired() {
680 return nil
681 }
682 select {
683 case <-cc.firstResolveEvent.Done():
684 return nil
685 case <-ctx.Done():
686 return status.FromContextError(ctx.Err()).Err()
687 case <-cc.ctx.Done():
688 return ErrClientConnClosing
689 }
690 }
691
692 var emptyServiceConfig *ServiceConfig
693
694 func init() {
695 balancer.Register(pickfirstBuilder{})
696 cfg := parseServiceConfig("{}")
697 if cfg.Err != nil {
698 panic(fmt.Sprintf("impossible error parsing empty service config: %v", cfg.Err))
699 }
700 emptyServiceConfig = cfg.Config.(*ServiceConfig)
701
702 internal.SubscribeToConnectivityStateChanges = func(cc *ClientConn, s grpcsync.Subscriber) func() {
703 return cc.csMgr.pubSub.Subscribe(s)
704 }
705 internal.EnterIdleModeForTesting = func(cc *ClientConn) {
706 cc.idlenessMgr.EnterIdleModeForTesting()
707 }
708 internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
709 return cc.idlenessMgr.ExitIdleMode()
710 }
711 }
712
713 func (cc *ClientConn) maybeApplyDefaultServiceConfig() {
714 if cc.sc != nil {
715 cc.applyServiceConfigAndBalancer(cc.sc, nil)
716 return
717 }
718 if cc.dopts.defaultServiceConfig != nil {
719 cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig})
720 } else {
721 cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig})
722 }
723 }
724
725 func (cc *ClientConn) updateResolverStateAndUnlock(s resolver.State, err error) error {
726 defer cc.firstResolveEvent.Fire()
727
728
729
730 if cc.conns == nil {
731 cc.mu.Unlock()
732 return nil
733 }
734
735 if err != nil {
736
737
738
739 cc.maybeApplyDefaultServiceConfig()
740
741 cc.balancerWrapper.resolverError(err)
742
743
744 cc.mu.Unlock()
745 return balancer.ErrBadResolverState
746 }
747
748 var ret error
749 if cc.dopts.disableServiceConfig {
750 channelz.Infof(logger, cc.channelz, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig)
751 cc.maybeApplyDefaultServiceConfig()
752 } else if s.ServiceConfig == nil {
753 cc.maybeApplyDefaultServiceConfig()
754
755
756 } else {
757 if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok {
758 configSelector := iresolver.GetConfigSelector(s)
759 if configSelector != nil {
760 if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 {
761 channelz.Infof(logger, cc.channelz, "method configs in service config will be ignored due to presence of config selector")
762 }
763 } else {
764 configSelector = &defaultConfigSelector{sc}
765 }
766 cc.applyServiceConfigAndBalancer(sc, configSelector)
767 } else {
768 ret = balancer.ErrBadResolverState
769 if cc.sc == nil {
770
771
772 cc.applyFailingLBLocked(s.ServiceConfig)
773 cc.mu.Unlock()
774 return ret
775 }
776 }
777 }
778
779 var balCfg serviceconfig.LoadBalancingConfig
780 if cc.sc != nil && cc.sc.lbConfig != nil {
781 balCfg = cc.sc.lbConfig
782 }
783 bw := cc.balancerWrapper
784 cc.mu.Unlock()
785
786 uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
787 if ret == nil {
788 ret = uccsErr
789
790 }
791 return ret
792 }
793
794
795
796
797
798
799
800 func (cc *ClientConn) applyFailingLBLocked(sc *serviceconfig.ParseResult) {
801 var err error
802 if sc.Err != nil {
803 err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
804 } else {
805 err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
806 }
807 cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
808 cc.pickerWrapper.updatePicker(base.NewErrPicker(err))
809 cc.csMgr.updateState(connectivity.TransientFailure)
810 }
811
812
813
814
815
816
817 func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
818 out := make([]resolver.Address, len(in))
819 for i := range in {
820 out[i] = in[i]
821 out[i].BalancerAttributes = nil
822 }
823 return out
824 }
825
826
827
828
829 func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
830 if cc.conns == nil {
831 return nil, ErrClientConnClosing
832 }
833
834 ac := &addrConn{
835 state: connectivity.Idle,
836 cc: cc,
837 addrs: copyAddressesWithoutBalancerAttributes(addrs),
838 scopts: opts,
839 dopts: cc.dopts,
840 channelz: channelz.RegisterSubChannel(cc.channelz, ""),
841 resetBackoff: make(chan struct{}),
842 stateChan: make(chan struct{}),
843 }
844 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
845
846
847 ac.channelz.ChannelMetrics.Target.Store(&addrs[0].Addr)
848
849 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
850 Desc: "Subchannel created",
851 Severity: channelz.CtInfo,
852 Parent: &channelz.TraceEvent{
853 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelz.ID),
854 Severity: channelz.CtInfo,
855 },
856 })
857
858
859 cc.conns[ac] = struct{}{}
860 return ac, nil
861 }
862
863
864
865 func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
866 cc.mu.Lock()
867 if cc.conns == nil {
868 cc.mu.Unlock()
869 return
870 }
871 delete(cc.conns, ac)
872 cc.mu.Unlock()
873 ac.tearDown(err)
874 }
875
876
877 func (cc *ClientConn) Target() string {
878 return cc.target
879 }
880
881
882 func (cc *ClientConn) CanonicalTarget() string {
883 return cc.parsedTarget.String()
884 }
885
886 func (cc *ClientConn) incrCallsStarted() {
887 cc.channelz.ChannelMetrics.CallsStarted.Add(1)
888 cc.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
889 }
890
891 func (cc *ClientConn) incrCallsSucceeded() {
892 cc.channelz.ChannelMetrics.CallsSucceeded.Add(1)
893 }
894
895 func (cc *ClientConn) incrCallsFailed() {
896 cc.channelz.ChannelMetrics.CallsFailed.Add(1)
897 }
898
899
900
901
902 func (ac *addrConn) connect() error {
903 ac.mu.Lock()
904 if ac.state == connectivity.Shutdown {
905 if logger.V(2) {
906 logger.Infof("connect called on shutdown addrConn; ignoring.")
907 }
908 ac.mu.Unlock()
909 return errConnClosing
910 }
911 if ac.state != connectivity.Idle {
912 if logger.V(2) {
913 logger.Infof("connect called on addrConn in non-idle state (%v); ignoring.", ac.state)
914 }
915 ac.mu.Unlock()
916 return nil
917 }
918 ac.mu.Unlock()
919
920 ac.resetTransport()
921 return nil
922 }
923
924 func equalAddresses(a, b []resolver.Address) bool {
925 if len(a) != len(b) {
926 return false
927 }
928 for i, v := range a {
929 if !v.Equal(b[i]) {
930 return false
931 }
932 }
933 return true
934 }
935
936
937
938 func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
939 addrs = copyAddressesWithoutBalancerAttributes(addrs)
940 limit := len(addrs)
941 if limit > 5 {
942 limit = 5
943 }
944 channelz.Infof(logger, ac.channelz, "addrConn: updateAddrs addrs (%d of %d): %v", limit, len(addrs), addrs[:limit])
945
946 ac.mu.Lock()
947 if equalAddresses(ac.addrs, addrs) {
948 ac.mu.Unlock()
949 return
950 }
951
952 ac.addrs = addrs
953
954 if ac.state == connectivity.Shutdown ||
955 ac.state == connectivity.TransientFailure ||
956 ac.state == connectivity.Idle {
957
958 ac.mu.Unlock()
959 return
960 }
961
962 if ac.state == connectivity.Ready {
963
964 for _, a := range addrs {
965 a.ServerName = ac.cc.getServerName(a)
966 if a.Equal(ac.curAddr) {
967
968
969 ac.mu.Unlock()
970 return
971 }
972 }
973 }
974
975
976
977
978 ac.cancel()
979 ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
980
981
982
983 if ac.transport != nil {
984 defer ac.transport.GracefulClose()
985 ac.transport = nil
986 }
987
988 if len(addrs) == 0 {
989 ac.updateConnectivityState(connectivity.Idle, nil)
990 }
991
992 ac.mu.Unlock()
993
994
995
996 go ac.resetTransport()
997 }
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009 func (cc *ClientConn) getServerName(addr resolver.Address) string {
1010 if cc.dopts.authority != "" {
1011 return cc.dopts.authority
1012 }
1013 if addr.ServerName != "" {
1014 return addr.ServerName
1015 }
1016 return cc.authority
1017 }
1018
1019 func getMethodConfig(sc *ServiceConfig, method string) MethodConfig {
1020 if sc == nil {
1021 return MethodConfig{}
1022 }
1023 if m, ok := sc.Methods[method]; ok {
1024 return m
1025 }
1026 i := strings.LastIndex(method, "/")
1027 if m, ok := sc.Methods[method[:i+1]]; ok {
1028 return m
1029 }
1030 return sc.Methods[""]
1031 }
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
1042
1043 cc.mu.RLock()
1044 defer cc.mu.RUnlock()
1045 return getMethodConfig(cc.sc, method)
1046 }
1047
1048 func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
1049 cc.mu.RLock()
1050 defer cc.mu.RUnlock()
1051 if cc.sc == nil {
1052 return nil
1053 }
1054 return cc.sc.healthCheckConfig
1055 }
1056
1057 func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
1058 return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
1059 Ctx: ctx,
1060 FullMethodName: method,
1061 })
1062 }
1063
1064 func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
1065 if sc == nil {
1066
1067 return
1068 }
1069 cc.sc = sc
1070 if configSelector != nil {
1071 cc.safeConfigSelector.UpdateConfigSelector(configSelector)
1072 }
1073
1074 if cc.sc.retryThrottling != nil {
1075 newThrottler := &retryThrottler{
1076 tokens: cc.sc.retryThrottling.MaxTokens,
1077 max: cc.sc.retryThrottling.MaxTokens,
1078 thresh: cc.sc.retryThrottling.MaxTokens / 2,
1079 ratio: cc.sc.retryThrottling.TokenRatio,
1080 }
1081 cc.retryThrottler.Store(newThrottler)
1082 } else {
1083 cc.retryThrottler.Store((*retryThrottler)(nil))
1084 }
1085 }
1086
1087 func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
1088 cc.mu.RLock()
1089 cc.resolverWrapper.resolveNow(o)
1090 cc.mu.RUnlock()
1091 }
1092
1093 func (cc *ClientConn) resolveNowLocked(o resolver.ResolveNowOptions) {
1094 cc.resolverWrapper.resolveNow(o)
1095 }
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110 func (cc *ClientConn) ResetConnectBackoff() {
1111 cc.mu.Lock()
1112 conns := cc.conns
1113 cc.mu.Unlock()
1114 for ac := range conns {
1115 ac.resetConnectBackoff()
1116 }
1117 }
1118
1119
1120 func (cc *ClientConn) Close() error {
1121 defer func() {
1122 cc.cancel()
1123 <-cc.csMgr.pubSub.Done()
1124 }()
1125
1126
1127
1128 cc.idlenessMgr.Close()
1129
1130 cc.mu.Lock()
1131 if cc.conns == nil {
1132 cc.mu.Unlock()
1133 return ErrClientConnClosing
1134 }
1135
1136 conns := cc.conns
1137 cc.conns = nil
1138 cc.csMgr.updateState(connectivity.Shutdown)
1139
1140
1141
1142 cc.mu.Unlock()
1143
1144 cc.resolverWrapper.close()
1145
1146
1147 cc.pickerWrapper.close()
1148 cc.balancerWrapper.close()
1149
1150 <-cc.resolverWrapper.serializer.Done()
1151 <-cc.balancerWrapper.serializer.Done()
1152
1153 for ac := range conns {
1154 ac.tearDown(ErrClientConnClosing)
1155 }
1156 cc.addTraceEvent("deleted")
1157
1158
1159
1160 channelz.RemoveEntry(cc.channelz.ID)
1161
1162 return nil
1163 }
1164
1165
1166 type addrConn struct {
1167 ctx context.Context
1168 cancel context.CancelFunc
1169
1170 cc *ClientConn
1171 dopts dialOptions
1172 acbw *acBalancerWrapper
1173 scopts balancer.NewSubConnOptions
1174
1175
1176
1177
1178
1179 transport transport.ClientTransport
1180
1181
1182
1183
1184
1185 mu sync.Mutex
1186 curAddr resolver.Address
1187 addrs []resolver.Address
1188
1189
1190 state connectivity.State
1191 stateChan chan struct{}
1192
1193 backoffIdx int
1194 resetBackoff chan struct{}
1195
1196 channelz *channelz.SubChannel
1197 }
1198
1199
1200 func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
1201 if ac.state == s {
1202 return
1203 }
1204
1205 close(ac.stateChan)
1206 ac.stateChan = make(chan struct{})
1207 ac.state = s
1208 ac.channelz.ChannelMetrics.State.Store(&s)
1209 if lastErr == nil {
1210 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v", s)
1211 } else {
1212 channelz.Infof(logger, ac.channelz, "Subchannel Connectivity change to %v, last error: %s", s, lastErr)
1213 }
1214 ac.acbw.updateState(s, lastErr)
1215 }
1216
1217
1218
1219 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
1220 switch r {
1221 case transport.GoAwayTooManyPings:
1222 v := 2 * ac.dopts.copts.KeepaliveParams.Time
1223 ac.cc.mu.Lock()
1224 if v > ac.cc.mkp.Time {
1225 ac.cc.mkp.Time = v
1226 }
1227 ac.cc.mu.Unlock()
1228 }
1229 }
1230
1231 func (ac *addrConn) resetTransport() {
1232 ac.mu.Lock()
1233 acCtx := ac.ctx
1234 if acCtx.Err() != nil {
1235 ac.mu.Unlock()
1236 return
1237 }
1238
1239 addrs := ac.addrs
1240 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
1241
1242 dialDuration := minConnectTimeout
1243 if ac.dopts.minConnectTimeout != nil {
1244 dialDuration = ac.dopts.minConnectTimeout()
1245 }
1246
1247 if dialDuration < backoffFor {
1248
1249 dialDuration = backoffFor
1250 }
1251
1252
1253
1254
1255
1256
1257 connectDeadline := time.Now().Add(dialDuration)
1258
1259 ac.updateConnectivityState(connectivity.Connecting, nil)
1260 ac.mu.Unlock()
1261
1262 if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
1263 ac.cc.resolveNow(resolver.ResolveNowOptions{})
1264 ac.mu.Lock()
1265 if acCtx.Err() != nil {
1266
1267 ac.mu.Unlock()
1268 return
1269 }
1270
1271
1272 ac.updateConnectivityState(connectivity.TransientFailure, err)
1273
1274
1275 b := ac.resetBackoff
1276 ac.mu.Unlock()
1277
1278 timer := time.NewTimer(backoffFor)
1279 select {
1280 case <-timer.C:
1281 ac.mu.Lock()
1282 ac.backoffIdx++
1283 ac.mu.Unlock()
1284 case <-b:
1285 timer.Stop()
1286 case <-acCtx.Done():
1287 timer.Stop()
1288 return
1289 }
1290
1291 ac.mu.Lock()
1292 if acCtx.Err() == nil {
1293 ac.updateConnectivityState(connectivity.Idle, err)
1294 }
1295 ac.mu.Unlock()
1296 return
1297 }
1298
1299 ac.mu.Lock()
1300 ac.backoffIdx = 0
1301 ac.mu.Unlock()
1302 }
1303
1304
1305
1306
1307 func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
1308 var firstConnErr error
1309 for _, addr := range addrs {
1310 ac.channelz.ChannelMetrics.Target.Store(&addr.Addr)
1311 if ctx.Err() != nil {
1312 return errConnClosing
1313 }
1314 ac.mu.Lock()
1315
1316 ac.cc.mu.RLock()
1317 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1318 ac.cc.mu.RUnlock()
1319
1320 copts := ac.dopts.copts
1321 if ac.scopts.CredsBundle != nil {
1322 copts.CredsBundle = ac.scopts.CredsBundle
1323 }
1324 ac.mu.Unlock()
1325
1326 channelz.Infof(logger, ac.channelz, "Subchannel picks a new address %q to connect", addr.Addr)
1327
1328 err := ac.createTransport(ctx, addr, copts, connectDeadline)
1329 if err == nil {
1330 return nil
1331 }
1332 if firstConnErr == nil {
1333 firstConnErr = err
1334 }
1335 ac.cc.updateConnectionError(err)
1336 }
1337
1338
1339 return firstConnErr
1340 }
1341
1342
1343
1344
1345 func (ac *addrConn) createTransport(ctx context.Context, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
1346 addr.ServerName = ac.cc.getServerName(addr)
1347 hctx, hcancel := context.WithCancel(ctx)
1348
1349 onClose := func(r transport.GoAwayReason) {
1350 ac.mu.Lock()
1351 defer ac.mu.Unlock()
1352
1353 ac.adjustParams(r)
1354 if ctx.Err() != nil {
1355
1356
1357
1358
1359 return
1360 }
1361 hcancel()
1362 if ac.transport == nil {
1363
1364
1365
1366
1367 return
1368 }
1369 ac.transport = nil
1370
1371 ac.cc.resolveNow(resolver.ResolveNowOptions{})
1372
1373
1374 ac.updateConnectivityState(connectivity.Idle, nil)
1375 }
1376
1377 connectCtx, cancel := context.WithDeadline(ctx, connectDeadline)
1378 defer cancel()
1379 copts.ChannelzParent = ac.channelz
1380
1381 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
1382 if err != nil {
1383 if logger.V(2) {
1384 logger.Infof("Creating new client transport to %q: %v", addr, err)
1385 }
1386
1387 hcancel()
1388 channelz.Warningf(logger, ac.channelz, "grpc: addrConn.createTransport failed to connect to %s. Err: %v", addr, err)
1389 return err
1390 }
1391
1392 ac.mu.Lock()
1393 defer ac.mu.Unlock()
1394 if ctx.Err() != nil {
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409 go newTr.Close(transport.ErrConnClosing)
1410 return nil
1411 }
1412 if hctx.Err() != nil {
1413
1414
1415
1416 ac.updateConnectivityState(connectivity.Idle, nil)
1417 return nil
1418 }
1419 ac.curAddr = addr
1420 ac.transport = newTr
1421 ac.startHealthCheck(hctx)
1422 return nil
1423 }
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437 func (ac *addrConn) startHealthCheck(ctx context.Context) {
1438 var healthcheckManagingState bool
1439 defer func() {
1440 if !healthcheckManagingState {
1441 ac.updateConnectivityState(connectivity.Ready, nil)
1442 }
1443 }()
1444
1445 if ac.cc.dopts.disableHealthCheck {
1446 return
1447 }
1448 healthCheckConfig := ac.cc.healthCheckConfig()
1449 if healthCheckConfig == nil {
1450 return
1451 }
1452 if !ac.scopts.HealthCheckEnabled {
1453 return
1454 }
1455 healthCheckFunc := ac.cc.dopts.healthCheckFunc
1456 if healthCheckFunc == nil {
1457
1458
1459
1460 channelz.Error(logger, ac.channelz, "Health check is requested but health check function is not set.")
1461 return
1462 }
1463
1464 healthcheckManagingState = true
1465
1466
1467 currentTr := ac.transport
1468 newStream := func(method string) (any, error) {
1469 ac.mu.Lock()
1470 if ac.transport != currentTr {
1471 ac.mu.Unlock()
1472 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
1473 }
1474 ac.mu.Unlock()
1475 return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
1476 }
1477 setConnectivityState := func(s connectivity.State, lastErr error) {
1478 ac.mu.Lock()
1479 defer ac.mu.Unlock()
1480 if ac.transport != currentTr {
1481 return
1482 }
1483 ac.updateConnectivityState(s, lastErr)
1484 }
1485
1486 go func() {
1487 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
1488 if err != nil {
1489 if status.Code(err) == codes.Unimplemented {
1490 channelz.Error(logger, ac.channelz, "Subchannel health check is unimplemented at server side, thus health check is disabled")
1491 } else {
1492 channelz.Errorf(logger, ac.channelz, "Health checking failed: %v", err)
1493 }
1494 }
1495 }()
1496 }
1497
1498 func (ac *addrConn) resetConnectBackoff() {
1499 ac.mu.Lock()
1500 close(ac.resetBackoff)
1501 ac.backoffIdx = 0
1502 ac.resetBackoff = make(chan struct{})
1503 ac.mu.Unlock()
1504 }
1505
1506
1507 func (ac *addrConn) getReadyTransport() transport.ClientTransport {
1508 ac.mu.Lock()
1509 defer ac.mu.Unlock()
1510 if ac.state == connectivity.Ready {
1511 return ac.transport
1512 }
1513 return nil
1514 }
1515
1516
1517
1518
1519 func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
1520 for ctx.Err() == nil {
1521 ac.mu.Lock()
1522 t, state, sc := ac.transport, ac.state, ac.stateChan
1523 ac.mu.Unlock()
1524 if state == connectivity.Ready {
1525 return t, nil
1526 }
1527 if state == connectivity.Shutdown {
1528 return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
1529 }
1530
1531 select {
1532 case <-ctx.Done():
1533 case <-sc:
1534 }
1535 }
1536 return nil, status.FromContextError(ctx.Err()).Err()
1537 }
1538
1539
1540
1541
1542
1543 func (ac *addrConn) tearDown(err error) {
1544 ac.mu.Lock()
1545 if ac.state == connectivity.Shutdown {
1546 ac.mu.Unlock()
1547 return
1548 }
1549 curTr := ac.transport
1550 ac.transport = nil
1551
1552
1553 ac.updateConnectivityState(connectivity.Shutdown, nil)
1554 ac.cancel()
1555 ac.curAddr = resolver.Address{}
1556
1557 channelz.AddTraceEvent(logger, ac.channelz, 0, &channelz.TraceEvent{
1558 Desc: "Subchannel deleted",
1559 Severity: channelz.CtInfo,
1560 Parent: &channelz.TraceEvent{
1561 Desc: fmt.Sprintf("Subchannel(id:%d) deleted", ac.channelz.ID),
1562 Severity: channelz.CtInfo,
1563 },
1564 })
1565
1566
1567
1568 channelz.RemoveEntry(ac.channelz.ID)
1569 ac.mu.Unlock()
1570
1571
1572
1573 if curTr != nil {
1574 if err == errConnDrain {
1575
1576
1577
1578
1579
1580
1581 curTr.GracefulClose()
1582 } else {
1583
1584
1585
1586
1587
1588
1589
1590 curTr.Close(err)
1591 }
1592 }
1593 }
1594
1595 type retryThrottler struct {
1596 max float64
1597 thresh float64
1598 ratio float64
1599
1600 mu sync.Mutex
1601 tokens float64
1602 }
1603
1604
1605
1606
1607 func (rt *retryThrottler) throttle() bool {
1608 if rt == nil {
1609 return false
1610 }
1611 rt.mu.Lock()
1612 defer rt.mu.Unlock()
1613 rt.tokens--
1614 if rt.tokens < 0 {
1615 rt.tokens = 0
1616 }
1617 return rt.tokens <= rt.thresh
1618 }
1619
1620 func (rt *retryThrottler) successfulRPC() {
1621 if rt == nil {
1622 return
1623 }
1624 rt.mu.Lock()
1625 defer rt.mu.Unlock()
1626 rt.tokens += rt.ratio
1627 if rt.tokens > rt.max {
1628 rt.tokens = rt.max
1629 }
1630 }
1631
1632 func (ac *addrConn) incrCallsStarted() {
1633 ac.channelz.ChannelMetrics.CallsStarted.Add(1)
1634 ac.channelz.ChannelMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
1635 }
1636
1637 func (ac *addrConn) incrCallsSucceeded() {
1638 ac.channelz.ChannelMetrics.CallsSucceeded.Add(1)
1639 }
1640
1641 func (ac *addrConn) incrCallsFailed() {
1642 ac.channelz.ChannelMetrics.CallsFailed.Add(1)
1643 }
1644
1645
1646
1647
1648
1649
1650 var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
1651
1652
1653
1654
1655 func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
1656 for _, rb := range cc.dopts.resolvers {
1657 if scheme == rb.Scheme() {
1658 return rb
1659 }
1660 }
1661 return resolver.Get(scheme)
1662 }
1663
1664 func (cc *ClientConn) updateConnectionError(err error) {
1665 cc.lceMu.Lock()
1666 cc.lastConnectionError = err
1667 cc.lceMu.Unlock()
1668 }
1669
1670 func (cc *ClientConn) connectionError() error {
1671 cc.lceMu.Lock()
1672 defer cc.lceMu.Unlock()
1673 return cc.lastConnectionError
1674 }
1675
1676
1677
1678
1679
1680
1681
1682
1683 func (cc *ClientConn) parseTargetAndFindResolver() error {
1684 channelz.Infof(logger, cc.channelz, "original dial target is: %q", cc.target)
1685
1686 var rb resolver.Builder
1687 parsedTarget, err := parseTarget(cc.target)
1688 if err != nil {
1689 channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", cc.target, err)
1690 } else {
1691 channelz.Infof(logger, cc.channelz, "parsed dial target is: %#v", parsedTarget)
1692 rb = cc.getResolver(parsedTarget.URL.Scheme)
1693 if rb != nil {
1694 cc.parsedTarget = parsedTarget
1695 cc.resolverBuilder = rb
1696 return nil
1697 }
1698 }
1699
1700
1701
1702
1703
1704
1705 defScheme := cc.dopts.defaultScheme
1706 if internal.UserSetDefaultScheme {
1707 defScheme = resolver.GetDefaultScheme()
1708 }
1709
1710 channelz.Infof(logger, cc.channelz, "fallback to scheme %q", defScheme)
1711 canonicalTarget := defScheme + ":///" + cc.target
1712
1713 parsedTarget, err = parseTarget(canonicalTarget)
1714 if err != nil {
1715 channelz.Infof(logger, cc.channelz, "dial target %q parse failed: %v", canonicalTarget, err)
1716 return err
1717 }
1718 channelz.Infof(logger, cc.channelz, "parsed dial target is: %+v", parsedTarget)
1719 rb = cc.getResolver(parsedTarget.URL.Scheme)
1720 if rb == nil {
1721 return fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.URL.Scheme)
1722 }
1723 cc.parsedTarget = parsedTarget
1724 cc.resolverBuilder = rb
1725 return nil
1726 }
1727
1728
1729
1730
1731 func parseTarget(target string) (resolver.Target, error) {
1732 u, err := url.Parse(target)
1733 if err != nil {
1734 return resolver.Target{}, err
1735 }
1736
1737 return resolver.Target{URL: *u}, nil
1738 }
1739
1740
1741
1742 func encodeAuthority(authority string) string {
1743 const upperhex = "0123456789ABCDEF"
1744
1745
1746
1747
1748 shouldEscape := func(c byte) bool {
1749
1750 if 'a' <= c && c <= 'z' || 'A' <= c && c <= 'Z' || '0' <= c && c <= '9' {
1751 return false
1752 }
1753 switch c {
1754 case '-', '_', '.', '~':
1755 return false
1756 case '!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=':
1757 return false
1758 case ':', '[', ']', '@':
1759 return false
1760 }
1761
1762 return true
1763 }
1764
1765 hexCount := 0
1766 for i := 0; i < len(authority); i++ {
1767 c := authority[i]
1768 if shouldEscape(c) {
1769 hexCount++
1770 }
1771 }
1772
1773 if hexCount == 0 {
1774 return authority
1775 }
1776
1777 required := len(authority) + 2*hexCount
1778 t := make([]byte, required)
1779
1780 j := 0
1781
1782 for i := 0; i < len(authority); i++ {
1783 switch c := authority[i]; {
1784 case shouldEscape(c):
1785 t[j] = '%'
1786 t[j+1] = upperhex[c>>4]
1787 t[j+2] = upperhex[c&15]
1788 j += 3
1789 default:
1790 t[j] = authority[i]
1791 j++
1792 }
1793 }
1794 return string(t)
1795 }
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808 func (cc *ClientConn) determineAuthority() error {
1809 dopts := cc.dopts
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820 authorityFromCreds := ""
1821 if creds := dopts.copts.TransportCredentials; creds != nil && creds.Info().ServerName != "" {
1822 authorityFromCreds = creds.Info().ServerName
1823 }
1824 authorityFromDialOption := dopts.authority
1825 if (authorityFromCreds != "" && authorityFromDialOption != "") && authorityFromCreds != authorityFromDialOption {
1826 return fmt.Errorf("ClientConn's authority from transport creds %q and dial option %q don't match", authorityFromCreds, authorityFromDialOption)
1827 }
1828
1829 endpoint := cc.parsedTarget.Endpoint()
1830 if authorityFromDialOption != "" {
1831 cc.authority = authorityFromDialOption
1832 } else if authorityFromCreds != "" {
1833 cc.authority = authorityFromCreds
1834 } else if auth, ok := cc.resolverBuilder.(resolver.AuthorityOverrider); ok {
1835 cc.authority = auth.OverrideAuthority(cc.parsedTarget)
1836 } else if strings.HasPrefix(endpoint, ":") {
1837 cc.authority = "localhost" + endpoint
1838 } else {
1839 cc.authority = encodeAuthority(endpoint)
1840 }
1841 channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
1842 return nil
1843 }
1844
View as plain text