1 package consul
2
3 import (
4 "errors"
5 "fmt"
6 "time"
7
8 consul "github.com/hashicorp/consul/api"
9
10 "github.com/go-kit/kit/sd"
11 "github.com/go-kit/kit/sd/internal/instance"
12 "github.com/go-kit/kit/util/conn"
13 "github.com/go-kit/log"
14 )
15
16 const defaultIndex = 0
17
18
19 var errStopped = errors.New("quit and closed consul instancer")
20
21
22 type Instancer struct {
23 cache *instance.Cache
24 client Client
25 logger log.Logger
26 service string
27 tags []string
28 passingOnly bool
29 quitc chan struct{}
30 }
31
32
33
34
35 func NewInstancer(client Client, logger log.Logger, service string, tags []string, passingOnly bool) *Instancer {
36 s := &Instancer{
37 cache: instance.NewCache(),
38 client: client,
39 logger: log.With(logger, "service", service, "tags", fmt.Sprint(tags)),
40 service: service,
41 tags: tags,
42 passingOnly: passingOnly,
43 quitc: make(chan struct{}),
44 }
45
46 instances, index, err := s.getInstances(defaultIndex, nil)
47 if err == nil {
48 s.logger.Log("instances", len(instances))
49 } else {
50 s.logger.Log("err", err)
51 }
52
53 s.cache.Update(sd.Event{Instances: instances, Err: err})
54 go s.loop(index)
55 return s
56 }
57
58
59 func (s *Instancer) Stop() {
60 close(s.quitc)
61 }
62
63 func (s *Instancer) loop(lastIndex uint64) {
64 var (
65 instances []string
66 err error
67 d time.Duration = 10 * time.Millisecond
68 index uint64
69 )
70 for {
71 instances, index, err = s.getInstances(lastIndex, s.quitc)
72 switch {
73 case errors.Is(err, errStopped):
74 return
75 case err != nil:
76 s.logger.Log("err", err)
77 time.Sleep(d)
78 d = conn.Exponential(d)
79 s.cache.Update(sd.Event{Err: err})
80 case index == defaultIndex:
81 s.logger.Log("err", "index is not sane")
82 time.Sleep(d)
83 d = conn.Exponential(d)
84 case index < lastIndex:
85 s.logger.Log("err", "index is less than previous; resetting to default")
86 lastIndex = defaultIndex
87 time.Sleep(d)
88 d = conn.Exponential(d)
89 default:
90 lastIndex = index
91 s.cache.Update(sd.Event{Instances: instances})
92 d = 10 * time.Millisecond
93 }
94 }
95 }
96
97 func (s *Instancer) getInstances(lastIndex uint64, interruptc chan struct{}) ([]string, uint64, error) {
98 tag := ""
99 if len(s.tags) > 0 {
100 tag = s.tags[0]
101 }
102
103
104
105
106
107
108
109 type response struct {
110 instances []string
111 index uint64
112 }
113
114 var (
115 errc = make(chan error, 1)
116 resc = make(chan response, 1)
117 )
118
119 go func() {
120 entries, meta, err := s.client.Service(s.service, tag, s.passingOnly, &consul.QueryOptions{
121 WaitIndex: lastIndex,
122 })
123 if err != nil {
124 errc <- err
125 return
126 }
127 if len(s.tags) > 1 {
128 entries = filterEntries(entries, s.tags[1:]...)
129 }
130 resc <- response{
131 instances: makeInstances(entries),
132 index: meta.LastIndex,
133 }
134 }()
135
136 select {
137 case err := <-errc:
138 return nil, 0, err
139 case res := <-resc:
140 return res.instances, res.index, nil
141 case <-interruptc:
142 return nil, 0, errStopped
143 }
144 }
145
146
147 func (s *Instancer) Register(ch chan<- sd.Event) {
148 s.cache.Register(ch)
149 }
150
151
152 func (s *Instancer) Deregister(ch chan<- sd.Event) {
153 s.cache.Deregister(ch)
154 }
155
156 func filterEntries(entries []*consul.ServiceEntry, tags ...string) []*consul.ServiceEntry {
157 var es []*consul.ServiceEntry
158
159 ENTRIES:
160 for _, entry := range entries {
161 ts := make(map[string]struct{}, len(entry.Service.Tags))
162 for _, tag := range entry.Service.Tags {
163 ts[tag] = struct{}{}
164 }
165
166 for _, tag := range tags {
167 if _, ok := ts[tag]; !ok {
168 continue ENTRIES
169 }
170 }
171 es = append(es, entry)
172 }
173
174 return es
175 }
176
177 func makeInstances(entries []*consul.ServiceEntry) []string {
178 instances := make([]string, len(entries))
179 for i, entry := range entries {
180 addr := entry.Node.Address
181 if entry.Service.Address != "" {
182 addr = entry.Service.Address
183 }
184 instances[i] = fmt.Sprintf("%s:%d", addr, entry.Service.Port)
185 }
186 return instances
187 }
188
View as plain text