1 package redis
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "strings"
9 "time"
10
11 "github.com/letsencrypt/boulder/cmd"
12 blog "github.com/letsencrypt/boulder/log"
13 "github.com/prometheus/client_golang/prometheus"
14
15 "github.com/redis/go-redis/v9"
16 )
17
// ErrNoShardsResolved reports that DNS resolution produced no shard addresses
// at all, leaving nothing to configure the Redis ring with.
var (
	ErrNoShardsResolved = errors.New("0 shards were resolved")
)
19
20
21
22 type lookup struct {
23
24 srvLookups []cmd.ServiceDomain
25
26
27
28 updateFrequency time.Duration
29
30
31
32 updateTimeout time.Duration
33
34
35
36
37
38
39 dnsAuthority string
40
41
42
43 stop context.CancelFunc
44
45 resolver *net.Resolver
46 ring *redis.Ring
47 logger blog.Logger
48 stats prometheus.Registerer
49 }
50
51
52
53
54 func newLookup(srvLookups []cmd.ServiceDomain, dnsAuthority string, frequency time.Duration, ring *redis.Ring, logger blog.Logger, stats prometheus.Registerer) (*lookup, error) {
55 updateFrequency := frequency
56 if updateFrequency <= 0 {
57
58 updateFrequency = 30 * time.Second
59 }
60
61 updateTimeout := updateFrequency - updateFrequency/10
62
63 lookup := &lookup{
64 srvLookups: srvLookups,
65 ring: ring,
66 logger: logger,
67 stats: stats,
68 updateFrequency: updateFrequency,
69 updateTimeout: updateTimeout,
70 dnsAuthority: dnsAuthority,
71 }
72
73 if dnsAuthority == "" {
74
75 lookup.resolver = net.DefaultResolver
76 } else {
77
78 host, port, err := net.SplitHostPort(dnsAuthority)
79 if err != nil {
80
81 host = dnsAuthority
82 port = "53"
83 }
84 lookup.dnsAuthority = net.JoinHostPort(host, port)
85 lookup.resolver = &net.Resolver{
86 PreferGo: true,
87 Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
88
89
90 return net.Dial(network, lookup.dnsAuthority)
91 },
92 }
93 }
94
95 ctx, cancel := context.WithTimeout(context.Background(), updateTimeout)
96 defer cancel()
97 tempErr, nonTempErr := lookup.updateNow(ctx)
98 if tempErr != nil {
99
100
101 logger.Warningf("resolving ring shards: %s", tempErr)
102 }
103 if nonTempErr != nil && errors.Is(nonTempErr, ErrNoShardsResolved) {
104
105
106 return nil, nonTempErr
107 }
108
109 return lookup, nil
110 }
111
112
113
114
115
116
117
118 func (look *lookup) updateNow(ctx context.Context) (tempError, nonTempError error) {
119 var tempErrs []error
120 handleDNSError := func(err error, srv cmd.ServiceDomain) {
121 var dnsErr *net.DNSError
122 if errors.As(err, &dnsErr) && (dnsErr.IsTimeout || dnsErr.IsTemporary) {
123 tempErrs = append(tempErrs, err)
124 return
125 }
126
127
128 look.logger.Errf("resolving service _%s._tcp.%s: %s", srv.Service, srv.Domain, err)
129 }
130
131 nextAddrs := make(map[string]string)
132 for _, srv := range look.srvLookups {
133 _, targets, err := look.resolver.LookupSRV(ctx, srv.Service, "tcp", srv.Domain)
134 if err != nil {
135 handleDNSError(err, srv)
136
137 continue
138 }
139 if len(targets) <= 0 {
140 tempErrs = append(tempErrs, fmt.Errorf("0 targets resolved for service \"_%s._tcp.%s\"", srv.Service, srv.Domain))
141
142 continue
143 }
144
145 for _, target := range targets {
146 host := strings.TrimRight(target.Target, ".")
147 if look.dnsAuthority != "" {
148
149
150 hostAddrs, err := look.resolver.LookupHost(ctx, host)
151 if err != nil {
152 handleDNSError(err, srv)
153
154 continue
155 }
156 if len(hostAddrs) <= 0 {
157 tempErrs = append(tempErrs, fmt.Errorf("0 addrs resolved for target %q of service \"_%s._tcp.%s\"", host, srv.Service, srv.Domain))
158
159 continue
160 }
161
162 host = hostAddrs[0]
163 }
164 addr := fmt.Sprintf("%s:%d", host, target.Port)
165 nextAddrs[addr] = addr
166 }
167 }
168
169
170 if len(nextAddrs) <= 0 {
171 return errors.Join(tempErrs...), ErrNoShardsResolved
172 }
173
174
175 look.ring.SetAddrs(nextAddrs)
176
177
178 MustRegisterClientMetricsCollector(look.ring, look.stats, nextAddrs, look.ring.Options().Username)
179
180 return nil, nil
181 }
182
183
184
185 func (look *lookup) start() {
186 var lookupCtx context.Context
187 lookupCtx, look.stop = context.WithCancel(context.Background())
188 go func() {
189 ticker := time.NewTicker(look.updateFrequency)
190 defer ticker.Stop()
191 for {
192
193 if lookupCtx.Err() != nil {
194 return
195 }
196
197 timeoutCtx, cancel := context.WithTimeout(lookupCtx, look.updateTimeout)
198 tempErrs, nonTempErrs := look.updateNow(timeoutCtx)
199 cancel()
200 if tempErrs != nil {
201 look.logger.Warningf("resolving ring shards, temporary errors: %s", tempErrs)
202 continue
203 }
204 if nonTempErrs != nil {
205 look.logger.Errf("resolving ring shards, non-temporary errors: %s", nonTempErrs)
206 continue
207 }
208
209 select {
210 case <-ticker.C:
211 continue
212
213 case <-lookupCtx.Done():
214 return
215 }
216 }
217 }()
218 }
219
View as plain text