...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/fswatcher_test.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint_test
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io/ioutil"
     8  	"os"
     9  	"path/filepath"
    10  	"runtime"
    11  	"sync"
    12  	"testing"
    13  	"time"
    14  
    15  	"github.com/stretchr/testify/assert"
    16  
    17  	"github.com/datawire/ambassador/v2/cmd/entrypoint"
    18  	"github.com/datawire/dlib/dgroup"
    19  	"github.com/datawire/dlib/dlog"
    20  )
    21  
    22  type fswMetadata struct {
    23  	t            *testing.T
    24  	fsw          *entrypoint.FSWatcher
    25  	dir          string
    26  	bootstrapped map[string]bool
    27  	updates      map[string]int
    28  	deletes      map[string]int
    29  	errorCount   int
    30  	mutex        sync.Mutex
    31  }
    32  
    33  func newMetadata(t *testing.T) (context.Context, *fswMetadata, error) {
    34  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
    35  	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
    36  	t.Cleanup(func() {
    37  		cancel()
    38  		assert.NoError(t, grp.Wait())
    39  	})
    40  	m := &fswMetadata{t: t}
    41  	m.bootstrapped = make(map[string]bool)
    42  	m.updates = make(map[string]int)
    43  	m.deletes = make(map[string]int)
    44  
    45  	var err error
    46  
    47  	m.dir, err = ioutil.TempDir("", "fswatcher_test")
    48  
    49  	if err != nil {
    50  		t.Errorf("could not create tempdir: %s", err)
    51  		return nil, nil, err
    52  	}
    53  
    54  	m.fsw, err = entrypoint.NewFSWatcher(ctx)
    55  	if err != nil {
    56  		t.Errorf("could not instantiate FSWatcher: %s", err)
    57  		return nil, nil, err
    58  	}
    59  	grp.Go("watch", func(ctx context.Context) error {
    60  		m.fsw.Run(ctx)
    61  		return nil
    62  	})
    63  
    64  	m.fsw.SetErrorHandler(m.errorHandler)
    65  
    66  	return ctx, m, nil
    67  }
    68  
    69  func (m *fswMetadata) done() {
    70  	// You would think that a call to os.RemoveAll() would suffice
    71  	// here, but nope. Turns out that on MacOS, at least, that won't
    72  	// guarantee that we get events for deleting all the files in the
    73  	// directory before the directory goes, and the test wants to see
    74  	// all the files get deleted. Sigh. So. Do it by hand.
    75  
    76  	files, err := ioutil.ReadDir(m.dir)
    77  
    78  	if err != nil {
    79  		m.t.Errorf("m.done: couldn't scan %s: %s", m.dir, err)
    80  		return
    81  	}
    82  
    83  	for _, file := range files {
    84  		path := filepath.Join(m.dir, file.Name())
    85  
    86  		err = os.Remove(path)
    87  
    88  		if err != nil {
    89  			m.t.Errorf("m.done: couldn't remove %s: %s", path, err)
    90  		}
    91  	}
    92  
    93  	// Sleep to make sure the file-deletion events get handled.
    94  	time.Sleep(250 * time.Millisecond)
    95  
    96  	// After scrapping the files, remove the directory too...
    97  	os.Remove(m.dir)
    98  
    99  	// ...and sleep once more to make sure the event for the directory
   100  	// deletion makes it through.
   101  	time.Sleep(250 * time.Millisecond)
   102  }
   103  
   104  // Error handler: just count errors received.
   105  func (m *fswMetadata) errorHandler(_ context.Context, err error) {
   106  	m.t.Logf("errorHandler: got %s", err)
   107  	m.mutex.Lock()
   108  	defer m.mutex.Unlock()
   109  	m.errorCount++
   110  	m.t.Logf("errorHandler: errorCount now %d", m.errorCount)
   111  }
   112  
   113  // Event handler: separately keep track of bootstrapped, updated, and deleted
   114  // for each distinct basename we see.
   115  func (m *fswMetadata) eventHandler(ctx context.Context, event entrypoint.FSWEvent) {
   116  	dir := filepath.Dir(event.Path)
   117  	base := filepath.Base(event.Path)
   118  
   119  	bstr := ""
   120  	if event.Bootstrap {
   121  		bstr = "B|"
   122  	}
   123  
   124  	opStr := fmt.Sprintf("%s %s%s", event.Time, bstr, event.Op)
   125  
   126  	m.t.Logf("eventHandler %s %s (dir %s)", opStr, base, dir)
   127  
   128  	if dir != m.dir {
   129  		m.t.Errorf("eventHandler: event for %s arrived, but we're watching %s", event.Path, m.dir)
   130  		return
   131  	}
   132  
   133  	if event.Bootstrap {
   134  		// Handle bootstrap events, which cannot be deletes.
   135  		if event.Op == entrypoint.FSWDelete {
   136  			m.t.Errorf("eventHandler: impossible bootstrap delete of %s arrived", event.Path)
   137  			return
   138  		}
   139  
   140  		// Not a delete, so remember that this was a bootstrapped file.
   141  		m.bootstrapped[base] = true
   142  	}
   143  
   144  	// Next, count updates and deletes.
   145  	which := m.updates
   146  
   147  	if event.Op == entrypoint.FSWDelete {
   148  		which = m.deletes
   149  	}
   150  
   151  	count, ok := which[base]
   152  
   153  	m.mutex.Lock()
   154  	defer m.mutex.Unlock()
   155  	if ok {
   156  		which[base] = count + 1
   157  	} else {
   158  		which[base] = 1
   159  	}
   160  }
   161  
   162  // Make sure that per-file stats match what we expect.
   163  func (m *fswMetadata) check(key string, wantedBootstrap bool, wantedUpdates int, wantedDeletes int) {
   164  	bootstrapped, ok := m.bootstrapped[key]
   165  
   166  	if !ok {
   167  		m.t.Logf("%s bootstrapped: wanted %v, got nothing", key, wantedBootstrap)
   168  		bootstrapped = false
   169  	} else {
   170  		m.t.Logf("%s bootstrapped: wanted %v, got %v", key, wantedBootstrap, bootstrapped)
   171  	}
   172  
   173  	if bootstrapped != wantedBootstrap {
   174  		m.t.Errorf("%s bootstrapped: wanted %v, got %v", key, wantedBootstrap, bootstrapped)
   175  	}
   176  
   177  	m.mutex.Lock()
   178  	got, ok := m.updates[key]
   179  	m.mutex.Unlock()
   180  
   181  	if !ok {
   182  		m.t.Logf("%s updates: wanted %d, got nothing", key, wantedUpdates)
   183  		got = 0
   184  	} else {
   185  		m.t.Logf("%s updates: wanted %d, got %d", key, wantedUpdates, got)
   186  	}
   187  
   188  	if got != wantedUpdates {
   189  		m.t.Errorf("%s updates: wanted %d, got %d", key, wantedUpdates, got)
   190  	}
   191  
   192  	m.mutex.Lock()
   193  	got, ok = m.deletes[key]
   194  	m.mutex.Unlock()
   195  
   196  	if !ok {
   197  		m.t.Logf("%s deletes: wanted %d, got nothing", key, wantedDeletes)
   198  		got = 0
   199  	} else {
   200  		m.t.Logf("%s deletes: wanted %d, got %d", key, wantedDeletes, got)
   201  	}
   202  
   203  	if got != wantedDeletes {
   204  		m.t.Errorf("%s deletes: wanted %d, got %d", key, wantedDeletes, got)
   205  	}
   206  }
   207  
   208  // Make sure that the error count is what we expect.
   209  func (m *fswMetadata) checkErrors(wanted int) {
   210  	m.mutex.Lock()
   211  	defer m.mutex.Unlock()
   212  	m.t.Logf("checkErrors: wanted %d, have %d", wanted, m.errorCount)
   213  
   214  	if m.errorCount != wanted {
   215  		m.t.Errorf("errors: wanted %d, got %d", wanted, m.errorCount)
   216  	}
   217  }
   218  
   219  // Write a file, generating a certain number of Write events for it.
   220  func (m *fswMetadata) writeFile(name string, count int, slow bool) bool {
   221  	path := filepath.Join(m.dir, name)
   222  
   223  	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0755)
   224  
   225  	if err != nil {
   226  		m.t.Errorf("could not open %s: %s", path, err)
   227  		return false
   228  	}
   229  
   230  	m.t.Logf("%s: opened %s", runtime.GOOS, path)
   231  
   232  	// If our caller wants slowness, give 'em slowness.
   233  	if slow {
   234  		time.Sleep(time.Second)
   235  	}
   236  
   237  	for i := 0; i < count; i++ {
   238  		m.t.Logf("writing chunk %d of %s", i, path)
   239  
   240  		_, err = f.WriteString("contents!\n")
   241  
   242  		if err != nil {
   243  			m.t.Errorf("could not write chunk %d of %s: %s", i, path, err)
   244  			return false
   245  		}
   246  
   247  		m.t.Logf("syncing chunk %d of %s", i, path)
   248  
   249  		// Make sure to flush the file.
   250  		err = f.Sync()
   251  
   252  		if err != nil {
   253  			m.t.Errorf("could not sync chunk %d of %s: %s", i, path, err)
   254  			return false
   255  		}
   256  
   257  		// If our caller wants slowness, give 'em slowness.
   258  		if slow {
   259  			time.Sleep(time.Second)
   260  		}
   261  	}
   262  
   263  	err = f.Close()
   264  
   265  	if err != nil {
   266  		m.t.Errorf("could not close %s: %s", path, err)
   267  	}
   268  
   269  	m.t.Logf("closed %s", path)
   270  
   271  	return true
   272  }
   273  
   274  // Send an error, to test the error-handler path.
   275  //
   276  // XXX This is a pretty blatant hack, since we're just suborning an
   277  // implementation detail of the FSWatcher to do this. Oh well.
   278  func (m *fswMetadata) sendError() {
   279  	m.fsw.FSW.Errors <- errors.New("OH GOD AN ERROR")
   280  
   281  	// This seems necessary to give the goroutine running in the
   282  	// FSWatcher a chance to process the error before our caller
   283  	// tries to check things.
   284  	time.Sleep(250 * time.Millisecond)
   285  }
   286  
   287  func TestFSWatcherExtantFiles(t *testing.T) {
   288  	ctx, m, err := newMetadata(t)
   289  
   290  	if err != nil {
   291  		return
   292  	}
   293  
   294  	m.t.Logf("FSW initialized for ExtantFiles (%s)", m.dir)
   295  
   296  	defer m.done()
   297  
   298  	if !m.writeFile("f1", 1, false) {
   299  		return
   300  	}
   301  
   302  	if !m.writeFile("f2", 2, false) {
   303  		return
   304  	}
   305  
   306  	if !m.writeFile("f3", 3, false) {
   307  		return
   308  	}
   309  
   310  	assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
   311  
   312  	m.check("f1", true, 1, 0)
   313  	m.check("f2", true, 1, 0)
   314  	m.check("f3", true, 1, 0)
   315  
   316  	m.checkErrors(0)
   317  
   318  	m.sendError()
   319  
   320  	m.checkErrors(1)
   321  }
   322  
   323  func TestFSWatcherNoExtantFiles(t *testing.T) {
   324  	ctx, m, err := newMetadata(t)
   325  
   326  	if err != nil {
   327  		return
   328  	}
   329  
   330  	m.t.Logf("FSW initialized for NonExtantFiles (%s)", m.dir)
   331  
   332  	assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
   333  
   334  	if !m.writeFile("f1", 1, false) {
   335  		return
   336  	}
   337  
   338  	if !m.writeFile("f2", 2, false) {
   339  		return
   340  	}
   341  
   342  	if !m.writeFile("f3", 3, false) {
   343  		return
   344  	}
   345  
   346  	time.Sleep(1 * time.Second)
   347  
   348  	m.check("f1", false, 1, 0)
   349  	m.check("f2", false, 1, 0)
   350  	m.check("f3", false, 1, 0)
   351  
   352  	m.done()
   353  
   354  	time.Sleep(1 * time.Second)
   355  
   356  	m.check("f1", false, 1, 1)
   357  	m.check("f2", false, 1, 1)
   358  	m.check("f3", false, 1, 1)
   359  
   360  	m.checkErrors(0)
   361  }
   362  
   363  func TestFSWatcherSlow(t *testing.T) {
   364  	ctx, m, err := newMetadata(t)
   365  
   366  	if err != nil {
   367  		return
   368  	}
   369  
   370  	m.t.Logf("FSW initialized for NonExtantFiles (%s)", m.dir)
   371  
   372  	assert.NoError(t, m.fsw.WatchDir(ctx, m.dir, m.eventHandler))
   373  
   374  	if !m.writeFile("f1", 1, true) {
   375  		return
   376  	}
   377  
   378  	if !m.writeFile("f2", 2, true) {
   379  		return
   380  	}
   381  
   382  	if !m.writeFile("f3", 3, true) {
   383  		return
   384  	}
   385  
   386  	time.Sleep(1 * time.Second)
   387  
   388  	// Each of these should now register an event for creation, plus an
   389  	// event for each write.
   390  	m.check("f1", false, 2, 0)
   391  	m.check("f2", false, 3, 0)
   392  	m.check("f3", false, 4, 0)
   393  
   394  	m.done()
   395  
   396  	time.Sleep(1 * time.Second)
   397  
   398  	m.check("f1", false, 2, 1)
   399  	m.check("f2", false, 3, 1)
   400  	m.check("f3", false, 4, 1)
   401  
   402  	m.checkErrors(0)
   403  }
   404  

View as plain text