...

Source file src/go.etcd.io/etcd/server/v3/mvcc/watcher.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"bytes"
    19  	"errors"
    20  	"sync"
    21  
    22  	"go.etcd.io/etcd/api/v3/mvccpb"
    23  	clientv3 "go.etcd.io/etcd/client/v3"
    24  )
    25  
    26  var (
    27  	ErrWatcherNotExist    = errors.New("mvcc: watcher does not exist")
    28  	ErrEmptyWatcherRange  = errors.New("mvcc: watcher range is empty")
    29  	ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
    30  )
    31  
    32  type WatchID int64
    33  
    34  // FilterFunc returns true if the given event should be filtered out.
    35  type FilterFunc func(e mvccpb.Event) bool
    36  
    37  type WatchStream interface {
    38  	// Watch creates a watcher. The watcher watches the events happening or
    39  	// happened on the given key or range [key, end) from the given startRev.
    40  	//
    41  	// The whole event history can be watched unless compacted.
    42  	// If "startRev" <=0, watch observes events after currentRev.
    43  	//
    44  	// The returned "id" is the ID of this watcher. It appears as WatchID
    45  	// in events that are sent to the created watcher through stream channel.
    46  	// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
    47  	// an auto-generated watch ID is returned.
    48  	Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
    49  
    50  	// Chan returns a chan. All watch response will be sent to the returned chan.
    51  	Chan() <-chan WatchResponse
    52  
    53  	// RequestProgress requests the progress of the watcher with given ID. The response
    54  	// will only be sent if the watcher is currently synced.
    55  	// The responses will be sent through the WatchRespone Chan attached
    56  	// with this stream to ensure correct ordering.
    57  	// The responses contains no events. The revision in the response is the progress
    58  	// of the watchers since the watcher is currently synced.
    59  	RequestProgress(id WatchID)
    60  
    61  	// RequestProgressAll requests a progress notification for all
    62  	// watchers sharing the stream.  If all watchers are synced, a
    63  	// progress notification with watch ID -1 will be sent to an
    64  	// arbitrary watcher of this stream, and the function returns
    65  	// true.
    66  	RequestProgressAll() bool
    67  
    68  	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
    69  	// returned.
    70  	Cancel(id WatchID) error
    71  
    72  	// Close closes Chan and release all related resources.
    73  	Close()
    74  
    75  	// Rev returns the current revision of the KV the stream watches on.
    76  	Rev() int64
    77  }
    78  
    79  type WatchResponse struct {
    80  	// WatchID is the WatchID of the watcher this response sent to.
    81  	WatchID WatchID
    82  
    83  	// Events contains all the events that needs to send.
    84  	Events []mvccpb.Event
    85  
    86  	// Revision is the revision of the KV when the watchResponse is created.
    87  	// For a normal response, the revision should be the same as the last
    88  	// modified revision inside Events. For a delayed response to a unsynced
    89  	// watcher, the revision is greater than the last modified revision
    90  	// inside Events.
    91  	Revision int64
    92  
    93  	// CompactRevision is set when the watcher is cancelled due to compaction.
    94  	CompactRevision int64
    95  }
    96  
    97  // watchStream contains a collection of watchers that share
    98  // one streaming chan to send out watched events and other control events.
    99  type watchStream struct {
   100  	watchable watchable
   101  	ch        chan WatchResponse
   102  
   103  	mu sync.Mutex // guards fields below it
   104  	// nextID is the ID pre-allocated for next new watcher in this stream
   105  	nextID   WatchID
   106  	closed   bool
   107  	cancels  map[WatchID]cancelFunc
   108  	watchers map[WatchID]*watcher
   109  }
   110  
   111  // Watch creates a new watcher in the stream and returns its WatchID.
   112  func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
   113  	// prevent wrong range where key >= end lexicographically
   114  	// watch request with 'WithFromKey' has empty-byte range end
   115  	if len(end) != 0 && bytes.Compare(key, end) != -1 {
   116  		return -1, ErrEmptyWatcherRange
   117  	}
   118  
   119  	ws.mu.Lock()
   120  	defer ws.mu.Unlock()
   121  	if ws.closed {
   122  		return -1, ErrEmptyWatcherRange
   123  	}
   124  
   125  	if id == clientv3.AutoWatchID {
   126  		for ws.watchers[ws.nextID] != nil {
   127  			ws.nextID++
   128  		}
   129  		id = ws.nextID
   130  		ws.nextID++
   131  	} else if _, ok := ws.watchers[id]; ok {
   132  		return -1, ErrWatcherDuplicateID
   133  	}
   134  
   135  	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
   136  
   137  	ws.cancels[id] = c
   138  	ws.watchers[id] = w
   139  	return id, nil
   140  }
   141  
   142  func (ws *watchStream) Chan() <-chan WatchResponse {
   143  	return ws.ch
   144  }
   145  
   146  func (ws *watchStream) Cancel(id WatchID) error {
   147  	ws.mu.Lock()
   148  	cancel, ok := ws.cancels[id]
   149  	w := ws.watchers[id]
   150  	ok = ok && !ws.closed
   151  	ws.mu.Unlock()
   152  
   153  	if !ok {
   154  		return ErrWatcherNotExist
   155  	}
   156  	cancel()
   157  
   158  	ws.mu.Lock()
   159  	// The watch isn't removed until cancel so that if Close() is called,
   160  	// it will wait for the cancel. Otherwise, Close() could close the
   161  	// watch channel while the store is still posting events.
   162  	if ww := ws.watchers[id]; ww == w {
   163  		delete(ws.cancels, id)
   164  		delete(ws.watchers, id)
   165  	}
   166  	ws.mu.Unlock()
   167  
   168  	return nil
   169  }
   170  
   171  func (ws *watchStream) Close() {
   172  	ws.mu.Lock()
   173  	defer ws.mu.Unlock()
   174  
   175  	for _, cancel := range ws.cancels {
   176  		cancel()
   177  	}
   178  	ws.closed = true
   179  	close(ws.ch)
   180  	watchStreamGauge.Dec()
   181  }
   182  
   183  func (ws *watchStream) Rev() int64 {
   184  	ws.mu.Lock()
   185  	defer ws.mu.Unlock()
   186  	return ws.watchable.rev()
   187  }
   188  
   189  func (ws *watchStream) RequestProgress(id WatchID) {
   190  	ws.mu.Lock()
   191  	w, ok := ws.watchers[id]
   192  	ws.mu.Unlock()
   193  	if !ok {
   194  		return
   195  	}
   196  	ws.watchable.progress(w)
   197  }
   198  
   199  func (ws *watchStream) RequestProgressAll() bool {
   200  	ws.mu.Lock()
   201  	defer ws.mu.Unlock()
   202  	return ws.watchable.progressAll(ws.watchers)
   203  }
   204  

View as plain text