...

Source file src/go.etcd.io/etcd/server/v3/mvcc/watcher_test.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"bytes"
    19  	"fmt"
    20  	"os"
    21  	"reflect"
    22  	"testing"
    23  	"time"
    24  
    25  	"go.uber.org/zap"
    26  	"go.uber.org/zap/zaptest"
    27  
    28  	"go.etcd.io/etcd/api/v3/mvccpb"
    29  	clientv3 "go.etcd.io/etcd/client/v3"
    30  	"go.etcd.io/etcd/server/v3/lease"
    31  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    32  )
    33  
    34  // TestWatcherWatchID tests that each watcher provides unique watchID,
    35  // and the watched event attaches the correct watchID.
    36  func TestWatcherWatchID(t *testing.T) {
    37  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
    38  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
    39  	defer cleanup(s, b, tmpPath)
    40  
    41  	w := s.NewWatchStream()
    42  	defer w.Close()
    43  
    44  	idm := make(map[WatchID]struct{})
    45  
    46  	for i := 0; i < 10; i++ {
    47  		id, _ := w.Watch(0, []byte("foo"), nil, 0)
    48  		if _, ok := idm[id]; ok {
    49  			t.Errorf("#%d: id %d exists", i, id)
    50  		}
    51  		idm[id] = struct{}{}
    52  
    53  		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
    54  
    55  		resp := <-w.Chan()
    56  		if resp.WatchID != id {
    57  			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
    58  		}
    59  
    60  		if err := w.Cancel(id); err != nil {
    61  			t.Error(err)
    62  		}
    63  	}
    64  
    65  	s.Put([]byte("foo2"), []byte("bar"), lease.NoLease)
    66  
    67  	// unsynced watchers
    68  	for i := 10; i < 20; i++ {
    69  		id, _ := w.Watch(0, []byte("foo2"), nil, 1)
    70  		if _, ok := idm[id]; ok {
    71  			t.Errorf("#%d: id %d exists", i, id)
    72  		}
    73  		idm[id] = struct{}{}
    74  
    75  		resp := <-w.Chan()
    76  		if resp.WatchID != id {
    77  			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
    78  		}
    79  
    80  		if err := w.Cancel(id); err != nil {
    81  			t.Error(err)
    82  		}
    83  	}
    84  }
    85  
    86  func TestWatcherRequestsCustomID(t *testing.T) {
    87  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
    88  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
    89  	defer cleanup(s, b, tmpPath)
    90  
    91  	w := s.NewWatchStream()
    92  	defer w.Close()
    93  
    94  	// - Request specifically ID #1
    95  	// - Try to duplicate it, get an error
    96  	// - Make sure the auto-assignment skips over things we manually assigned
    97  
    98  	tt := []struct {
    99  		givenID     WatchID
   100  		expectedID  WatchID
   101  		expectedErr error
   102  	}{
   103  		{1, 1, nil},
   104  		{1, 0, ErrWatcherDuplicateID},
   105  		{0, 0, nil},
   106  		{0, 2, nil},
   107  	}
   108  
   109  	for i, tcase := range tt {
   110  		id, err := w.Watch(tcase.givenID, []byte("foo"), nil, 0)
   111  		if tcase.expectedErr != nil || err != nil {
   112  			if err != tcase.expectedErr {
   113  				t.Errorf("expected get error %q in test case %q, got %q", tcase.expectedErr, i, err)
   114  			}
   115  		} else if tcase.expectedID != id {
   116  			t.Errorf("expected to create ID %d, got %d in test case %d", tcase.expectedID, id, i)
   117  		}
   118  	}
   119  }
   120  
   121  // TestWatcherWatchPrefix tests if Watch operation correctly watches
   122  // and returns events with matching prefixes.
   123  func TestWatcherWatchPrefix(t *testing.T) {
   124  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   125  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
   126  	defer cleanup(s, b, tmpPath)
   127  
   128  	w := s.NewWatchStream()
   129  	defer w.Close()
   130  
   131  	idm := make(map[WatchID]struct{})
   132  
   133  	val := []byte("bar")
   134  	keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar")
   135  
   136  	for i := 0; i < 10; i++ {
   137  		id, _ := w.Watch(0, keyWatch, keyEnd, 0)
   138  		if _, ok := idm[id]; ok {
   139  			t.Errorf("#%d: unexpected duplicated id %x", i, id)
   140  		}
   141  		idm[id] = struct{}{}
   142  
   143  		s.Put(keyPut, val, lease.NoLease)
   144  
   145  		resp := <-w.Chan()
   146  		if resp.WatchID != id {
   147  			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
   148  		}
   149  
   150  		if err := w.Cancel(id); err != nil {
   151  			t.Errorf("#%d: unexpected cancel error %v", i, err)
   152  		}
   153  
   154  		if len(resp.Events) != 1 {
   155  			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
   156  		}
   157  		if len(resp.Events) == 1 {
   158  			if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) {
   159  				t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut)
   160  			}
   161  		}
   162  	}
   163  
   164  	keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar")
   165  	s.Put(keyPut1, val, lease.NoLease)
   166  
   167  	// unsynced watchers
   168  	for i := 10; i < 15; i++ {
   169  		id, _ := w.Watch(0, keyWatch1, keyEnd1, 1)
   170  		if _, ok := idm[id]; ok {
   171  			t.Errorf("#%d: id %d exists", i, id)
   172  		}
   173  		idm[id] = struct{}{}
   174  
   175  		resp := <-w.Chan()
   176  		if resp.WatchID != id {
   177  			t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
   178  		}
   179  
   180  		if err := w.Cancel(id); err != nil {
   181  			t.Error(err)
   182  		}
   183  
   184  		if len(resp.Events) != 1 {
   185  			t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
   186  		}
   187  		if len(resp.Events) == 1 {
   188  			if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) {
   189  				t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1)
   190  			}
   191  		}
   192  	}
   193  }
   194  
   195  // TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range
   196  // does not create watcher, which panics when canceling in range tree.
   197  func TestWatcherWatchWrongRange(t *testing.T) {
   198  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   199  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
   200  	defer cleanup(s, b, tmpPath)
   201  
   202  	w := s.NewWatchStream()
   203  	defer w.Close()
   204  
   205  	if _, err := w.Watch(0, []byte("foa"), []byte("foa"), 1); err != ErrEmptyWatcherRange {
   206  		t.Fatalf("key == end range given; expected ErrEmptyWatcherRange, got %+v", err)
   207  	}
   208  	if _, err := w.Watch(0, []byte("fob"), []byte("foa"), 1); err != ErrEmptyWatcherRange {
   209  		t.Fatalf("key > end range given; expected ErrEmptyWatcherRange, got %+v", err)
   210  	}
   211  	// watch request with 'WithFromKey' has empty-byte range end
   212  	if id, _ := w.Watch(0, []byte("foo"), []byte{}, 1); id != 0 {
   213  		t.Fatalf("\x00 is range given; id expected 0, got %d", id)
   214  	}
   215  }
   216  
   217  func TestWatchDeleteRange(t *testing.T) {
   218  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   219  	s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   220  
   221  	defer func() {
   222  		s.store.Close()
   223  		os.Remove(tmpPath)
   224  	}()
   225  
   226  	testKeyPrefix := []byte("foo")
   227  
   228  	for i := 0; i < 3; i++ {
   229  		s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
   230  	}
   231  
   232  	w := s.NewWatchStream()
   233  	from, to := testKeyPrefix, []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
   234  	w.Watch(0, from, to, 0)
   235  
   236  	s.DeleteRange(from, to)
   237  
   238  	we := []mvccpb.Event{
   239  		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
   240  		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
   241  		{Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
   242  	}
   243  
   244  	select {
   245  	case r := <-w.Chan():
   246  		if !reflect.DeepEqual(r.Events, we) {
   247  			t.Errorf("event = %v, want %v", r.Events, we)
   248  		}
   249  	case <-time.After(10 * time.Second):
   250  		t.Fatal("failed to receive event after 10 seconds!")
   251  	}
   252  }
   253  
   254  // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher
   255  // with given id inside watchStream.
   256  func TestWatchStreamCancelWatcherByID(t *testing.T) {
   257  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   258  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
   259  	defer cleanup(s, b, tmpPath)
   260  
   261  	w := s.NewWatchStream()
   262  	defer w.Close()
   263  
   264  	id, _ := w.Watch(0, []byte("foo"), nil, 0)
   265  
   266  	tests := []struct {
   267  		cancelID WatchID
   268  		werr     error
   269  	}{
   270  		// no error should be returned when cancel the created watcher.
   271  		{id, nil},
   272  		// not exist error should be returned when cancel again.
   273  		{id, ErrWatcherNotExist},
   274  		// not exist error should be returned when cancel a bad id.
   275  		{id + 1, ErrWatcherNotExist},
   276  	}
   277  
   278  	for i, tt := range tests {
   279  		gerr := w.Cancel(tt.cancelID)
   280  
   281  		if gerr != tt.werr {
   282  			t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
   283  		}
   284  	}
   285  
   286  	if l := len(w.(*watchStream).cancels); l != 0 {
   287  		t.Errorf("cancels = %d, want 0", l)
   288  	}
   289  }
   290  
   291  // TestWatcherRequestProgress ensures synced watcher can correctly
   292  // report its correct progress.
   293  func TestWatcherRequestProgress(t *testing.T) {
   294  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   295  
   296  	// manually create watchableStore instead of newWatchableStore
   297  	// because newWatchableStore automatically calls syncWatchers
   298  	// method to sync watchers in unsynced map. We want to keep watchers
   299  	// in unsynced to test if syncWatchers works as expected.
   300  	s := &watchableStore{
   301  		store:    NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
   302  		unsynced: newWatcherGroup(),
   303  		synced:   newWatcherGroup(),
   304  	}
   305  
   306  	defer func() {
   307  		s.store.Close()
   308  		os.Remove(tmpPath)
   309  	}()
   310  
   311  	testKey := []byte("foo")
   312  	notTestKey := []byte("bad")
   313  	testValue := []byte("bar")
   314  	s.Put(testKey, testValue, lease.NoLease)
   315  
   316  	w := s.NewWatchStream()
   317  
   318  	badID := WatchID(1000)
   319  	w.RequestProgress(badID)
   320  	select {
   321  	case resp := <-w.Chan():
   322  		t.Fatalf("unexpected %+v", resp)
   323  	default:
   324  	}
   325  
   326  	id, _ := w.Watch(0, notTestKey, nil, 1)
   327  	w.RequestProgress(id)
   328  	select {
   329  	case resp := <-w.Chan():
   330  		t.Fatalf("unexpected %+v", resp)
   331  	default:
   332  	}
   333  
   334  	s.syncWatchers()
   335  
   336  	w.RequestProgress(id)
   337  	wrs := WatchResponse{WatchID: id, Revision: 2}
   338  	select {
   339  	case resp := <-w.Chan():
   340  		if !reflect.DeepEqual(resp, wrs) {
   341  			t.Fatalf("got %+v, expect %+v", resp, wrs)
   342  		}
   343  	case <-time.After(time.Second):
   344  		t.Fatal("failed to receive progress")
   345  	}
   346  }
   347  
   348  func TestWatcherRequestProgressAll(t *testing.T) {
   349  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   350  
   351  	// manually create watchableStore instead of newWatchableStore
   352  	// because newWatchableStore automatically calls syncWatchers
   353  	// method to sync watchers in unsynced map. We want to keep watchers
   354  	// in unsynced to test if syncWatchers works as expected.
   355  	s := &watchableStore{
   356  		store:    NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
   357  		unsynced: newWatcherGroup(),
   358  		synced:   newWatcherGroup(),
   359  		stopc:    make(chan struct{}),
   360  	}
   361  
   362  	defer func() {
   363  		s.store.Close()
   364  		os.Remove(tmpPath)
   365  	}()
   366  
   367  	testKey := []byte("foo")
   368  	notTestKey := []byte("bad")
   369  	testValue := []byte("bar")
   370  	s.Put(testKey, testValue, lease.NoLease)
   371  
   372  	// Create watch stream with watcher. We will not actually get
   373  	// any notifications on it specifically, but there needs to be
   374  	// at least one Watch for progress notifications to get
   375  	// generated.
   376  	w := s.NewWatchStream()
   377  	w.Watch(0, notTestKey, nil, 1)
   378  
   379  	w.RequestProgressAll()
   380  	select {
   381  	case resp := <-w.Chan():
   382  		t.Fatalf("unexpected %+v", resp)
   383  	default:
   384  	}
   385  
   386  	s.syncWatchers()
   387  
   388  	w.RequestProgressAll()
   389  	wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
   390  	select {
   391  	case resp := <-w.Chan():
   392  		if !reflect.DeepEqual(resp, wrs) {
   393  			t.Fatalf("got %+v, expect %+v", resp, wrs)
   394  		}
   395  	case <-time.After(time.Second):
   396  		t.Fatal("failed to receive progress")
   397  	}
   398  }
   399  
   400  func TestWatcherWatchWithFilter(t *testing.T) {
   401  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   402  	s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
   403  	defer cleanup(s, b, tmpPath)
   404  
   405  	w := s.NewWatchStream()
   406  	defer w.Close()
   407  
   408  	filterPut := func(e mvccpb.Event) bool {
   409  		return e.Type == mvccpb.PUT
   410  	}
   411  
   412  	w.Watch(0, []byte("foo"), nil, 0, filterPut)
   413  	done := make(chan struct{}, 1)
   414  
   415  	go func() {
   416  		<-w.Chan()
   417  		done <- struct{}{}
   418  	}()
   419  
   420  	s.Put([]byte("foo"), []byte("bar"), 0)
   421  
   422  	select {
   423  	case <-done:
   424  		t.Fatal("failed to filter put request")
   425  	case <-time.After(100 * time.Millisecond):
   426  	}
   427  
   428  	s.DeleteRange([]byte("foo"), nil)
   429  
   430  	select {
   431  	case <-done:
   432  	case <-time.After(100 * time.Millisecond):
   433  		t.Fatal("failed to receive delete request")
   434  	}
   435  }
   436  

View as plain text