1 // Copyright 2015 The etcd Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package mvcc 16 17 import ( 18 "bytes" 19 "errors" 20 "sync" 21 22 "go.etcd.io/etcd/api/v3/mvccpb" 23 clientv3 "go.etcd.io/etcd/client/v3" 24 ) 25 26 var ( 27 ErrWatcherNotExist = errors.New("mvcc: watcher does not exist") 28 ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty") 29 ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream") 30 ) 31 32 type WatchID int64 33 34 // FilterFunc returns true if the given event should be filtered out. 35 type FilterFunc func(e mvccpb.Event) bool 36 37 type WatchStream interface { 38 // Watch creates a watcher. The watcher watches the events happening or 39 // happened on the given key or range [key, end) from the given startRev. 40 // 41 // The whole event history can be watched unless compacted. 42 // If "startRev" <=0, watch observes events after currentRev. 43 // 44 // The returned "id" is the ID of this watcher. It appears as WatchID 45 // in events that are sent to the created watcher through stream channel. 46 // The watch ID is used when it's not equal to AutoWatchID. Otherwise, 47 // an auto-generated watch ID is returned. 48 Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) 49 50 // Chan returns a chan. All watch response will be sent to the returned chan. 51 Chan() <-chan WatchResponse 52 53 // RequestProgress requests the progress of the watcher with given ID. The response 54 // will only be sent if the watcher is currently synced. 55 // The responses will be sent through the WatchRespone Chan attached 56 // with this stream to ensure correct ordering. 57 // The responses contains no events. The revision in the response is the progress 58 // of the watchers since the watcher is currently synced. 59 RequestProgress(id WatchID) 60 61 // RequestProgressAll requests a progress notification for all 62 // watchers sharing the stream. If all watchers are synced, a 63 // progress notification with watch ID -1 will be sent to an 64 // arbitrary watcher of this stream, and the function returns 65 // true. 66 RequestProgressAll() bool 67 68 // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be 69 // returned. 70 Cancel(id WatchID) error 71 72 // Close closes Chan and release all related resources. 73 Close() 74 75 // Rev returns the current revision of the KV the stream watches on. 76 Rev() int64 77 } 78 79 type WatchResponse struct { 80 // WatchID is the WatchID of the watcher this response sent to. 81 WatchID WatchID 82 83 // Events contains all the events that needs to send. 84 Events []mvccpb.Event 85 86 // Revision is the revision of the KV when the watchResponse is created. 87 // For a normal response, the revision should be the same as the last 88 // modified revision inside Events. For a delayed response to a unsynced 89 // watcher, the revision is greater than the last modified revision 90 // inside Events. 91 Revision int64 92 93 // CompactRevision is set when the watcher is cancelled due to compaction. 94 CompactRevision int64 95 } 96 97 // watchStream contains a collection of watchers that share 98 // one streaming chan to send out watched events and other control events. 99 type watchStream struct { 100 watchable watchable 101 ch chan WatchResponse 102 103 mu sync.Mutex // guards fields below it 104 // nextID is the ID pre-allocated for next new watcher in this stream 105 nextID WatchID 106 closed bool 107 cancels map[WatchID]cancelFunc 108 watchers map[WatchID]*watcher 109 } 110 111 // Watch creates a new watcher in the stream and returns its WatchID. 112 func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) { 113 // prevent wrong range where key >= end lexicographically 114 // watch request with 'WithFromKey' has empty-byte range end 115 if len(end) != 0 && bytes.Compare(key, end) != -1 { 116 return -1, ErrEmptyWatcherRange 117 } 118 119 ws.mu.Lock() 120 defer ws.mu.Unlock() 121 if ws.closed { 122 return -1, ErrEmptyWatcherRange 123 } 124 125 if id == clientv3.AutoWatchID { 126 for ws.watchers[ws.nextID] != nil { 127 ws.nextID++ 128 } 129 id = ws.nextID 130 ws.nextID++ 131 } else if _, ok := ws.watchers[id]; ok { 132 return -1, ErrWatcherDuplicateID 133 } 134 135 w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...) 136 137 ws.cancels[id] = c 138 ws.watchers[id] = w 139 return id, nil 140 } 141 142 func (ws *watchStream) Chan() <-chan WatchResponse { 143 return ws.ch 144 } 145 146 func (ws *watchStream) Cancel(id WatchID) error { 147 ws.mu.Lock() 148 cancel, ok := ws.cancels[id] 149 w := ws.watchers[id] 150 ok = ok && !ws.closed 151 ws.mu.Unlock() 152 153 if !ok { 154 return ErrWatcherNotExist 155 } 156 cancel() 157 158 ws.mu.Lock() 159 // The watch isn't removed until cancel so that if Close() is called, 160 // it will wait for the cancel. Otherwise, Close() could close the 161 // watch channel while the store is still posting events. 162 if ww := ws.watchers[id]; ww == w { 163 delete(ws.cancels, id) 164 delete(ws.watchers, id) 165 } 166 ws.mu.Unlock() 167 168 return nil 169 } 170 171 func (ws *watchStream) Close() { 172 ws.mu.Lock() 173 defer ws.mu.Unlock() 174 175 for _, cancel := range ws.cancels { 176 cancel() 177 } 178 ws.closed = true 179 close(ws.ch) 180 watchStreamGauge.Dec() 181 } 182 183 func (ws *watchStream) Rev() int64 { 184 ws.mu.Lock() 185 defer ws.mu.Unlock() 186 return ws.watchable.rev() 187 } 188 189 func (ws *watchStream) RequestProgress(id WatchID) { 190 ws.mu.Lock() 191 w, ok := ws.watchers[id] 192 ws.mu.Unlock() 193 if !ok { 194 return 195 } 196 ws.watchable.progress(w) 197 } 198 199 func (ws *watchStream) RequestProgressAll() bool { 200 ws.mu.Lock() 201 defer ws.mu.Unlock() 202 return ws.watchable.progressAll(ws.watchers) 203 } 204