...

Source file src/go.etcd.io/etcd/server/v3/mvcc/watchable_store.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"sync"
    19  	"time"
    20  
    21  	"go.etcd.io/etcd/api/v3/mvccpb"
    22  	clientv3 "go.etcd.io/etcd/client/v3"
    23  	"go.etcd.io/etcd/pkg/v3/traceutil"
    24  	"go.etcd.io/etcd/server/v3/lease"
    25  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    26  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    27  
    28  	"go.uber.org/zap"
    29  )
    30  
    31  // non-const so modifiable by tests
    32  var (
    33  	// chanBufLen is the length of the buffered chan
    34  	// for sending out watched events.
    35  	// See https://github.com/etcd-io/etcd/issues/11906 for more detail.
    36  	chanBufLen = 128
    37  
    38  	// maxWatchersPerSync is the number of watchers to sync in a single batch
    39  	maxWatchersPerSync = 512
    40  )
    41  
    42  type watchable interface {
    43  	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
    44  	progress(w *watcher)
    45  	progressAll(watchers map[WatchID]*watcher) bool
    46  	rev() int64
    47  }
    48  
    49  type watchableStore struct {
    50  	*store
    51  
    52  	// mu protects watcher groups and batches. It should never be locked
    53  	// before locking store.mu to avoid deadlock.
    54  	mu sync.RWMutex
    55  
    56  	// victims are watcher batches that were blocked on the watch channel
    57  	victims []watcherBatch
    58  	victimc chan struct{}
    59  
    60  	// contains all unsynced watchers that needs to sync with events that have happened
    61  	unsynced watcherGroup
    62  
    63  	// contains all synced watchers that are in sync with the progress of the store.
    64  	// The key of the map is the key that the watcher watches on.
    65  	synced watcherGroup
    66  
    67  	stopc chan struct{}
    68  	wg    sync.WaitGroup
    69  }
    70  
    71  // cancelFunc updates unsynced and synced maps when running
    72  // cancel operations.
    73  type cancelFunc func()
    74  
    75  func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
    76  	return newWatchableStore(lg, b, le, cfg)
    77  }
    78  
    79  func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
    80  	if lg == nil {
    81  		lg = zap.NewNop()
    82  	}
    83  	s := &watchableStore{
    84  		store:    NewStore(lg, b, le, cfg),
    85  		victimc:  make(chan struct{}, 1),
    86  		unsynced: newWatcherGroup(),
    87  		synced:   newWatcherGroup(),
    88  		stopc:    make(chan struct{}),
    89  	}
    90  	s.store.ReadView = &readView{s}
    91  	s.store.WriteView = &writeView{s}
    92  	if s.le != nil {
    93  		// use this store as the deleter so revokes trigger watch events
    94  		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
    95  	}
    96  	s.wg.Add(2)
    97  	go s.syncWatchersLoop()
    98  	go s.syncVictimsLoop()
    99  	return s
   100  }
   101  
   102  func (s *watchableStore) Close() error {
   103  	close(s.stopc)
   104  	s.wg.Wait()
   105  	return s.store.Close()
   106  }
   107  
   108  func (s *watchableStore) NewWatchStream() WatchStream {
   109  	watchStreamGauge.Inc()
   110  	return &watchStream{
   111  		watchable: s,
   112  		ch:        make(chan WatchResponse, chanBufLen),
   113  		cancels:   make(map[WatchID]cancelFunc),
   114  		watchers:  make(map[WatchID]*watcher),
   115  	}
   116  }
   117  
   118  func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
   119  	wa := &watcher{
   120  		key:    key,
   121  		end:    end,
   122  		minRev: startRev,
   123  		id:     id,
   124  		ch:     ch,
   125  		fcs:    fcs,
   126  	}
   127  
   128  	s.mu.Lock()
   129  	s.revMu.RLock()
   130  	synced := startRev > s.store.currentRev || startRev == 0
   131  	if synced {
   132  		wa.minRev = s.store.currentRev + 1
   133  		if startRev > wa.minRev {
   134  			wa.minRev = startRev
   135  		}
   136  		s.synced.add(wa)
   137  	} else {
   138  		slowWatcherGauge.Inc()
   139  		s.unsynced.add(wa)
   140  	}
   141  	s.revMu.RUnlock()
   142  	s.mu.Unlock()
   143  
   144  	watcherGauge.Inc()
   145  
   146  	return wa, func() { s.cancelWatcher(wa) }
   147  }
   148  
   149  // cancelWatcher removes references to the watcher from the watchableStore
   150  func (s *watchableStore) cancelWatcher(wa *watcher) {
   151  	for {
   152  		s.mu.Lock()
   153  		if s.unsynced.delete(wa) {
   154  			slowWatcherGauge.Dec()
   155  			watcherGauge.Dec()
   156  			break
   157  		} else if s.synced.delete(wa) {
   158  			watcherGauge.Dec()
   159  			break
   160  		} else if wa.compacted {
   161  			watcherGauge.Dec()
   162  			break
   163  		} else if wa.ch == nil {
   164  			// already canceled (e.g., cancel/close race)
   165  			break
   166  		}
   167  
   168  		if !wa.victim {
   169  			s.mu.Unlock()
   170  			panic("watcher not victim but not in watch groups")
   171  		}
   172  
   173  		var victimBatch watcherBatch
   174  		for _, wb := range s.victims {
   175  			if wb[wa] != nil {
   176  				victimBatch = wb
   177  				break
   178  			}
   179  		}
   180  		if victimBatch != nil {
   181  			slowWatcherGauge.Dec()
   182  			watcherGauge.Dec()
   183  			delete(victimBatch, wa)
   184  			break
   185  		}
   186  
   187  		// victim being processed so not accessible; retry
   188  		s.mu.Unlock()
   189  		time.Sleep(time.Millisecond)
   190  	}
   191  
   192  	wa.ch = nil
   193  	s.mu.Unlock()
   194  }
   195  
   196  func (s *watchableStore) Restore(b backend.Backend) error {
   197  	s.mu.Lock()
   198  	defer s.mu.Unlock()
   199  	err := s.store.Restore(b)
   200  	if err != nil {
   201  		return err
   202  	}
   203  
   204  	for wa := range s.synced.watchers {
   205  		wa.restore = true
   206  		s.unsynced.add(wa)
   207  	}
   208  	s.synced = newWatcherGroup()
   209  	return nil
   210  }
   211  
   212  // syncWatchersLoop syncs the watchers in the unsynced map every 100ms.
   213  func (s *watchableStore) syncWatchersLoop() {
   214  	defer s.wg.Done()
   215  
   216  	for {
   217  		s.mu.RLock()
   218  		st := time.Now()
   219  		lastUnsyncedWatchers := s.unsynced.size()
   220  		s.mu.RUnlock()
   221  
   222  		unsyncedWatchers := 0
   223  		if lastUnsyncedWatchers > 0 {
   224  			unsyncedWatchers = s.syncWatchers()
   225  		}
   226  		syncDuration := time.Since(st)
   227  
   228  		waitDuration := 100 * time.Millisecond
   229  		// more work pending?
   230  		if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
   231  			// be fair to other store operations by yielding time taken
   232  			waitDuration = syncDuration
   233  		}
   234  
   235  		select {
   236  		case <-time.After(waitDuration):
   237  		case <-s.stopc:
   238  			return
   239  		}
   240  	}
   241  }
   242  
   243  // syncVictimsLoop tries to write precomputed watcher responses to
   244  // watchers that had a blocked watcher channel
   245  func (s *watchableStore) syncVictimsLoop() {
   246  	defer s.wg.Done()
   247  
   248  	for {
   249  		for s.moveVictims() != 0 {
   250  			// try to update all victim watchers
   251  		}
   252  		s.mu.RLock()
   253  		isEmpty := len(s.victims) == 0
   254  		s.mu.RUnlock()
   255  
   256  		var tickc <-chan time.Time
   257  		if !isEmpty {
   258  			tickc = time.After(10 * time.Millisecond)
   259  		}
   260  
   261  		select {
   262  		case <-tickc:
   263  		case <-s.victimc:
   264  		case <-s.stopc:
   265  			return
   266  		}
   267  	}
   268  }
   269  
   270  // moveVictims tries to update watches with already pending event data
   271  func (s *watchableStore) moveVictims() (moved int) {
   272  	s.mu.Lock()
   273  	victims := s.victims
   274  	s.victims = nil
   275  	s.mu.Unlock()
   276  
   277  	var newVictim watcherBatch
   278  	for _, wb := range victims {
   279  		// try to send responses again
   280  		for w, eb := range wb {
   281  			// watcher has observed the store up to, but not including, w.minRev
   282  			rev := w.minRev - 1
   283  			if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
   284  				pendingEventsGauge.Add(float64(len(eb.evs)))
   285  			} else {
   286  				if newVictim == nil {
   287  					newVictim = make(watcherBatch)
   288  				}
   289  				newVictim[w] = eb
   290  				continue
   291  			}
   292  			moved++
   293  		}
   294  
   295  		// assign completed victim watchers to unsync/sync
   296  		s.mu.Lock()
   297  		s.store.revMu.RLock()
   298  		curRev := s.store.currentRev
   299  		for w, eb := range wb {
   300  			if newVictim != nil && newVictim[w] != nil {
   301  				// couldn't send watch response; stays victim
   302  				continue
   303  			}
   304  			w.victim = false
   305  			if eb.moreRev != 0 {
   306  				w.minRev = eb.moreRev
   307  			}
   308  			if w.minRev <= curRev {
   309  				s.unsynced.add(w)
   310  			} else {
   311  				slowWatcherGauge.Dec()
   312  				s.synced.add(w)
   313  			}
   314  		}
   315  		s.store.revMu.RUnlock()
   316  		s.mu.Unlock()
   317  	}
   318  
   319  	if len(newVictim) > 0 {
   320  		s.mu.Lock()
   321  		s.victims = append(s.victims, newVictim)
   322  		s.mu.Unlock()
   323  	}
   324  
   325  	return moved
   326  }
   327  
   328  // syncWatchers syncs unsynced watchers by:
   329  //  1. choose a set of watchers from the unsynced watcher group
   330  //  2. iterate over the set to get the minimum revision and remove compacted watchers
   331  //  3. use minimum revision to get all key-value pairs and send those events to watchers
   332  //  4. remove synced watchers in set from unsynced group and move to synced group
   333  func (s *watchableStore) syncWatchers() int {
   334  	s.mu.Lock()
   335  	defer s.mu.Unlock()
   336  
   337  	if s.unsynced.size() == 0 {
   338  		return 0
   339  	}
   340  
   341  	s.store.revMu.RLock()
   342  	defer s.store.revMu.RUnlock()
   343  
   344  	// in order to find key-value pairs from unsynced watchers, we need to
   345  	// find the minimum revision index; these revisions can then be used to
   346  	// query the backend store for key-value pairs
   347  	curRev := s.store.currentRev
   348  	compactionRev := s.store.compactMainRev
   349  
   350  	wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
   351  	minBytes, maxBytes := newRevBytes(), newRevBytes()
   352  	revToBytes(revision{main: minRev}, minBytes)
   353  	revToBytes(revision{main: curRev + 1}, maxBytes)
   354  
   355  	// UnsafeRange returns keys and values. In boltdb, keys are revisions and
   356  	// values are the actual key-value pairs in the backend.
   357  	tx := s.store.b.ReadTx()
   358  	tx.RLock()
   359  	revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
   360  	evs := kvsToEvents(s.store.lg, wg, revs, vs)
   361  	// Must unlock after kvsToEvents, because vs (which comes from boltdb memory) is not a deep copy.
   362  	// We can only unlock after Unmarshal, which does the deep copy.
   363  	// Otherwise we will trigger a SIGSEGV during a boltdb re-mmap.
   364  	tx.RUnlock()
   365  
   366  	var victims watcherBatch
   367  	wb := newWatcherBatch(wg, evs)
   368  	for w := range wg.watchers {
   369  		if w.minRev < compactionRev {
   370  			// Skip the watcher that failed to send a compacted watch response because w.ch is full.
   371  			// The next retry of syncWatchers will try to resend the compacted watch response to w.ch.
   372  			continue
   373  		}
   374  		w.minRev = curRev + 1
   375  
   376  		eb, ok := wb[w]
   377  		if !ok {
   378  			// bring un-notified watcher to synced
   379  			s.synced.add(w)
   380  			s.unsynced.delete(w)
   381  			continue
   382  		}
   383  
   384  		if eb.moreRev != 0 {
   385  			w.minRev = eb.moreRev
   386  		}
   387  
   388  		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
   389  			pendingEventsGauge.Add(float64(len(eb.evs)))
   390  		} else {
   391  			if victims == nil {
   392  				victims = make(watcherBatch)
   393  			}
   394  			w.victim = true
   395  		}
   396  
   397  		if w.victim {
   398  			victims[w] = eb
   399  		} else {
   400  			if eb.moreRev != 0 {
   401  				// stay unsynced; more to read
   402  				continue
   403  			}
   404  			s.synced.add(w)
   405  		}
   406  		s.unsynced.delete(w)
   407  	}
   408  	s.addVictim(victims)
   409  
   410  	vsz := 0
   411  	for _, v := range s.victims {
   412  		vsz += len(v)
   413  	}
   414  	slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
   415  
   416  	return s.unsynced.size()
   417  }
   418  
   419  // kvsToEvents gets all events for the watchers from all key-value pairs
   420  func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
   421  	for i, v := range vals {
   422  		var kv mvccpb.KeyValue
   423  		if err := kv.Unmarshal(v); err != nil {
   424  			lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
   425  		}
   426  
   427  		if !wg.contains(string(kv.Key)) {
   428  			continue
   429  		}
   430  
   431  		ty := mvccpb.PUT
   432  		if isTombstone(revs[i]) {
   433  			ty = mvccpb.DELETE
   434  			// patch in mod revision so watchers won't skip
   435  			kv.ModRevision = bytesToRev(revs[i]).main
   436  		}
   437  		evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
   438  	}
   439  	return evs
   440  }
   441  
   442  // notify sends out the given events, which happened at the given rev, to the
   443  // watchers that watch on the keys of those events.
   444  func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
   445  	var victim watcherBatch
   446  	for w, eb := range newWatcherBatch(&s.synced, evs) {
   447  		if eb.revs != 1 {
   448  			s.store.lg.Panic(
   449  				"unexpected multiple revisions in watch notification",
   450  				zap.Int("number-of-revisions", eb.revs),
   451  			)
   452  		}
   453  		if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
   454  			pendingEventsGauge.Add(float64(len(eb.evs)))
   455  		} else {
   456  			// move slow watcher to victims
   457  			if victim == nil {
   458  				victim = make(watcherBatch)
   459  			}
   460  			w.victim = true
   461  			victim[w] = eb
   462  			s.synced.delete(w)
   463  			slowWatcherGauge.Inc()
   464  		}
   465  		// always update minRev
   466  		// in case 'send' returns true and watcher stays synced, this is needed for Restore when all watchers become unsynced
   467  		// in case 'send' returns false, this is needed for syncWatchers
   468  		w.minRev = rev + 1
   469  	}
   470  	s.addVictim(victim)
   471  }
   472  
   473  func (s *watchableStore) addVictim(victim watcherBatch) {
   474  	if victim == nil {
   475  		return
   476  	}
   477  	s.victims = append(s.victims, victim)
   478  	select {
   479  	case s.victimc <- struct{}{}:
   480  	default:
   481  	}
   482  }
   483  
   484  func (s *watchableStore) rev() int64 { return s.store.Rev() }
   485  
   486  func (s *watchableStore) progress(w *watcher) {
   487  	s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
   488  }
   489  
   490  func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
   491  	return s.progressIfSync(watchers, clientv3.InvalidWatchID)
   492  }
   493  
   494  func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
   495  	s.mu.RLock()
   496  	defer s.mu.RUnlock()
   497  
   498  	// Any watcher unsynced?
   499  	for _, w := range watchers {
   500  		if _, ok := s.synced.watchers[w]; !ok {
   501  			return false
   502  		}
   503  	}
   504  
   505  	// If all watchers are synchronised, send out a progress
   506  	// notification on the first watcher. Note that all watchers
   507  	// should have the same underlying stream, and the progress
   508  	// notification will be broadcast client-side if required
   509  	// (see dispatchEvent in client/v3/watch.go).
   510  	for _, w := range watchers {
   511  		w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
   512  		return true
   513  	}
   514  	return true
   515  }
   516  
   517  type watcher struct {
   518  	// the watcher key
   519  	key []byte
   520  	// end indicates the end of the range to watch.
   521  	// If end is set, the watcher is on a range.
   522  	end []byte
   523  
   524  	// victim is set when ch is blocked and undergoing victim processing
   525  	victim bool
   526  
   527  	// compacted is set when the watcher is removed because of compaction
   528  	compacted bool
   529  
   530  	// restore is true when the watcher is being restored from a leader snapshot,
   531  	// which means that this watcher has just been moved from the "synced" to the
   532  	// "unsynced" watcher group, possibly with a future revision from when it was
   533  	// first added to the synced watcher group.
   534  	// An "unsynced" watcher's revision must always be <= the current revision,
   535  	// except when the watcher has just been moved from the "synced" watcher group.
   536  	restore bool
   537  
   538  	// minRev is the minimum revision update the watcher will accept
   539  	minRev int64
   540  	id     WatchID
   541  
   542  	fcs []FilterFunc
   543  	// a chan to send out the watch response.
   544  	// The chan might be shared with other watchers.
   545  	ch chan<- WatchResponse
   546  }
   547  
   548  func (w *watcher) send(wr WatchResponse) bool {
   549  	progressEvent := len(wr.Events) == 0
   550  
   551  	if len(w.fcs) != 0 {
   552  		ne := make([]mvccpb.Event, 0, len(wr.Events))
   553  		for i := range wr.Events {
   554  			filtered := false
   555  			for _, filter := range w.fcs {
   556  				if filter(wr.Events[i]) {
   557  					filtered = true
   558  					break
   559  				}
   560  			}
   561  			if !filtered {
   562  				ne = append(ne, wr.Events[i])
   563  			}
   564  		}
   565  		wr.Events = ne
   566  	}
   567  
   568  	// if all events are filtered out, we should send nothing.
   569  	if !progressEvent && len(wr.Events) == 0 {
   570  		return true
   571  	}
   572  	select {
   573  	case w.ch <- wr:
   574  		return true
   575  	default:
   576  		return false
   577  	}
   578  }
   579  
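
The listing above is the server-side implementation; the sketch below is a minimal, illustrative example of how its public surface (mvcc.New, WatchableKV.NewWatchStream, WatchStream.Watch/Chan) might be exercised. It is not part of this file: the backend path and logger choice are arbitrary, and it assumes the sibling backend and lease packages imported above, plus mvcc.AutoWatchID and lease.NoLease as the usual "auto-assign ID" and "no lease" zero values.

package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/lease"
	"go.etcd.io/etcd/server/v3/mvcc"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.uber.org/zap"
)

func main() {
	// Open a bolt-backed store; "watch-demo.db" is an arbitrary example path.
	be := backend.NewDefaultBackend("watch-demo.db")
	defer be.Close()

	// mvcc.New is the constructor defined near the top of the file above.
	kv := mvcc.New(zap.NewExample(), be, nil, mvcc.StoreConfig{})
	defer kv.Close()

	// Each WatchStream gets its own buffered response channel (see chanBufLen).
	ws := kv.NewWatchStream()
	defer ws.Close()

	// Watch the single key "foo" from the current revision onward
	// (startRev == 0 registers the watcher in the "synced" group).
	id, err := ws.Watch(mvcc.AutoWatchID, []byte("foo"), nil, 0)
	if err != nil {
		panic(err)
	}

	// A write through the same store reaches the watcher via notify()
	// (or via syncWatchers()/moveVictims() if the watcher falls behind).
	kv.Put([]byte("foo"), []byte("bar"), lease.NoLease)

	resp := <-ws.Chan()
	fmt.Printf("watch %d: %d event(s) at revision %d\n", id, len(resp.Events), resp.Revision)
}

If the stream's response channel is full when an event arrives, watcher.send fails, the watcher becomes a "victim", and syncVictimsLoop retries delivery in the background; events are delayed rather than dropped.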
