1 package eureka
2
3 import (
4 "errors"
5 "fmt"
6 "reflect"
7 "sync"
8 "time"
9
10 "github.com/go-kit/log"
11 "github.com/hudl/fargo"
12 )
13
14 type testConnection struct {
15 mu sync.RWMutex
16 instances []*fargo.Instance
17
18 errApplication error
19 errHeartbeat error
20 errRegister error
21 errDeregister error
22 }
23
24 var (
25 errTest = errors.New("kaboom")
26 errNotFound = &fargoUnsuccessfulHTTPResponse{statusCode: 404, messagePrefix: "not found"}
27 loggerTest = log.NewNopLogger()
28 appNameTest = "go-kit"
29 instanceTest1 = &fargo.Instance{
30 HostName: "serveregistrar1.acme.org",
31 Port: 8080,
32 App: appNameTest,
33 IPAddr: "192.168.0.1",
34 VipAddress: "192.168.0.1",
35 SecureVipAddress: "192.168.0.1",
36 HealthCheckUrl: "http://serveregistrar1.acme.org:8080/healthz",
37 StatusPageUrl: "http://serveregistrar1.acme.org:8080/status",
38 HomePageUrl: "http://serveregistrar1.acme.org:8080/",
39 Status: fargo.UP,
40 DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
41 LeaseInfo: fargo.LeaseInfo{RenewalIntervalInSecs: 1},
42 }
43 instanceTest2 = &fargo.Instance{
44 HostName: "serveregistrar2.acme.org",
45 Port: 8080,
46 App: appNameTest,
47 IPAddr: "192.168.0.2",
48 VipAddress: "192.168.0.2",
49 SecureVipAddress: "192.168.0.2",
50 HealthCheckUrl: "http://serveregistrar2.acme.org:8080/healthz",
51 StatusPageUrl: "http://serveregistrar2.acme.org:8080/status",
52 HomePageUrl: "http://serveregistrar2.acme.org:8080/",
53 Status: fargo.UP,
54 DataCenterInfo: fargo.DataCenterInfo{Name: fargo.MyOwn},
55 }
56 )
57
58 var _ fargoConnection = (*testConnection)(nil)
59
60 func (c *testConnection) RegisterInstance(i *fargo.Instance) error {
61 if c.errRegister != nil {
62 return c.errRegister
63 }
64 c.mu.Lock()
65 defer c.mu.Unlock()
66 for _, instance := range c.instances {
67 if reflect.DeepEqual(*instance, *i) {
68 return errors.New("already registered")
69 }
70 }
71 c.instances = append(c.instances, i)
72 return nil
73 }
74
75 func (c *testConnection) HeartBeatInstance(i *fargo.Instance) error {
76 return c.errHeartbeat
77 }
78
79 func (c *testConnection) DeregisterInstance(i *fargo.Instance) error {
80 if c.errDeregister != nil {
81 return c.errDeregister
82 }
83 c.mu.Lock()
84 defer c.mu.Unlock()
85 remaining := make([]*fargo.Instance, 0, len(c.instances))
86 for _, instance := range c.instances {
87 if reflect.DeepEqual(*instance, *i) {
88 continue
89 }
90 remaining = append(remaining, instance)
91 }
92 if len(remaining) == len(c.instances) {
93 return errors.New("not registered")
94 }
95 c.instances = remaining
96 return nil
97 }
98
99 func (c *testConnection) ReregisterInstance(ins *fargo.Instance) error {
100 return nil
101 }
102
103 func (c *testConnection) instancesForApplication(name string) []*fargo.Instance {
104 c.mu.RLock()
105 defer c.mu.RUnlock()
106 instances := make([]*fargo.Instance, 0, len(c.instances))
107 for _, i := range c.instances {
108 if i.App == name {
109 instances = append(instances, i)
110 }
111 }
112 return instances
113 }
114
115 func (c *testConnection) GetApp(name string) (*fargo.Application, error) {
116 if err := c.errApplication; err != nil {
117 return nil, err
118 }
119 instances := c.instancesForApplication(name)
120 if len(instances) == 0 {
121 return nil, fmt.Errorf("application not found for name=%s", name)
122 }
123 return &fargo.Application{Name: name, Instances: instances}, nil
124 }
125
126 func (c *testConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate {
127 updatec := make(chan fargo.AppUpdate, 1)
128 send := func() {
129 app, err := c.GetApp(name)
130 select {
131 case updatec <- fargo.AppUpdate{App: app, Err: err}:
132 default:
133 }
134 }
135
136 if await {
137 send()
138 }
139 go func() {
140 ticker := time.NewTicker(100 * time.Millisecond)
141 for {
142 select {
143 case <-ticker.C:
144 send()
145 case <-done:
146 ticker.Stop()
147 return
148 }
149 }
150 }()
151 return updatec
152 }
153
View as plain text