...
1 package sd
2
3 import (
4 "time"
5
6 "github.com/go-kit/kit/endpoint"
7 "github.com/go-kit/log"
8 )
9
10
11
12
13
14 type Endpointer interface {
15 Endpoints() ([]endpoint.Endpoint, error)
16 }
17
18
19 type FixedEndpointer []endpoint.Endpoint
20
21
22 func (s FixedEndpointer) Endpoints() ([]endpoint.Endpoint, error) { return s, nil }
23
24
25
26
27
28 func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer {
29 opts := endpointerOptions{}
30 for _, opt := range options {
31 opt(&opts)
32 }
33 se := &DefaultEndpointer{
34 cache: newEndpointCache(f, logger, opts),
35 instancer: src,
36 ch: make(chan Event),
37 }
38 go se.receive()
39 src.Register(se.ch)
40 return se
41 }
42
43
44 type EndpointerOption func(*endpointerOptions)
45
46
47
48
49
50
51
52
53 func InvalidateOnError(timeout time.Duration) EndpointerOption {
54 return func(opts *endpointerOptions) {
55 opts.invalidateOnError = true
56 opts.invalidateTimeout = timeout
57 }
58 }
59
60 type endpointerOptions struct {
61 invalidateOnError bool
62 invalidateTimeout time.Duration
63 }
64
65
66
67
68
69 type DefaultEndpointer struct {
70 cache *endpointCache
71 instancer Instancer
72 ch chan Event
73 }
74
75 func (de *DefaultEndpointer) receive() {
76 for event := range de.ch {
77 de.cache.Update(event)
78 }
79 }
80
81
82 func (de *DefaultEndpointer) Close() {
83 de.instancer.Deregister(de.ch)
84 close(de.ch)
85 }
86
87
88 func (de *DefaultEndpointer) Endpoints() ([]endpoint.Endpoint, error) {
89 return de.cache.Endpoints()
90 }
91
View as plain text