...
1
18
19 package grpc
20
21 import (
22 "context"
23 "strings"
24 "sync"
25
26 "google.golang.org/grpc/internal/channelz"
27 "google.golang.org/grpc/internal/grpcsync"
28 "google.golang.org/grpc/internal/pretty"
29 "google.golang.org/grpc/resolver"
30 "google.golang.org/grpc/serviceconfig"
31 )
32
33
34
35 type ccResolverWrapper struct {
36
37
38 cc *ClientConn
39 ignoreServiceConfig bool
40 serializer *grpcsync.CallbackSerializer
41 serializerCancel context.CancelFunc
42
43 resolver resolver.Resolver
44
45
46
47 mu sync.Mutex
48 curState resolver.State
49 closed bool
50 }
51
52
53
54 func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
55 ctx, cancel := context.WithCancel(cc.ctx)
56 return &ccResolverWrapper{
57 cc: cc,
58 ignoreServiceConfig: cc.dopts.disableServiceConfig,
59 serializer: grpcsync.NewCallbackSerializer(ctx),
60 serializerCancel: cancel,
61 }
62 }
63
64
65
66
67 func (ccr *ccResolverWrapper) start() error {
68 errCh := make(chan error)
69 ccr.serializer.Schedule(func(ctx context.Context) {
70 if ctx.Err() != nil {
71 return
72 }
73 opts := resolver.BuildOptions{
74 DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
75 DialCreds: ccr.cc.dopts.copts.TransportCredentials,
76 CredsBundle: ccr.cc.dopts.copts.CredsBundle,
77 Dialer: ccr.cc.dopts.copts.Dialer,
78 Authority: ccr.cc.authority,
79 }
80 var err error
81 ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
82 errCh <- err
83 })
84 return <-errCh
85 }
86
87 func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
88 ccr.serializer.Schedule(func(ctx context.Context) {
89 if ctx.Err() != nil || ccr.resolver == nil {
90 return
91 }
92 ccr.resolver.ResolveNow(o)
93 })
94 }
95
96
97
98
99 func (ccr *ccResolverWrapper) close() {
100 channelz.Info(logger, ccr.cc.channelz, "Closing the name resolver")
101 ccr.mu.Lock()
102 ccr.closed = true
103 ccr.mu.Unlock()
104
105 ccr.serializer.Schedule(func(context.Context) {
106 if ccr.resolver == nil {
107 return
108 }
109 ccr.resolver.Close()
110 ccr.resolver = nil
111 })
112 ccr.serializerCancel()
113 }
114
115
116
117 func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
118 ccr.cc.mu.Lock()
119 ccr.mu.Lock()
120 if ccr.closed {
121 ccr.mu.Unlock()
122 ccr.cc.mu.Unlock()
123 return nil
124 }
125 if s.Endpoints == nil {
126 s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
127 for _, a := range s.Addresses {
128 ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
129 ep.Addresses[0].BalancerAttributes = nil
130 s.Endpoints = append(s.Endpoints, ep)
131 }
132 }
133 ccr.addChannelzTraceEvent(s)
134 ccr.curState = s
135 ccr.mu.Unlock()
136 return ccr.cc.updateResolverStateAndUnlock(s, nil)
137 }
138
139
140
141 func (ccr *ccResolverWrapper) ReportError(err error) {
142 ccr.cc.mu.Lock()
143 ccr.mu.Lock()
144 if ccr.closed {
145 ccr.mu.Unlock()
146 ccr.cc.mu.Unlock()
147 return
148 }
149 ccr.mu.Unlock()
150 channelz.Warningf(logger, ccr.cc.channelz, "ccResolverWrapper: reporting error to cc: %v", err)
151 ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
152 }
153
154
155
156 func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
157 ccr.cc.mu.Lock()
158 ccr.mu.Lock()
159 if ccr.closed {
160 ccr.mu.Unlock()
161 ccr.cc.mu.Unlock()
162 return
163 }
164 s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
165 ccr.addChannelzTraceEvent(s)
166 ccr.curState = s
167 ccr.mu.Unlock()
168 ccr.cc.updateResolverStateAndUnlock(s, nil)
169 }
170
171
172
173 func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
174 return parseServiceConfig(scJSON)
175 }
176
177
178
179 func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
180 var updates []string
181 var oldSC, newSC *ServiceConfig
182 var oldOK, newOK bool
183 if ccr.curState.ServiceConfig != nil {
184 oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
185 }
186 if s.ServiceConfig != nil {
187 newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
188 }
189 if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
190 updates = append(updates, "service config updated")
191 }
192 if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
193 updates = append(updates, "resolver returned an empty address list")
194 } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
195 updates = append(updates, "resolver returned new addresses")
196 }
197 channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
198 }
199
View as plain text