...
1
18
19 package clusterresolver
20
21 import (
22 "fmt"
23 "net/url"
24 "sync"
25
26 "google.golang.org/grpc/internal/grpclog"
27 "google.golang.org/grpc/internal/pretty"
28 "google.golang.org/grpc/resolver"
29 "google.golang.org/grpc/serviceconfig"
30 )
31
32 var (
33 newDNS = func(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
34
35
36 return resolver.Get("dns").Build(target, cc, opts)
37 }
38 )
39
40
41
42
43 type dnsDiscoveryMechanism struct {
44 target string
45 topLevelResolver topLevelResolver
46 dnsR resolver.Resolver
47 logger *grpclog.PrefixLogger
48
49 mu sync.Mutex
50 addrs []string
51 updateReceived bool
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *grpclog.PrefixLogger) *dnsDiscoveryMechanism {
71 ret := &dnsDiscoveryMechanism{
72 target: target,
73 topLevelResolver: topLevelResolver,
74 logger: logger,
75 }
76 u, err := url.Parse("dns:///" + target)
77 if err != nil {
78 if ret.logger.V(2) {
79 ret.logger.Infof("Failed to parse dns hostname %q in clusterresolver LB policy", target)
80 }
81 ret.updateReceived = true
82 ret.topLevelResolver.onUpdate()
83 return ret
84 }
85
86 r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{})
87 if err != nil {
88 if ret.logger.V(2) {
89 ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err)
90 }
91 ret.updateReceived = true
92 ret.topLevelResolver.onUpdate()
93 return ret
94 }
95 ret.dnsR = r
96 return ret
97 }
98
99 func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) {
100 dr.mu.Lock()
101 defer dr.mu.Unlock()
102
103 if !dr.updateReceived {
104 return nil, false
105 }
106 return dr.addrs, true
107 }
108
109 func (dr *dnsDiscoveryMechanism) resolveNow() {
110 if dr.dnsR != nil {
111 dr.dnsR.ResolveNow(resolver.ResolveNowOptions{})
112 }
113 }
114
115
116
117
118
119
120
121 func (dr *dnsDiscoveryMechanism) stop() {
122 if dr.dnsR != nil {
123 dr.dnsR.Close()
124 }
125 }
126
127
128
129
130 func (dr *dnsDiscoveryMechanism) UpdateState(state resolver.State) error {
131 if dr.logger.V(2) {
132 dr.logger.Infof("DNS discovery mechanism for resource %q reported an update: %s", dr.target, pretty.ToJSON(state))
133 }
134
135 dr.mu.Lock()
136 var addrs []string
137 if len(state.Endpoints) > 0 {
138
139
140 addrs = make([]string, 0, len(state.Endpoints))
141 for _, e := range state.Endpoints {
142 for _, a := range e.Addresses {
143 addrs = append(addrs, a.Addr)
144 }
145 }
146 } else {
147 addrs = make([]string, len(state.Addresses))
148 for i, a := range state.Addresses {
149 addrs[i] = a.Addr
150 }
151 }
152 dr.addrs = addrs
153 dr.updateReceived = true
154 dr.mu.Unlock()
155
156 dr.topLevelResolver.onUpdate()
157 return nil
158 }
159
160 func (dr *dnsDiscoveryMechanism) ReportError(err error) {
161 if dr.logger.V(2) {
162 dr.logger.Infof("DNS discovery mechanism for resource %q reported error: %v", dr.target, err)
163 }
164
165 dr.mu.Lock()
166
167
168
169
170
171 if dr.updateReceived {
172 dr.mu.Unlock()
173 return
174 }
175 dr.addrs = nil
176 dr.updateReceived = true
177 dr.mu.Unlock()
178
179 dr.topLevelResolver.onUpdate()
180 }
181
182 func (dr *dnsDiscoveryMechanism) NewAddress(addresses []resolver.Address) {
183 dr.UpdateState(resolver.State{Addresses: addresses})
184 }
185
186 func (dr *dnsDiscoveryMechanism) ParseServiceConfig(string) *serviceconfig.ParseResult {
187 return &serviceconfig.ParseResult{Err: fmt.Errorf("service config not supported")}
188 }
189
View as plain text