...

Source file src/go.etcd.io/etcd/server/v3/mvcc/watcher_group.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"fmt"
    19  	"math"
    20  
    21  	"go.etcd.io/etcd/api/v3/mvccpb"
    22  	"go.etcd.io/etcd/pkg/v3/adt"
    23  )
    24  
    25  var (
    26  	// watchBatchMaxRevs is the maximum distinct revisions that
    27  	// may be sent to an unsynced watcher at a time. Declared as
    28  	// var instead of const for testing purposes.
    29  	watchBatchMaxRevs = 1000
    30  )
    31  
    32  type eventBatch struct {
    33  	// evs is a batch of revision-ordered events
    34  	evs []mvccpb.Event
    35  	// revs is the minimum unique revisions observed for this batch
    36  	revs int
    37  	// moreRev is first revision with more events following this batch
    38  	moreRev int64
    39  }
    40  
    41  func (eb *eventBatch) add(ev mvccpb.Event) {
    42  	if eb.revs > watchBatchMaxRevs {
    43  		// maxed out batch size
    44  		return
    45  	}
    46  
    47  	if len(eb.evs) == 0 {
    48  		// base case
    49  		eb.revs = 1
    50  		eb.evs = append(eb.evs, ev)
    51  		return
    52  	}
    53  
    54  	// revision accounting
    55  	ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
    56  	evRev := ev.Kv.ModRevision
    57  	if evRev > ebRev {
    58  		eb.revs++
    59  		if eb.revs > watchBatchMaxRevs {
    60  			eb.moreRev = evRev
    61  			return
    62  		}
    63  	}
    64  
    65  	eb.evs = append(eb.evs, ev)
    66  }
    67  
    68  type watcherBatch map[*watcher]*eventBatch
    69  
    70  func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
    71  	eb := wb[w]
    72  	if eb == nil {
    73  		eb = &eventBatch{}
    74  		wb[w] = eb
    75  	}
    76  	eb.add(ev)
    77  }
    78  
    79  // newWatcherBatch maps watchers to their matched events. It enables quick
    80  // events look up by watcher.
    81  func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
    82  	if len(wg.watchers) == 0 {
    83  		return nil
    84  	}
    85  
    86  	wb := make(watcherBatch)
    87  	for _, ev := range evs {
    88  		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
    89  			if ev.Kv.ModRevision >= w.minRev {
    90  				// don't double notify
    91  				wb.add(w, ev)
    92  			}
    93  		}
    94  	}
    95  	return wb
    96  }
    97  
    98  type watcherSet map[*watcher]struct{}
    99  
   100  func (w watcherSet) add(wa *watcher) {
   101  	if _, ok := w[wa]; ok {
   102  		panic("add watcher twice!")
   103  	}
   104  	w[wa] = struct{}{}
   105  }
   106  
   107  func (w watcherSet) union(ws watcherSet) {
   108  	for wa := range ws {
   109  		w.add(wa)
   110  	}
   111  }
   112  
   113  func (w watcherSet) delete(wa *watcher) {
   114  	if _, ok := w[wa]; !ok {
   115  		panic("removing missing watcher!")
   116  	}
   117  	delete(w, wa)
   118  }
   119  
   120  type watcherSetByKey map[string]watcherSet
   121  
   122  func (w watcherSetByKey) add(wa *watcher) {
   123  	set := w[string(wa.key)]
   124  	if set == nil {
   125  		set = make(watcherSet)
   126  		w[string(wa.key)] = set
   127  	}
   128  	set.add(wa)
   129  }
   130  
   131  func (w watcherSetByKey) delete(wa *watcher) bool {
   132  	k := string(wa.key)
   133  	if v, ok := w[k]; ok {
   134  		if _, ok := v[wa]; ok {
   135  			delete(v, wa)
   136  			if len(v) == 0 {
   137  				// remove the set; nothing left
   138  				delete(w, k)
   139  			}
   140  			return true
   141  		}
   142  	}
   143  	return false
   144  }
   145  
   146  // watcherGroup is a collection of watchers organized by their ranges
   147  type watcherGroup struct {
   148  	// keyWatchers has the watchers that watch on a single key
   149  	keyWatchers watcherSetByKey
   150  	// ranges has the watchers that watch a range; it is sorted by interval
   151  	ranges adt.IntervalTree
   152  	// watchers is the set of all watchers
   153  	watchers watcherSet
   154  }
   155  
   156  func newWatcherGroup() watcherGroup {
   157  	return watcherGroup{
   158  		keyWatchers: make(watcherSetByKey),
   159  		ranges:      adt.NewIntervalTree(),
   160  		watchers:    make(watcherSet),
   161  	}
   162  }
   163  
   164  // add puts a watcher in the group.
   165  func (wg *watcherGroup) add(wa *watcher) {
   166  	wg.watchers.add(wa)
   167  	if wa.end == nil {
   168  		wg.keyWatchers.add(wa)
   169  		return
   170  	}
   171  
   172  	// interval already registered?
   173  	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
   174  	if iv := wg.ranges.Find(ivl); iv != nil {
   175  		iv.Val.(watcherSet).add(wa)
   176  		return
   177  	}
   178  
   179  	// not registered, put in interval tree
   180  	ws := make(watcherSet)
   181  	ws.add(wa)
   182  	wg.ranges.Insert(ivl, ws)
   183  }
   184  
   185  // contains is whether the given key has a watcher in the group.
   186  func (wg *watcherGroup) contains(key string) bool {
   187  	_, ok := wg.keyWatchers[key]
   188  	return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
   189  }
   190  
   191  // size gives the number of unique watchers in the group.
   192  func (wg *watcherGroup) size() int { return len(wg.watchers) }
   193  
   194  // delete removes a watcher from the group.
   195  func (wg *watcherGroup) delete(wa *watcher) bool {
   196  	if _, ok := wg.watchers[wa]; !ok {
   197  		return false
   198  	}
   199  	wg.watchers.delete(wa)
   200  	if wa.end == nil {
   201  		wg.keyWatchers.delete(wa)
   202  		return true
   203  	}
   204  
   205  	ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
   206  	iv := wg.ranges.Find(ivl)
   207  	if iv == nil {
   208  		return false
   209  	}
   210  
   211  	ws := iv.Val.(watcherSet)
   212  	delete(ws, wa)
   213  	if len(ws) == 0 {
   214  		// remove interval missing watchers
   215  		if ok := wg.ranges.Delete(ivl); !ok {
   216  			panic("could not remove watcher from interval tree")
   217  		}
   218  	}
   219  
   220  	return true
   221  }
   222  
   223  // choose selects watchers from the watcher group to update
   224  func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
   225  	if len(wg.watchers) < maxWatchers {
   226  		return wg, wg.chooseAll(curRev, compactRev)
   227  	}
   228  	ret := newWatcherGroup()
   229  	for w := range wg.watchers {
   230  		if maxWatchers <= 0 {
   231  			break
   232  		}
   233  		maxWatchers--
   234  		ret.add(w)
   235  	}
   236  	return &ret, ret.chooseAll(curRev, compactRev)
   237  }
   238  
   239  func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
   240  	minRev := int64(math.MaxInt64)
   241  	for w := range wg.watchers {
   242  		if w.minRev > curRev {
   243  			// after network partition, possibly choosing future revision watcher from restore operation
   244  			// with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
   245  			// do not panic when such watcher had been moved from "synced" watcher during restore operation
   246  			if !w.restore {
   247  				panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
   248  			}
   249  
   250  			// mark 'restore' done, since it's chosen
   251  			w.restore = false
   252  		}
   253  		if w.minRev < compactRev {
   254  			select {
   255  			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
   256  				w.compacted = true
   257  				wg.delete(w)
   258  			default:
   259  				// retry next time
   260  			}
   261  			continue
   262  		}
   263  		if minRev > w.minRev {
   264  			minRev = w.minRev
   265  		}
   266  	}
   267  	return minRev
   268  }
   269  
   270  // watcherSetByKey gets the set of watchers that receive events on the given key.
   271  func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
   272  	wkeys := wg.keyWatchers[key]
   273  	wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
   274  
   275  	// zero-copy cases
   276  	switch {
   277  	case len(wranges) == 0:
   278  		// no need to merge ranges or copy; reuse single-key set
   279  		return wkeys
   280  	case len(wranges) == 0 && len(wkeys) == 0:
   281  		return nil
   282  	case len(wranges) == 1 && len(wkeys) == 0:
   283  		return wranges[0].Val.(watcherSet)
   284  	}
   285  
   286  	// copy case
   287  	ret := make(watcherSet)
   288  	ret.union(wg.keyWatchers[key])
   289  	for _, item := range wranges {
   290  		ret.union(item.Val.(watcherSet))
   291  	}
   292  	return ret
   293  }
   294  

View as plain text