...
1 package dnssrv
2
3 import (
4 "errors"
5 "fmt"
6 "net"
7 "time"
8
9 "github.com/go-kit/kit/sd"
10 "github.com/go-kit/kit/sd/internal/instance"
11 "github.com/go-kit/log"
12 )
13
14
15
16
// ErrPortZero is the error reported when a resolved SRV record carries
// port 0, which cannot be turned into a usable host:port instance string.
var ErrPortZero = errors.New("resolver returned SRV record with port 0")
18
19
20
21 type Instancer struct {
22 cache *instance.Cache
23 name string
24 logger log.Logger
25 quit chan struct{}
26 }
27
28
29 func NewInstancer(
30 name string,
31 ttl time.Duration,
32 logger log.Logger,
33 ) *Instancer {
34 return NewInstancerDetailed(name, time.NewTicker(ttl), net.LookupSRV, logger)
35 }
36
37
38
39
40 func NewInstancerDetailed(
41 name string,
42 refresh *time.Ticker,
43 lookup Lookup,
44 logger log.Logger,
45 ) *Instancer {
46 p := &Instancer{
47 cache: instance.NewCache(),
48 name: name,
49 logger: logger,
50 quit: make(chan struct{}),
51 }
52
53 instances, err := p.resolve(lookup)
54 if err == nil {
55 logger.Log("name", name, "instances", len(instances))
56 } else {
57 logger.Log("name", name, "err", err)
58 }
59 p.cache.Update(sd.Event{Instances: instances, Err: err})
60
61 go p.loop(refresh, lookup)
62 return p
63 }
64
65
66 func (in *Instancer) Stop() {
67 close(in.quit)
68 }
69
70 func (in *Instancer) loop(t *time.Ticker, lookup Lookup) {
71 defer t.Stop()
72 for {
73 select {
74 case <-t.C:
75 instances, err := in.resolve(lookup)
76 if err != nil {
77 in.logger.Log("name", in.name, "err", err)
78 in.cache.Update(sd.Event{Err: err})
79 continue
80 }
81 in.cache.Update(sd.Event{Instances: instances})
82
83 case <-in.quit:
84 return
85 }
86 }
87 }
88
89 func (in *Instancer) resolve(lookup Lookup) ([]string, error) {
90 _, addrs, err := lookup("", "", in.name)
91 if err != nil {
92 return nil, err
93 }
94 instances := make([]string, len(addrs))
95 for i, addr := range addrs {
96 if addr.Port == 0 {
97 return nil, ErrPortZero
98 }
99 instances[i] = net.JoinHostPort(addr.Target, fmt.Sprint(addr.Port))
100 }
101 return instances, nil
102 }
103
104
105 func (in *Instancer) Register(ch chan<- sd.Event) {
106 in.cache.Register(ch)
107 }
108
109
110 func (in *Instancer) Deregister(ch chan<- sd.Event) {
111 in.cache.Deregister(ch)
112 }
113
View as plain text