1 package entrypoint_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "io/ioutil"
8 "os"
9 "path/filepath"
10 "runtime"
11 "sync"
12 "testing"
13 "time"
14
15 "github.com/stretchr/testify/assert"
16
17 "github.com/datawire/dlib/dgroup"
18 "github.com/datawire/dlib/dlog"
19 "github.com/emissary-ingress/emissary/v3/cmd/entrypoint"
20 )
21
22 type fswMetadata struct {
23 t *testing.T
24 fsw *entrypoint.FSWatcher
25 dir string
26 bootstrapped map[string]bool
27 updates map[string]int
28 deletes map[string]int
29 errorCount int
30 mutex sync.Mutex
31 }
32
33 func newMetadata(t *testing.T) (context.Context, *fswMetadata, error) {
34 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
35 grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
36 t.Cleanup(func() {
37 cancel()
38 assert.NoError(t, grp.Wait())
39 })
40 m := &fswMetadata{t: t}
41 m.bootstrapped = make(map[string]bool)
42 m.updates = make(map[string]int)
43 m.deletes = make(map[string]int)
44
45 var err error
46
47 m.dir, err = ioutil.TempDir("", "fswatcher_test")
48
49 if err != nil {
50 t.Errorf("could not create tempdir: %s", err)
51 return nil, nil, err
52 }
53
54 m.fsw, err = entrypoint.NewFSWatcher(ctx)
55 if err != nil {
56 t.Errorf("could not instantiate FSWatcher: %s", err)
57 return nil, nil, err
58 }
59 grp.Go("watch", func(ctx context.Context) error {
60 m.fsw.Run(ctx)
61 return nil
62 })
63
64 m.fsw.SetErrorHandler(m.errorHandler)
65
66 return ctx, m, nil
67 }
68
69 func (m *fswMetadata) done() {
70
71
72
73
74
75
76 files, err := ioutil.ReadDir(m.dir)
77
78 if err != nil {
79 m.t.Errorf("m.done: couldn't scan %s: %s", m.dir, err)
80 return
81 }
82
83 for _, file := range files {
84 path := filepath.Join(m.dir, file.Name())
85
86 err = os.Remove(path)
87
88 if err != nil {
89 m.t.Errorf("m.done: couldn't remove %s: %s", path, err)
90 }
91 }
92
93
94 time.Sleep(250 * time.Millisecond)
95
96
97 os.Remove(m.dir)
98
99
100
101 time.Sleep(250 * time.Millisecond)
102 }
103
104
105 func (m *fswMetadata) errorHandler(_ context.Context, err error) {
106 m.t.Logf("errorHandler: got %s", err)
107 m.mutex.Lock()
108 defer m.mutex.Unlock()
109 m.errorCount++
110 m.t.Logf("errorHandler: errorCount now %d", m.errorCount)
111 }
112
113
114
115 func (m *fswMetadata) eventHandler(ctx context.Context, event entrypoint.FSWEvent) {
116 dir := filepath.Dir(event.Path)
117 base := filepath.Base(event.Path)
118
119 bstr := ""
120 if event.Bootstrap {
121 bstr = "B|"
122 }
123
124 opStr := fmt.Sprintf("%s %s%s", event.Time, bstr, event.Op)
125
126 m.t.Logf("eventHandler %s %s (dir %s)", opStr, base, dir)
127
128 if dir != m.dir {
129 m.t.Errorf("eventHandler: event for %s arrived, but we're watching %s", event.Path, m.dir)
130 return
131 }
132
133 if event.Bootstrap {
134
135 if event.Op == entrypoint.FSWDelete {
136 m.t.Errorf("eventHandler: impossible bootstrap delete of %s arrived", event.Path)
137 return
138 }
139
140
141 m.bootstrapped[base] = true
142 }
143
144
145 which := m.updates
146
147 if event.Op == entrypoint.FSWDelete {
148 which = m.deletes
149 }
150
151 count, ok := which[base]
152
153 m.mutex.Lock()
154 defer m.mutex.Unlock()
155 if ok {
156 which[base] = count + 1
157 } else {
158 which[base] = 1
159 }
160 }
161
162
163 func (m *fswMetadata) check(key string, wantedBootstrap bool, wantedUpdates int, wantedDeletes int) {
164 bootstrapped, ok := m.bootstrapped[key]
165
166 if !ok {
167 m.t.Logf("%s bootstrapped: wanted %v, got nothing", key, wantedBootstrap)
168 bootstrapped = false
169 } else {
170 m.t.Logf("%s bootstrapped: wanted %v, got %v", key, wantedBootstrap, bootstrapped)
171 }
172
173 if bootstrapped != wantedBootstrap {
174 m.t.Errorf("%s bootstrapped: wanted %v, got %v", key, wantedBootstrap, bootstrapped)
175 }
176
177 m.mutex.Lock()
178 got, ok := m.updates[key]
179 m.mutex.Unlock()
180
181 if !ok {
182 m.t.Logf("%s updates: wanted %d, got nothing", key, wantedUpdates)
183 got = 0
184 } else {
185 m.t.Logf("%s updates: wanted %d, got %d", key, wantedUpdates, got)
186 }
187
188 if got != wantedUpdates {
189 m.t.Errorf("%s updates: wanted %d, got %d", key, wantedUpdates, got)
190 }
191
192 m.mutex.Lock()
193 got, ok = m.deletes[key]
194 m.mutex.Unlock()
195
196 if !ok {
197 m.t.Logf("%s deletes: wanted %d, got nothing", key, wantedDeletes)
198 got = 0
199 } else {
200 m.t.Logf("%s deletes: wanted %d, got %d", key, wantedDeletes, got)
201 }
202
203 if got != wantedDeletes {
204 m.t.Errorf("%s deletes: wanted %d, got %d", key, wantedDeletes, got)
205 }
206 }
207
208
209 func (m *fswMetadata) checkErrors(wanted int) {
210 m.mutex.Lock()
211 defer m.mutex.Unlock()
212 m.t.Logf("checkErrors: wanted %d, have %d", wanted, m.errorCount)
213
214 if m.errorCount != wanted {
215 m.t.Errorf("errors: wanted %d, got %d", wanted, m.errorCount)
216 }
217 }
218
219
220 func (m *fswMetadata) writeFile(name string, count int, slow bool) bool {
221 path := filepath.Join(m.dir, name)
222
223 f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755)
224
225 if err != nil {
226 m.t.Errorf("could not open %s: %s", path, err)
227 return false
228 }
229
230 m.t.Logf("%s: opened %s", runtime.GOOS, path)
231
232
233 if slow {
234 time.Sleep(time.Second)
235 }
236
237 for i := 0; i < count; i++ {
238 m.t.Logf("writing chunk %d of %s", i, path)
239
240 _, err = f.WriteString("contents!\n")
241
242 if err != nil {
243 m.t.Errorf("could not write chunk %d of %s: %s", i, path, err)
244 return false
245 }
246
247 m.t.Logf("syncing chunk %d of %s", i, path)
248
249
250 err = f.Sync()
251
252 if err != nil {
253 m.t.Errorf("could not sync chunk %d of %s: %s", i, path, err)
254 return false
255 }
256
257
258 if slow {
259 time.Sleep(time.Second)
260 }
261 }
262
263 err = f.Close()
264
265 if err != nil {
266 m.t.Errorf("could not close %s: %s", path, err)
267 }
268
269 m.t.Logf("closed %s", path)
270
271 return true
272 }
273
274
275
276
277
278 func (m *fswMetadata) sendError() {
279 m.fsw.FSW.Errors <- errors.New("OH GOD AN ERROR")
280
281
282
283
284 time.Sleep(250 * time.Millisecond)
285 }
286
287 func TestFSWatcherExtantFiles(t *testing.T) {
288 ctx, m, err := newMetadata(t)
289
290 if err != nil {
291 return
292 }
293
294 m.t.Logf("FSW initialized for ExtantFiles (%s)", m.dir)
295
296 defer m.done()
297
298 if !m.writeFile("f1", 1, false) {
299 return
300 }
301
302 if !m.writeFile("f2", 2, false) {
303 return
304 }
305
306 if !m.writeFile("f3", 3, false) {
307 return
308 }
309
310 assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
311
312 m.check("f1", true, 1, 0)
313 m.check("f2", true, 1, 0)
314 m.check("f3", true, 1, 0)
315
316 m.checkErrors(0)
317
318 m.sendError()
319
320 m.checkErrors(1)
321 }
322
323 func TestFSWatcherNoExtantFiles(t *testing.T) {
324 ctx, m, err := newMetadata(t)
325
326 if err != nil {
327 return
328 }
329
330 m.t.Logf("FSW initialized for NonExtantFiles (%s)", m.dir)
331
332 assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
333
334 if !m.writeFile("f1", 1, false) {
335 return
336 }
337
338 if !m.writeFile("f2", 2, false) {
339 return
340 }
341
342 if !m.writeFile("f3", 3, false) {
343 return
344 }
345
346 time.Sleep(1 * time.Second)
347
348 m.check("f1", false, 1, 0)
349 m.check("f2", false, 1, 0)
350 m.check("f3", false, 1, 0)
351
352 m.done()
353
354 time.Sleep(1 * time.Second)
355
356 m.check("f1", false, 1, 1)
357 m.check("f2", false, 1, 1)
358 m.check("f3", false, 1, 1)
359
360 m.checkErrors(0)
361 }
362
363 func TestFSWatcherSlow(t *testing.T) {
364 ctx, m, err := newMetadata(t)
365
366 if err != nil {
367 return
368 }
369
370 m.t.Logf("FSW initialized for NonExtantFiles (%s)", m.dir)
371
372 assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
373
374 if !m.writeFile("f1", 1, true) {
375 return
376 }
377
378 if !m.writeFile("f2", 2, true) {
379 return
380 }
381
382 if !m.writeFile("f3", 3, true) {
383 return
384 }
385
386 time.Sleep(1 * time.Second)
387
388
389
390 m.check("f1", false, 2, 0)
391 m.check("f2", false, 3, 0)
392 m.check("f3", false, 4, 0)
393
394 m.done()
395
396 time.Sleep(1 * time.Second)
397
398 m.check("f1", false, 2, 1)
399 m.check("f2", false, 3, 1)
400 m.check("f3", false, 4, 1)
401
402 m.checkErrors(0)
403 }
404
View as plain text