...

Source file src/go.etcd.io/etcd/server/v3/mvcc/watchable_store_test.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"bytes"
    19  	"fmt"
    20  	"os"
    21  	"reflect"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	"github.com/stretchr/testify/require"
    27  	"go.uber.org/zap"
    28  	"go.uber.org/zap/zaptest"
    29  
    30  	"go.etcd.io/etcd/api/v3/mvccpb"
    31  	"go.etcd.io/etcd/pkg/v3/traceutil"
    32  	"go.etcd.io/etcd/server/v3/lease"
    33  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    34  )
    35  
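        // TestWatch tests that watching a key adds the watcher to the synced watcher group.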
    36  func TestWatch(t *testing.T) {
    37  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
    38  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
    39  
    40  	defer func() {
    41  		s.store.Close()
    42  		os.Remove(tmpPath)
    43  	}()
    44  
    45  	testKey := []byte("foo")
    46  	testValue := []byte("bar")
    47  	s.Put(testKey, testValue, lease.NoLease)
    48  
    49  	w := s.NewWatchStream()
    50  	w.Watch(0, testKey, nil, 0)
    51  
    52  	if !s.synced.contains(string(testKey)) {
    53  		// the key must have had an entry in synced
    54  		t.Errorf("existence = false, want true")
    55  	}
    56  }
    57  
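        // TestNewWatcherCancel tests that canceling a watcher removes its key from the synced watcher group.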
    58  func TestNewWatcherCancel(t *testing.T) {
    59  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
    60  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
    61  
    62  	defer func() {
    63  		s.store.Close()
    64  		os.Remove(tmpPath)
    65  	}()
    66  	testKey := []byte("foo")
    67  	testValue := []byte("bar")
    68  	s.Put(testKey, testValue, lease.NoLease)
    69  
    70  	w := s.NewWatchStream()
    71  	wt, _ := w.Watch(0, testKey, nil, 0)
    72  
    73  	if err := w.Cancel(wt); err != nil {
    74  		t.Error(err)
    75  	}
    76  
    77  	if s.synced.contains(string(testKey)) {
    78  		// the key should have been deleted
    79  		t.Errorf("existence = true, want false")
    80  	}
    81  }
    82  
    83  // TestCancelUnsynced tests if canceling watchers removes them from unsynced.
    84  func TestCancelUnsynced(t *testing.T) {
    85  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
    86  
    87  	// manually create watchableStore instead of newWatchableStore
    88  	// because newWatchableStore automatically calls the syncWatchers
    89  	// method to sync watchers in the unsynced map. We want to keep watchers
    90  	// in unsynced so we can test that Cancel removes them as expected.
    91  	s := &watchableStore{
    92  		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
    93  		unsynced: newWatcherGroup(),
    94  
    95  		// to make the test not crash from assigning to nil map.
    96  		// 'synced' doesn't get populated in this test.
    97  		synced: newWatcherGroup(),
    98  	}
    99  
   100  	defer func() {
   101  		s.store.Close()
   102  		os.Remove(tmpPath)
   103  	}()
   104  
   105  	// Put a key so that we can spawn watchers on that key
   106  	// (testKey in this test). This increases the store revision,
   107  	// and later we set the watcher's startRev to 1
   108  	// to force watchers to be in unsynced.
   109  	testKey := []byte("foo")
   110  	testValue := []byte("bar")
   111  	s.Put(testKey, testValue, lease.NoLease)
   112  
   113  	w := s.NewWatchStream()
   114  
   115  	// arbitrary number for watchers
   116  	watcherN := 100
   117  
   118  	// create watcherN of watch ids to cancel
   119  	watchIDs := make([]WatchID, watcherN)
   120  	for i := 0; i < watcherN; i++ {
   121  		// use 1 to keep watchers in unsynced
   122  		watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
   123  	}
   124  
   125  	for _, idx := range watchIDs {
   126  		if err := w.Cancel(idx); err != nil {
   127  			t.Error(err)
   128  		}
   129  	}
   130  
   131  	// After canceling all watchers,
   132  	//
   133  	// unsynced should be empty
   134  	// because Cancel removes watchers from unsynced
   135  	if size := s.unsynced.size(); size != 0 {
   136  		t.Errorf("unsynced size = %d, want 0", size)
   137  	}
   138  }
   139  
   140  // TestSyncWatchers populates the unsynced watcher map and tests the syncWatchers
   141  // method to see if it correctly sends events to the channels of unsynced watchers
   142  // and moves these watchers to synced.
   143  func TestSyncWatchers(t *testing.T) {
   144  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   145  
   146  	s := &watchableStore{
   147  		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
   148  		unsynced: newWatcherGroup(),
   149  		synced:   newWatcherGroup(),
   150  	}
   151  
   152  	defer func() {
   153  		s.store.Close()
   154  		os.Remove(tmpPath)
   155  	}()
   156  
   157  	testKey := []byte("foo")
   158  	testValue := []byte("bar")
   159  	s.Put(testKey, testValue, lease.NoLease)
   160  
   161  	w := s.NewWatchStream()
   162  
   163  	// arbitrary number for watchers
   164  	watcherN := 100
   165  
   166  	for i := 0; i < watcherN; i++ {
   167  		// specify rev as 1 to keep watchers in unsynced
   168  		w.Watch(0, testKey, nil, 1)
   169  	}
   170  
   171  	// Before running s.syncWatchers(), synced should be empty because we
   172  	// manually populate unsynced only.
   173  	sws := s.synced.watcherSetByKey(string(testKey))
   174  	uws := s.unsynced.watcherSetByKey(string(testKey))
   175  
   176  	if len(sws) != 0 {
   177  		t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
   178  	}
   179  	// unsynced should not be empty because we manually populated unsynced only
   180  	if len(uws) != watcherN {
   181  		t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
   182  	}
   183  
   184  	// this should move all unsynced watchers to synced ones
   185  	s.syncWatchers()
   186  
   187  	sws = s.synced.watcherSetByKey(string(testKey))
   188  	uws = s.unsynced.watcherSetByKey(string(testKey))
   189  
   190  	// After running s.syncWatchers(), synced should not be empty because syncWatchers
   191  	// populates synced in this test case.
   192  	if len(sws) != watcherN {
   193  		t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
   194  	}
   195  
   196  	// unsynced should be empty because syncWatchers is expected to move all watchers
   197  	// from unsynced to synced in this test case.
   198  	if len(uws) != 0 {
   199  		t.Errorf("unsynced size = %d, want 0", len(uws))
   200  	}
   201  
   202  	for w := range sws {
   203  		if w.minRev != s.Rev()+1 {
   204  			t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
   205  		}
   206  	}
   207  
   208  	if len(w.(*watchStream).ch) != watcherN {
   209  		t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
   210  	}
   211  
   212  	evs := (<-w.(*watchStream).ch).Events
   213  	if len(evs) != 1 {
   214  		t.Errorf("len(evs) got = %d, want = 1", len(evs))
   215  	}
   216  	if evs[0].Type != mvccpb.PUT {
   217  		t.Errorf("got = %v, want = %v", evs[0].Type, mvccpb.PUT)
   218  	}
   219  	if !bytes.Equal(evs[0].Kv.Key, testKey) {
   220  		t.Errorf("got = %s, want = %s", evs[0].Kv.Key, testKey)
   221  	}
   222  	if !bytes.Equal(evs[0].Kv.Value, testValue) {
   223  		t.Errorf("got = %s, want = %s", evs[0].Kv.Value, testValue)
   224  	}
   225  }
   226  
   227  // TestWatchCompacted tests a watcher that watches on a compacted revision.
   228  func TestWatchCompacted(t *testing.T) {
   229  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   230  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   231  
   232  	defer func() {
   233  		s.store.Close()
   234  		os.Remove(tmpPath)
   235  	}()
   236  	testKey := []byte("foo")
   237  	testValue := []byte("bar")
   238  
   239  	maxRev := 10
   240  	compactRev := int64(5)
   241  	for i := 0; i < maxRev; i++ {
   242  		s.Put(testKey, testValue, lease.NoLease)
   243  	}
   244  	_, err := s.Compact(traceutil.TODO(), compactRev)
   245  	if err != nil {
   246  		t.Fatalf("failed to compact kv (%v)", err)
   247  	}
   248  
   249  	w := s.NewWatchStream()
   250  	wt, _ := w.Watch(0, testKey, nil, compactRev-1)
   251  
   252  	select {
   253  	case resp := <-w.Chan():
   254  		if resp.WatchID != wt {
   255  			t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt)
   256  		}
   257  		if resp.CompactRevision == 0 {
   258  			t.Errorf("resp.CompactRevision = %v, want %v", resp.CompactRevision, compactRev)
   259  		}
   260  	case <-time.After(1 * time.Second):
   261  		t.Fatalf("failed to receive response (timeout)")
   262  	}
   263  }
   264  
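        // TestWatchNoEventLossOnCompact tests that unsynced watchers do not lose events
        // when the store is compacted and the watch channel buffer is small.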
   265  func TestWatchNoEventLossOnCompact(t *testing.T) {
   266  	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
   267  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   268  	lg := zaptest.NewLogger(t)
   269  	s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
   270  
   271  	defer func() {
   272  		cleanup(s, b, tmpPath)
   273  		chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
   274  	}()
   275  
   276  	chanBufLen, maxWatchersPerSync = 1, 4
   277  	testKey, testValue := []byte("foo"), []byte("bar")
   278  
   279  	maxRev := 10
   280  	compactRev := int64(5)
   281  	for i := 0; i < maxRev; i++ {
   282  		s.Put(testKey, testValue, lease.NoLease)
   283  	}
   284  	_, err := s.Compact(traceutil.TODO(), compactRev)
   285  	require.NoErrorf(t, err, "failed to compact kv (%v)", err)
   286  
   287  	w := s.NewWatchStream()
   288  	defer w.Close()
   289  
   290  	watchers := map[WatchID]int64{
   291  		0: 1,
   292  		1: 1, // create unsynced watchers with startRev < compactRev
   293  		2: 6, // create unsynced watchers with compactRev < startRev < currentRev
   294  	}
   295  	for id, startRev := range watchers {
   296  		_, err := w.Watch(id, testKey, nil, startRev)
   297  		require.NoError(t, err)
   298  	}
   299  	// fill up w.Chan() (buffer size 1) with 2 compacted watch responses
   300  	s.syncWatchers()
   301  
   302  	for len(watchers) > 0 {
   303  		resp := <-w.Chan()
   304  		if resp.CompactRevision != 0 {
   305  			require.Equal(t, resp.CompactRevision, compactRev)
   306  			require.Contains(t, watchers, resp.WatchID)
   307  			delete(watchers, resp.WatchID)
   308  			continue
   309  		}
   310  		nextRev := watchers[resp.WatchID]
   311  		for _, ev := range resp.Events {
   312  			require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
   313  			nextRev++
   314  		}
   315  		if nextRev == s.rev()+1 {
   316  			delete(watchers, resp.WatchID)
   317  		}
   318  	}
   319  }
   320  
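        // TestWatchFutureRev tests that a watcher on a future revision receives the event
        // once the store reaches that revision.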
   321  func TestWatchFutureRev(t *testing.T) {
   322  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   323  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   324  
   325  	defer func() {
   326  		s.store.Close()
   327  		os.Remove(tmpPath)
   328  	}()
   329  
   330  	testKey := []byte("foo")
   331  	testValue := []byte("bar")
   332  
   333  	w := s.NewWatchStream()
   334  	wrev := int64(10)
   335  	w.Watch(0, testKey, nil, wrev)
   336  
   337  	for i := 0; i < 10; i++ {
   338  		rev := s.Put(testKey, testValue, lease.NoLease)
   339  		if rev >= wrev {
   340  			break
   341  		}
   342  	}
   343  
   344  	select {
   345  	case resp := <-w.Chan():
   346  		if resp.Revision != wrev {
   347  			t.Fatalf("rev = %d, want %d", resp.Revision, wrev)
   348  		}
   349  		if len(resp.Events) != 1 {
   350  			t.Fatalf("failed to get events from the response")
   351  		}
   352  		if resp.Events[0].Kv.ModRevision != wrev {
   353  			t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, wrev)
   354  		}
   355  	case <-time.After(time.Second):
   356  		t.Fatal("failed to receive event in 1 second.")
   357  	}
   358  }
   359  
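        // TestWatchRestore tests that a watcher still receives put events after the store
        // is restored from the backend.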
   360  func TestWatchRestore(t *testing.T) {
   361  	test := func(delay time.Duration) func(t *testing.T) {
   362  		return func(t *testing.T) {
   363  			b, tmpPath := betesting.NewDefaultTmpBackend(t)
   364  			s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   365  			defer cleanup(s, b, tmpPath)
   366  
   367  			testKey := []byte("foo")
   368  			testValue := []byte("bar")
   369  			w := s.NewWatchStream()
   370  			defer w.Close()
   371  			w.Watch(0, testKey, nil, 1)
   372  
   373  			time.Sleep(delay)
   374  			wantRev := s.Put(testKey, testValue, lease.NoLease)
   375  
   376  			s.Restore(b)
   377  			events := readEventsForSecond(w.Chan())
   378  			if len(events) != 1 {
   379  				t.Errorf("Expected only one event, got %d", len(events))
   380  			}
   381  			if events[0].Kv.ModRevision != wantRev {
   382  				t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
   383  			}
   384  
   385  		}
   386  	}
   387  
   388  	t.Run("Normal", test(0))
   389  	t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120)) // longer than default waitDuration
   390  }
   391  
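        // readEventsForSecond drains watch responses from ws for one second and returns
        // the collected events.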
   392  func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
   393  	for {
   394  		select {
   395  		case resp := <-ws:
   396  			events = append(events, resp.Events...)
   397  		case <-time.After(time.Second):
   398  			return events
   399  		}
   400  	}
   401  }
   402  
   403  // TestWatchRestoreSyncedWatcher tests such a case that:
   404  //  1. watcher is created with a future revision "math.MaxInt64 - 2"
   405  //  2. watcher with a future revision is added to "synced" watcher group
   406  //  3. restore/overwrite storage with a snapshot of a higher last revision
   407  //  4. restore operation moves "synced" to "unsynced" watcher group
   408  //  5. the watcher from step 1 is chosen from unsynced without a panic
   409  func TestWatchRestoreSyncedWatcher(t *testing.T) {
   410  	b1, b1Path := betesting.NewDefaultTmpBackend(t)
   411  	s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, StoreConfig{})
   412  	defer cleanup(s1, b1, b1Path)
   413  
   414  	b2, b2Path := betesting.NewDefaultTmpBackend(t)
   415  	s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, StoreConfig{})
   416  	defer cleanup(s2, b2, b2Path)
   417  
   418  	testKey, testValue := []byte("foo"), []byte("bar")
   419  	rev := s1.Put(testKey, testValue, lease.NoLease)
   420  	startRev := rev + 2
   421  
   422  	// create a watcher with a future revision
   423  	// add to "synced" watcher group (startRev > s.store.currentRev)
   424  	w1 := s1.NewWatchStream()
   425  	w1.Watch(0, testKey, nil, startRev)
   426  
   427  	// make "s2" end up with a higher last revision
   428  	s2.Put(testKey, testValue, lease.NoLease)
   429  	s2.Put(testKey, testValue, lease.NoLease)
   430  
   431  	// overwrite storage with higher revisions
   432  	if err := s1.Restore(b2); err != nil {
   433  		t.Fatal(err)
   434  	}
   435  
   436  	// wait for next "syncWatchersLoop" iteration
   437  	// and the unsynced watcher should be chosen
   438  	time.Sleep(2 * time.Second)
   439  
   440  	// trigger events for "startRev"
   441  	s1.Put(testKey, testValue, lease.NoLease)
   442  
   443  	select {
   444  	case resp := <-w1.Chan():
   445  		if resp.Revision != startRev {
   446  			t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
   447  		}
   448  		if len(resp.Events) != 1 {
   449  			t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
   450  		}
   451  		if resp.Events[0].Kv.ModRevision != startRev {
   452  			t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
   453  		}
   454  	case <-time.After(time.Second):
   455  		t.Fatal("failed to receive event in 1 second")
   456  	}
   457  }
   458  
   459  // TestWatchBatchUnsynced tests batching on unsynced watchers.
   460  func TestWatchBatchUnsynced(t *testing.T) {
   461  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   462  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   463  
   464  	oldMaxRevs := watchBatchMaxRevs
   465  	defer func() {
   466  		watchBatchMaxRevs = oldMaxRevs
   467  		s.store.Close()
   468  		os.Remove(tmpPath)
   469  	}()
   470  	batches := 3
   471  	watchBatchMaxRevs = 4
   472  
   473  	v := []byte("foo")
   474  	for i := 0; i < watchBatchMaxRevs*batches; i++ {
   475  		s.Put(v, v, lease.NoLease)
   476  	}
   477  
   478  	w := s.NewWatchStream()
   479  	w.Watch(0, v, nil, 1)
   480  	for i := 0; i < batches; i++ {
   481  		if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
   482  			t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
   483  		}
   484  	}
   485  
   486  	s.store.revMu.Lock()
   487  	defer s.store.revMu.Unlock()
   488  	if size := s.synced.size(); size != 1 {
   489  		t.Errorf("synced size = %d, want 1", size)
   490  	}
   491  }
   492  
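        // TestNewMapwatcherToEventMap tests that newWatcherBatch matches events to the
        // watchers interested in them.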
   493  func TestNewMapwatcherToEventMap(t *testing.T) {
   494  	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
   495  	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
   496  
   497  	ws := []*watcher{{key: k0}, {key: k1}, {key: k2}}
   498  
   499  	evs := []mvccpb.Event{
   500  		{
   501  			Type: mvccpb.PUT,
   502  			Kv:   &mvccpb.KeyValue{Key: k0, Value: v0},
   503  		},
   504  		{
   505  			Type: mvccpb.PUT,
   506  			Kv:   &mvccpb.KeyValue{Key: k1, Value: v1},
   507  		},
   508  		{
   509  			Type: mvccpb.PUT,
   510  			Kv:   &mvccpb.KeyValue{Key: k2, Value: v2},
   511  		},
   512  	}
   513  
   514  	tests := []struct {
   515  		sync []*watcher
   516  		evs  []mvccpb.Event
   517  
   518  		wwe map[*watcher][]mvccpb.Event
   519  	}{
   520  		// no watcher in sync, some events should return empty wwe
   521  		{
   522  			nil,
   523  			evs,
   524  			map[*watcher][]mvccpb.Event{},
   525  		},
   526  
   527  		// one watcher in sync, one event that does not match the key of that
   528  		// watcher should return empty wwe
   529  		{
   530  			[]*watcher{ws[2]},
   531  			evs[:1],
   532  			map[*watcher][]mvccpb.Event{},
   533  		},
   534  
   535  		// one watcher in sync, one event that matches the key of that
   536  		// watcher should return wwe with that matching watcher
   537  		{
   538  			[]*watcher{ws[1]},
   539  			evs[1:2],
   540  			map[*watcher][]mvccpb.Event{
   541  				ws[1]: evs[1:2],
   542  			},
   543  		},
   544  
   545  		// two watchers in sync that watch two different keys, one event
   546  		// that matches the key of only one of the watchers should return wwe
   547  		// with the matching watcher
   548  		{
   549  			[]*watcher{ws[0], ws[2]},
   550  			evs[2:],
   551  			map[*watcher][]mvccpb.Event{
   552  				ws[2]: evs[2:],
   553  			},
   554  		},
   555  
   556  		// two watchers in sync watching two different keys, two events that
   557  		// match both keys should return wwe with both watchers
   558  		{
   559  			[]*watcher{ws[0], ws[1]},
   560  			evs[:2],
   561  			map[*watcher][]mvccpb.Event{
   562  				ws[0]: evs[:1],
   563  				ws[1]: evs[1:2],
   564  			},
   565  		},
   566  	}
   567  
   568  	for i, tt := range tests {
   569  		wg := newWatcherGroup()
   570  		for _, w := range tt.sync {
   571  			wg.add(w)
   572  		}
   573  
   574  		gwe := newWatcherBatch(&wg, tt.evs)
   575  		if len(gwe) != len(tt.wwe) {
   576  			t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
   577  		}
   578  		// compare gwe and tt.wwe
   579  		for w, eb := range gwe {
   580  			if len(eb.evs) != len(tt.wwe[w]) {
   581  				t.Errorf("#%d: len(eb.evs) got = %d, want = %d", i, len(eb.evs), len(tt.wwe[w]))
   582  			}
   583  			if !reflect.DeepEqual(eb.evs, tt.wwe[w]) {
   584  				t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
   585  			}
   586  		}
   587  	}
   588  }
   589  
   590  // TestWatchVictims tests that watchable store delivers watch events
   591  // when the watch channel is temporarily clogged with too many events.
   592  func TestWatchVictims(t *testing.T) {
   593  	oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
   594  
   595  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   596  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   597  
   598  	defer func() {
   599  		s.store.Close()
   600  		os.Remove(tmpPath)
   601  		chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
   602  	}()
   603  
   604  	chanBufLen, maxWatchersPerSync = 1, 2
   605  	numPuts := chanBufLen * 64
   606  	testKey, testValue := []byte("foo"), []byte("bar")
   607  
   608  	var wg sync.WaitGroup
   609  	numWatches := maxWatchersPerSync * 128
   610  	errc := make(chan error, numWatches)
   611  	wg.Add(numWatches)
   612  	for i := 0; i < numWatches; i++ {
   613  		go func() {
   614  			w := s.NewWatchStream()
   615  			w.Watch(0, testKey, nil, 1)
   616  			defer func() {
   617  				w.Close()
   618  				wg.Done()
   619  			}()
   620  			tc := time.After(10 * time.Second)
   621  			evs, nextRev := 0, int64(2)
   622  			for evs < numPuts {
   623  				select {
   624  				case <-tc:
   625  					errc <- fmt.Errorf("time out")
   626  					return
   627  				case wr := <-w.Chan():
   628  					evs += len(wr.Events)
   629  					for _, ev := range wr.Events {
   630  						if ev.Kv.ModRevision != nextRev {
   631  							errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision)
   632  							return
   633  						}
   634  						nextRev++
   635  					}
   636  					time.Sleep(time.Millisecond)
   637  				}
   638  			}
   639  			if evs != numPuts {
   640  				errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs)
   641  				return
   642  			}
   643  			select {
   644  			case <-w.Chan():
   645  				errc <- fmt.Errorf("unexpected response")
   646  			default:
   647  			}
   648  		}()
   649  		time.Sleep(time.Millisecond)
   650  	}
   651  
   652  	var wgPut sync.WaitGroup
   653  	wgPut.Add(numPuts)
   654  	for i := 0; i < numPuts; i++ {
   655  		go func() {
   656  			defer wgPut.Done()
   657  			s.Put(testKey, testValue, lease.NoLease)
   658  		}()
   659  	}
   660  	wgPut.Wait()
   661  
   662  	wg.Wait()
   663  	select {
   664  	case err := <-errc:
   665  		t.Fatal(err)
   666  	default:
   667  	}
   668  }
   669  
   670  // TestStressWatchCancelClose tests closing a watch stream while
   671  // canceling its watches.
   672  func TestStressWatchCancelClose(t *testing.T) {
   673  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   674  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   675  
   676  	defer func() {
   677  		s.store.Close()
   678  		os.Remove(tmpPath)
   679  	}()
   680  
   681  	testKey, testValue := []byte("foo"), []byte("bar")
   682  	var wg sync.WaitGroup
   683  	readyc := make(chan struct{})
   684  	wg.Add(100)
   685  	for i := 0; i < 100; i++ {
   686  		go func() {
   687  			defer wg.Done()
   688  			w := s.NewWatchStream()
   689  			ids := make([]WatchID, 10)
   690  			for i := range ids {
   691  				ids[i], _ = w.Watch(0, testKey, nil, 0)
   692  			}
   693  			<-readyc
   694  			wg.Add(1 + len(ids)/2)
   695  			for i := range ids[:len(ids)/2] {
   696  				go func(n int) {
   697  					defer wg.Done()
   698  					w.Cancel(ids[n])
   699  				}(i)
   700  			}
   701  			go func() {
   702  				defer wg.Done()
   703  				w.Close()
   704  			}()
   705  		}()
   706  	}
   707  
   708  	close(readyc)
   709  	for i := 0; i < 100; i++ {
   710  		s.Put(testKey, testValue, lease.NoLease)
   711  	}
   712  
   713  	wg.Wait()
   714  }
   715  
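The tests above share one basic pattern: create a temporary backend and watchable store, open a watch stream on a key, write to that key, and assert on the responses read from the stream's channel. The sketch below condenses that pattern. It is not part of watchable_store_test.go, and the function name watchPutOnce is hypothetical; it only uses helpers that already appear in the tests above (betesting.NewDefaultTmpBackend, newWatchableStore, cleanup, NewWatchStream, Put, Chan).

        // watchPutOnce is a hypothetical sketch, not part of the original file:
        // watch a key, write to it, and read the resulting event from the stream.
        func watchPutOnce(t *testing.T) {
        	b, tmpPath := betesting.NewDefaultTmpBackend(t)
        	s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
        	defer cleanup(s, b, tmpPath)

        	w := s.NewWatchStream()
        	defer w.Close()

        	key, value := []byte("foo"), []byte("bar")
        	// startRev 0 subscribes from the current revision onward
        	if _, err := w.Watch(0, key, nil, 0); err != nil {
        		t.Fatal(err)
        	}

        	rev := s.Put(key, value, lease.NoLease)

        	select {
        	case resp := <-w.Chan():
        		if len(resp.Events) != 1 || resp.Events[0].Kv.ModRevision != rev {
        			t.Fatalf("got response %+v, want a single event at revision %d", resp, rev)
        		}
        	case <-time.After(time.Second):
        		t.Fatal("timed out waiting for watch event")
        	}
        }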
