1
2
3
4 package etcdv3
5
6 import (
7 "context"
8 "io"
9 "os"
10 "testing"
11 "time"
12
13 "github.com/go-kit/kit/endpoint"
14 "github.com/go-kit/kit/sd"
15 "github.com/go-kit/log"
16 )
17
18 func runIntegration(settings integrationSettings, client Client, service Service, t *testing.T) {
19
20 entries, err := client.GetEntries(settings.key)
21 if err != nil {
22 t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
23 }
24 if len(entries) > 0 {
25 t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
26 }
27 t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
28
29
30 registrar := NewRegistrar(
31 client,
32 service,
33 log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
34 )
35
36
37 registrar.Register()
38 t.Log("Registered")
39
40
41 entries, err = client.GetEntries(settings.key)
42 if err != nil {
43 t.Fatalf("client.GetEntries(%q): %v", settings.key, err)
44 }
45 if want, have := 1, len(entries); want != have {
46 t.Fatalf("client.GetEntries(%q): want %d, have %d", settings.key, want, have)
47 }
48 if want, have := settings.value, entries[0]; want != have {
49 t.Fatalf("want %q, have %q", want, have)
50 }
51
52 instancer, err := NewInstancer(
53 client,
54 settings.prefix,
55 log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
56 )
57 if err != nil {
58 t.Fatalf("NewInstancer: %v", err)
59 }
60 t.Log("Constructed Instancer OK")
61 defer instancer.Stop()
62
63 endpointer := sd.NewEndpointer(
64 instancer,
65 func(string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, nil, nil },
66 log.With(log.NewLogfmtLogger(os.Stderr), "component", "instancer"),
67 )
68 t.Log("Constructed Endpointer OK")
69 defer endpointer.Close()
70
71 if !within(time.Second, func() bool {
72 endpoints, err := endpointer.Endpoints()
73 return err == nil && len(endpoints) == 1
74 }) {
75 t.Fatal("Endpointer didn't see Register in time")
76 }
77 t.Log("Endpointer saw Register OK")
78
79
80 registrar.Deregister()
81 t.Log("Deregistered")
82
83
84 if !within(time.Second, func() bool {
85 endpoints, err := endpointer.Endpoints()
86 t.Logf("Checking Deregister: len(endpoints) = %d, err = %v", len(endpoints), err)
87 return err == nil && len(endpoints) == 0
88 }) {
89 t.Fatalf("Endpointer didn't see Deregister in time")
90 }
91
92
93 entries, err = client.GetEntries(settings.key)
94 if err != nil {
95 t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
96 }
97 if len(entries) > 0 {
98 t.Fatalf("GetEntries(%q): expected no entries, got %v", settings.key, entries)
99 }
100 t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
101 }
102
103 type integrationSettings struct {
104 addr string
105 prefix string
106 instance string
107 key string
108 value string
109 }
110
111 func testIntegrationSettings(t *testing.T) integrationSettings {
112 var settings integrationSettings
113
114 settings.addr = os.Getenv("ETCD_ADDR")
115 if settings.addr == "" {
116 t.Skip("ETCD_ADDR not set; skipping integration test")
117 }
118
119 settings.prefix = "/services/foosvc/"
120 settings.instance = "1.2.3.4:8080"
121 settings.key = settings.prefix + settings.instance
122 settings.value = "http://" + settings.instance
123
124 return settings
125 }
126
127
128
129
130 func TestIntegration(t *testing.T) {
131 settings := testIntegrationSettings(t)
132 client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
133 DialTimeout: 2 * time.Second,
134 DialKeepAlive: 2 * time.Second,
135 })
136 if err != nil {
137 t.Fatalf("NewClient(%q): %v", settings.addr, err)
138 }
139
140 service := Service{
141 Key: settings.key,
142 Value: settings.value,
143 }
144
145 runIntegration(settings, client, service, t)
146 }
147
148 func TestIntegrationTTL(t *testing.T) {
149 settings := testIntegrationSettings(t)
150 client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
151 DialTimeout: 2 * time.Second,
152 DialKeepAlive: 2 * time.Second,
153 })
154 if err != nil {
155 t.Fatalf("NewClient(%q): %v", settings.addr, err)
156 }
157
158 service := Service{
159 Key: settings.key,
160 Value: settings.value,
161 TTL: NewTTLOption(time.Second*3, time.Second*10),
162 }
163 defer client.Deregister(service)
164
165 runIntegration(settings, client, service, t)
166 }
167
168 func TestIntegrationRegistrarOnly(t *testing.T) {
169 settings := testIntegrationSettings(t)
170 client, err := NewClient(context.Background(), []string{settings.addr}, ClientOptions{
171 DialTimeout: 2 * time.Second,
172 DialKeepAlive: 2 * time.Second,
173 })
174 if err != nil {
175 t.Fatalf("NewClient(%q): %v", settings.addr, err)
176 }
177
178 service := Service{
179 Key: settings.key,
180 Value: settings.value,
181 TTL: NewTTLOption(time.Second*3, time.Second*10),
182 }
183 defer client.Deregister(service)
184
185
186 entries, err := client.GetEntries(settings.key)
187 if err != nil {
188 t.Fatalf("GetEntries(%q): expected no error, got one: %v", settings.key, err)
189 }
190 if len(entries) > 0 {
191 t.Fatalf("GetEntries(%q): expected no instance entries, got %d", settings.key, len(entries))
192 }
193 t.Logf("GetEntries(%q): %v (OK)", settings.key, entries)
194
195
196 registrar := NewRegistrar(
197 client,
198 service,
199 log.With(log.NewLogfmtLogger(os.Stderr), "component", "registrar"),
200 )
201
202
203 registrar.Register()
204 t.Log("Registered")
205
206
207 registrar.Deregister()
208 t.Log("Deregistered")
209
210 }
211
212 func within(d time.Duration, f func() bool) bool {
213 deadline := time.Now().Add(d)
214 for time.Now().Before(deadline) {
215 if f() {
216 return true
217 }
218 time.Sleep(d / 10)
219 }
220 return false
221 }
222
View as plain text