...
1 package eureka
2
3 import (
4 "fmt"
5
6 "github.com/hudl/fargo"
7
8 "github.com/go-kit/kit/sd"
9 "github.com/go-kit/kit/sd/internal/instance"
10 "github.com/go-kit/log"
11 )
12
13
14
15 type Instancer struct {
16 cache *instance.Cache
17 conn fargoConnection
18 app string
19 logger log.Logger
20 quitc chan chan struct{}
21 }
22
23
24
25 func NewInstancer(conn fargoConnection, app string, logger log.Logger) *Instancer {
26 logger = log.With(logger, "app", app)
27
28 s := &Instancer{
29 cache: instance.NewCache(),
30 conn: conn,
31 app: app,
32 logger: logger,
33 quitc: make(chan chan struct{}),
34 }
35
36 done := make(chan struct{})
37 updates := conn.ScheduleAppUpdates(app, true, done)
38 s.consume(<-updates)
39 go s.loop(updates, done)
40 return s
41 }
42
43
44 func (s *Instancer) Stop() {
45 q := make(chan struct{})
46 s.quitc <- q
47 <-q
48 s.quitc = nil
49 }
50
51 func (s *Instancer) consume(update fargo.AppUpdate) {
52 if update.Err != nil {
53 s.logger.Log("during", "Update", "err", update.Err)
54 s.cache.Update(sd.Event{Err: update.Err})
55 return
56 }
57 instances := convertFargoAppToInstances(update.App)
58 s.logger.Log("instances", len(instances))
59 s.cache.Update(sd.Event{Instances: instances})
60 }
61
62 func (s *Instancer) loop(updates <-chan fargo.AppUpdate, done chan<- struct{}) {
63 defer close(done)
64
65 for {
66 select {
67 case update := <-updates:
68 s.consume(update)
69 case q := <-s.quitc:
70 close(q)
71 return
72 }
73 }
74 }
75
76 func (s *Instancer) getInstances() ([]string, error) {
77 app, err := s.conn.GetApp(s.app)
78 if err != nil {
79 return nil, err
80 }
81 return convertFargoAppToInstances(app), nil
82 }
83
84 func convertFargoAppToInstances(app *fargo.Application) []string {
85 instances := make([]string, len(app.Instances))
86 for i, inst := range app.Instances {
87 instances[i] = fmt.Sprintf("%s:%d", inst.IPAddr, inst.Port)
88 }
89 return instances
90 }
91
92
93 func (s *Instancer) Register(ch chan<- sd.Event) {
94 s.cache.Register(ch)
95 }
96
97
98 func (s *Instancer) Deregister(ch chan<- sd.Event) {
99 s.cache.Deregister(ch)
100 }
101
102
103 func (s *Instancer) state() sd.Event {
104 return s.cache.State()
105 }
106
View as plain text