...
1 package eureka
2
3 import (
4 "fmt"
5 "net/http"
6 "sync"
7 "time"
8
9 "github.com/hudl/fargo"
10
11 "github.com/go-kit/kit/sd"
12 "github.com/go-kit/log"
13 )
14
15
16 const defaultRenewalInterval = 30 * time.Second
17
18
19 type fargoConnection interface {
20 RegisterInstance(instance *fargo.Instance) error
21 DeregisterInstance(instance *fargo.Instance) error
22 ReregisterInstance(instance *fargo.Instance) error
23 HeartBeatInstance(instance *fargo.Instance) error
24 ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
25 GetApp(name string) (*fargo.Application, error)
26 }
27
28 type fargoUnsuccessfulHTTPResponse struct {
29 statusCode int
30 messagePrefix string
31 }
32
33 func (u *fargoUnsuccessfulHTTPResponse) Error() string {
34 return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
35 }
36
37
38 type Registrar struct {
39 conn fargoConnection
40 instance *fargo.Instance
41 logger log.Logger
42 quitc chan chan struct{}
43 sync.Mutex
44 }
45
46 var _ sd.Registrar = (*Registrar)(nil)
47
48
49
50 func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
51 return &Registrar{
52 conn: conn,
53 instance: instance,
54 logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
55 }
56 }
57
58
59 func (r *Registrar) Register() {
60 r.Lock()
61 defer r.Unlock()
62
63 if r.quitc != nil {
64 return
65 }
66
67 if err := r.conn.RegisterInstance(r.instance); err != nil {
68 r.logger.Log("during", "Register", "err", err)
69 }
70
71 r.quitc = make(chan chan struct{})
72 go r.loop()
73 }
74
75
76 func (r *Registrar) Deregister() {
77 r.Lock()
78 defer r.Unlock()
79
80 if r.quitc == nil {
81 return
82 }
83
84 q := make(chan struct{})
85 r.quitc <- q
86 <-q
87 r.quitc = nil
88 }
89
90 func (r *Registrar) loop() {
91 var renewalInterval time.Duration
92 if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
93 renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
94 } else {
95 renewalInterval = defaultRenewalInterval
96 }
97 ticker := time.NewTicker(renewalInterval)
98 defer ticker.Stop()
99
100 for {
101 select {
102 case <-ticker.C:
103 if err := r.heartbeat(); err != nil {
104 r.logger.Log("during", "heartbeat", "err", err)
105 }
106
107 case q := <-r.quitc:
108 if err := r.conn.DeregisterInstance(r.instance); err != nil {
109 r.logger.Log("during", "Deregister", "err", err)
110 }
111 close(q)
112 return
113 }
114 }
115 }
116
117 func httpResponseStatusCode(err error) (code int, present bool) {
118 if code, ok := fargo.HTTPResponseStatusCode(err); ok {
119 return code, true
120 }
121
122 if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
123 return u.statusCode, true
124 }
125 return 0, false
126 }
127
128 func isNotFound(err error) bool {
129 code, ok := httpResponseStatusCode(err)
130 return ok && code == http.StatusNotFound
131 }
132
133 func (r *Registrar) heartbeat() error {
134 err := r.conn.HeartBeatInstance(r.instance)
135 if err == nil {
136 return nil
137 }
138 if isNotFound(err) {
139
140 return r.conn.ReregisterInstance(r.instance)
141 }
142 return err
143 }
144
View as plain text