Source file src/go.etcd.io/etcd/client/v3/watch.go

Documentation: go.etcd.io/etcd/client/v3

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/api/v3/mvccpb"
	v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"

	"go.uber.org/zap"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const (
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	closeSendErrTimeout = 250 * time.Millisecond

	// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
	// user-provided ID is available. If passed, an ID is automatically assigned.
	AutoWatchID = 0

	// InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch.
	InvalidWatchID = -1
)

type Event mvccpb.Event

type WatchChan <-chan WatchResponse

type Watcher interface {
	// Watch watches on a key or prefix. The watched events will be returned
	// through the returned channel. If revisions waiting to be sent over the
	// watch are compacted, then the watch will be canceled by the server, the
	// client will post a compacted error watch response, and the channel will close.
	// If the requested revision is 0 or unspecified, the returned channel will
	// return watch events that happen after the server receives the watch request.
	// If the context "ctx" is canceled or timed out, the returned "WatchChan" is closed,
	// and the "WatchResponse" from this closed channel has zero events and a nil "Err()".
	// The context "ctx" MUST be canceled as soon as the watcher is no longer being used,
	// to release the associated resources.
	//
	// If the context is "context.Background/TODO", the returned "WatchChan" will
	// not be closed and will block until an event is triggered, except when the server
	// returns a non-recoverable error (e.g. ErrCompacted).
	// For example, when the context is wrapped with "WithRequireLeader" and the
	// connected server has no leader (e.g. due to network partition),
	// the error "etcdserver: no leader" (ErrNoLeader) will be returned,
	// and then "WatchChan" is closed with a non-nil "Err()".
	// In order to prevent a watch stream being stuck in a partitioned node,
	// make sure to wrap the context with "WithRequireLeader".
	//
	// Otherwise, as long as the context has not been canceled or timed out,
	// watch will retry on other recoverable errors forever until reconnected.
	//
	// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
	// Currently, client contexts are overwritten with "valCtx" that never closes.
	// TODO(v3.4): configure watch retry policy, limit maximum retry number
	// (see https://github.com/etcd-io/etcd/issues/8980)
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress requests a progress notify response be sent in all watch channels.
	RequestProgress(ctx context.Context) error

	// Close closes the watcher and cancels all watch requests.
	Close() error
}
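
// A minimal usage sketch (added commentary, not part of the upstream file):
// it assumes an established *Client named "cli" and that "log" and "fmt" are
// imported, and shows the recommended pattern of wrapping the context with
// WithRequireLeader and canceling it once the watcher is no longer needed:
//
//	ctx, cancel := context.WithCancel(clientv3.WithRequireLeader(context.Background()))
//	defer cancel()
//	for wresp := range cli.Watch(ctx, "foo", clientv3.WithPrefix()) {
//		if err := wresp.Err(); err != nil {
//			log.Fatal(err) // e.g. compaction; the channel closes after this response
//		}
//		for _, ev := range wresp.Events {
//			fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//		}
//	}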

type WatchResponse struct {
	Header pb.ResponseHeader
	Events []*Event

	// CompactRevision is the minimum revision the watcher may receive.
	CompactRevision int64

	// Canceled is used to indicate watch failure.
	// If the watch failed and the stream is about to close, the channel sends
	// a final response with Canceled set to true and a non-nil Err() before it closes.
	Canceled bool

	// Created is used to indicate the creation of the watcher.
	Created bool

	closeErr error

	// cancelReason is the reason for canceling the watch
	cancelReason string
}
   111  
   112  // IsCreate returns true if the event tells that the key is newly created.
   113  func (e *Event) IsCreate() bool {
   114  	return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
   115  }
   116  
   117  // IsModify returns true if the event tells that a new value is put on existing key.
   118  func (e *Event) IsModify() bool {
   119  	return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
   120  }
   121  
   122  // Err is the error value if this WatchResponse holds an error.
   123  func (wr *WatchResponse) Err() error {
   124  	switch {
   125  	case wr.closeErr != nil:
   126  		return v3rpc.Error(wr.closeErr)
   127  	case wr.CompactRevision != 0:
   128  		return v3rpc.ErrCompacted
   129  	case wr.Canceled:
   130  		if len(wr.cancelReason) != 0 {
   131  			return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
   132  		}
   133  		return v3rpc.ErrFutureRev
   134  	}
   135  	return nil
   136  }
   137  
   138  // IsProgressNotify returns true if the WatchResponse is progress notification.
   139  func (wr *WatchResponse) IsProgressNotify() bool {
   140  	return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
   141  }
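
// A hedged sketch of classifying responses drained from a WatchChan and the
// events inside them (added commentary, not part of the upstream file;
// "wresp" is assumed to come from Watcher.Watch):
//
//	switch {
//	case wresp.Created:
//		// watcher registered; no events yet
//	case wresp.IsProgressNotify():
//		// no new events; wresp.Header.Revision is the current revision
//	case wresp.Canceled:
//		// terminal response; wresp.Err() reports why
//	default:
//		for _, ev := range wresp.Events {
//			switch {
//			case ev.IsCreate():
//				// key created
//			case ev.IsModify():
//				// existing key updated
//			case ev.Type == EventTypeDelete:
//				// key deleted
//			}
//		}
//	}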

// watcher implements the Watcher interface
type watcher struct {
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// mu protects the grpc streams map
	mu sync.Mutex

	// streams holds all the active grpc streams keyed by ctx value.
	streams map[string]*watchGrpcStream
	lg      *zap.Logger
}

// watchGrpcStream tracks all watch resources attached to a single grpc stream.
type watchGrpcStream struct {
	owner    *watcher
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls internal remote.Watch requests
	ctx context.Context
	// ctxKey is the key used when looking up this stream's context
	ctxKey string
	cancel context.CancelFunc

	// substreams holds all active watchers on this grpc stream
	substreams map[int64]*watcherStream
	// resuming holds all resuming watchers on this grpc stream
	resuming []*watcherStream

	// reqc sends a watch request from Watch() to the main goroutine
	reqc chan watchStreamRequest
	// respc receives data from the watch client
	respc chan *pb.WatchResponse
	// donec closes to broadcast shutdown
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconnect logic
	errc chan error
	// closingc gets the watcherStream of closing watchers
	closingc chan *watcherStream
	// wg is Done when all substream goroutines have exited
	wg sync.WaitGroup

	// resumec closes to signal that all substreams should begin resuming
	resumec chan struct{}
	// closeErr is the error that closed the watch stream
	closeErr error

	lg *zap.Logger
}

// watchStreamRequest is a union of the supported watch request operation types
type watchStreamRequest interface {
	toPB() *pb.WatchRequest
}

// watchRequest is issued by the subscriber to start a new watcher
type watchRequest struct {
	ctx context.Context
	key string
	end string
	rev int64

	// send created notification event if this field is true
	createdNotify bool
	// progressNotify is for progress updates
	progressNotify bool
	// fragment is disabled by default; if true, watch responses are split
	// into fragments when their total size exceeds the "--max-request-bytes"
	// flag value plus a 512-byte overhead
	fragment bool

	// filters is the list of events to filter out
	filters []pb.WatchCreateRequest_FilterType
	// get the previous key-value pair before the event happens
	prevKV bool
	// retc receives a chan WatchResponse once the watcher is established
	retc chan chan WatchResponse
}

// progressRequest is issued by the subscriber to request watch progress
type progressRequest struct {
}

// watcherStream represents a registered watcher
type watcherStream struct {
	// initReq is the request that initiated this stream
	initReq watchRequest

	// outc publishes watch responses to subscriber
	outc chan WatchResponse
	// recvc buffers watch responses before publishing
	recvc chan *WatchResponse
	// donec closes when the watcherStream goroutine stops.
	donec chan struct{}
	// closing is set to true when the stream should be scheduled to shut down.
	closing bool
	// id is the registered watch id on the grpc stream
	id int64

	// buf holds all events received from etcd but not yet consumed by the client
	buf []*WatchResponse
}

func NewWatcher(c *Client) Watcher {
	return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
	w := &watcher{
		remote:  wc,
		streams: make(map[string]*watchGrpcStream),
	}
	if c != nil {
		w.callOpts = c.callOpts
		w.lg = c.lg
	}
	return w
}

// never closes
var valCtxCh = make(chan struct{})
var zeroTime = time.Unix(0, 0)

// ctx with only the values; never Done
type valCtx struct{ context.Context }

func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{}       { return valCtxCh }
func (vc *valCtx) Err() error                  { return nil }
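
// Added commentary (not in the upstream source): valCtx preserves only the
// values of the wrapped context, e.g. outgoing gRPC metadata, while hiding
// its cancellation and deadline, so internal watch retries outlive the
// caller's context:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	cancel()
//	vc := &valCtx{ctx}
//	_ = vc.Err() // nil, even though ctx.Err() == context.Canceled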

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
	ctx, cancel := context.WithCancel(&valCtx{inctx})
	wgs := &watchGrpcStream{
		owner:      w,
		remote:     w.remote,
		callOpts:   w.callOpts,
		ctx:        ctx,
		ctxKey:     streamKeyFromCtx(inctx),
		cancel:     cancel,
		substreams: make(map[int64]*watcherStream),
		respc:      make(chan *pb.WatchResponse),
		reqc:       make(chan watchStreamRequest),
		donec:      make(chan struct{}),
		errc:       make(chan error, 1),
		closingc:   make(chan *watcherStream),
		resumec:    make(chan struct{}),
		lg:         w.lg,
	}
	go wgs.run()
	return wgs
}

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		retc:           make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	var closeCh chan WatchResponse
	for {
		// find or allocate appropriate grpc watch stream
		w.mu.Lock()
		if w.streams == nil {
			// closed
			w.mu.Unlock()
			ch := make(chan WatchResponse)
			close(ch)
			return ch
		}
		wgs := w.streams[ctxKey]
		if wgs == nil {
			wgs = w.newWatcherGrpcStream(ctx)
			w.streams[ctxKey] = wgs
		}
		donec := wgs.donec
		reqc := wgs.reqc
		w.mu.Unlock()

		// lazily allocate the channel returned (closed) when the watcher cannot be registered
		if closeCh == nil {
			closeCh = make(chan WatchResponse, 1)
		}

		// submit request
		select {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
			ok = false
		case <-donec:
			ok = false
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				break
			}
			// retry; the stream may have been dropped because it had no contexts
			continue
		}

		// receive channel
		if ok {
			select {
			case ret := <-wr.retc:
				return ret
			case <-ctx.Done():
			case <-donec:
				if wgs.closeErr != nil {
					closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
					break
				}
				// retry; the stream may have been dropped because it had no contexts
				continue
			}
		}
		break
	}

	close(closeCh)
	return closeCh
}

func (w *watcher) Close() (err error) {
	w.mu.Lock()
	streams := w.streams
	w.streams = nil
	w.mu.Unlock()
	for _, wgs := range streams {
		if werr := wgs.close(); werr != nil {
			err = werr
		}
	}
	// Consider context.Canceled as a successful close
	if err == context.Canceled {
		err = nil
	}
	return err
}

// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
	ctxKey := streamKeyFromCtx(ctx)

	w.mu.Lock()
	if w.streams == nil {
		w.mu.Unlock()
		return fmt.Errorf("no stream found for context")
	}
	wgs := w.streams[ctxKey]
	if wgs == nil {
		wgs = w.newWatcherGrpcStream(ctx)
		w.streams[ctxKey] = wgs
	}
	donec := wgs.donec
	reqc := wgs.reqc
	w.mu.Unlock()

	pr := &progressRequest{}

	select {
	case reqc <- pr:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-donec:
		if wgs.closeErr != nil {
			return wgs.closeErr
		}
		// retry; the stream may have been dropped because it had no contexts
		return w.RequestProgress(ctx)
	}
}
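
// A hedged usage sketch (added commentary, not part of the upstream file):
// it assumes "cli" is an established *Client and that "ctx" carries the same
// outgoing metadata as the original Watch call, so the request reaches the
// same gRPC stream:
//
//	if err := cli.RequestProgress(ctx); err != nil {
//		// stream closed or context done
//	}
//	// watch channels sharing that stream then receive a response
//	// for which IsProgressNotify() reports true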

func (w *watchGrpcStream) close() (err error) {
	w.cancel()
	<-w.donec
	select {
	case err = <-w.errc:
	default:
	}
	return toErr(w.ctx, err)
}

func (w *watcher) closeStream(wgs *watchGrpcStream) {
	w.mu.Lock()
	close(wgs.donec)
	wgs.cancel()
	if w.streams != nil {
		delete(w.streams, wgs.ctxKey)
	}
	w.mu.Unlock()
}

func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
	// check watch ID for backward compatibility (<= v3.3)
	if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
		w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
		// failed; no channel
		close(ws.recvc)
		return
	}
	ws.id = resp.WatchId
	w.substreams[ws.id] = ws
}

func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
	select {
	case ws.outc <- *resp:
	case <-ws.initReq.ctx.Done():
	case <-time.After(closeSendErrTimeout):
	}
	close(ws.outc)
}

func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
	// send channel response in case stream was never established
	select {
	case ws.initReq.retc <- ws.outc:
	default:
	}
	// close subscriber's channel
	if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
		go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
	} else if ws.outc != nil {
		close(ws.outc)
	}
	if ws.id != InvalidWatchID {
		delete(w.substreams, ws.id)
		return
	}
	for i := range w.resuming {
		if w.resuming[i] == ws {
			w.resuming[i] = nil
			return
		}
	}
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// substreams marked to close but goroutine still running; needed for
	// avoiding double-closing recvc on grpc stream teardown
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// shutdown substreams and resuming substreams
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// start a stream with the etcd grpc server
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	cancelSet := make(map[int64]struct{})

	var cur *pb.WatchResponse
	backoff := time.Millisecond
	for {
		select {
		// Watch() requested
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				// TODO: pass custom watch ID?
				ws := &watcherStream{
					initReq: *wreq,
					id:      InvalidWatchID,
					outc:    outc,
					// unbuffered so resumes won't cause repeat events
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// queue up for watcher creation/resume
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of resume queue, can register a new watcher
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// new events from the watch client
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// merge new events
				cur.Events = append(cur.Events, pbresp.Events...)
				// update "Fragment" field; the last response has "Fragment" == false
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// response to head of queue creation
				if len(w.resuming) != 0 {
					if ws := w.resuming[0]; ws != nil {
						w.addSubstream(pbresp, ws)
						w.dispatchEvent(pbresp)
						w.resuming[0] = nil
					}
				}

				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// reset for next iteration
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// signal to stream goroutine to update closingc
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// reset for next iteration
				cur = nil

			case cur.Fragment:
				// watch response events are still fragmented
				// continue to fetch next fragmented event arrival
				continue

			default:
				// dispatch to appropriate watch stream
				ok := w.dispatchEvent(cur)

				// reset for next iteration
				cur = nil

				if ok {
					break
				}

				// watch response on unexpected watch id; cancel id
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// watch client failed on Recv; spawn another if possible
		case err := <-w.errc:
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			backoff = w.backoffIfUnavailable(backoff, err)
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// no more watchers on this stream; shut down and skip cancellation
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != InvalidWatchID {
				// client is closing an established watch; close it on the server proactively instead of waiting
				// to close when the next message arrives
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}

// nextResume chooses the next resuming watcher to register with the grpc stream.
// Abandoned streams are marked as nil in the queue since the head must wait for
// its inflight registration.
func (w *watchGrpcStream) nextResume() *watcherStream {
	for len(w.resuming) != 0 {
		if w.resuming[0] != nil {
			return w.resuming[0]
		}
		w.resuming = w.resuming[1:]
	}
	return nil
}

// dispatchEvent sends a WatchResponse to the appropriate watcher stream
func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
	events := make([]*Event, len(pbresp.Events))
	for i, ev := range pbresp.Events {
		events[i] = (*Event)(ev)
	}
	// TODO: return watch ID?
	wr := &WatchResponse{
		Header:          *pbresp.Header,
		Events:          events,
		CompactRevision: pbresp.CompactRevision,
		Created:         pbresp.Created,
		Canceled:        pbresp.Canceled,
		cancelReason:    pbresp.CancelReason,
	}

	// watch IDs are zero-indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to
	// indicate they should be broadcast.
	if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
		return w.broadcastResponse(wr)
	}

	return w.unicastResponse(wr, pbresp.WatchId)
}

// broadcastResponse sends a watch response to all watch substreams.
func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
	for _, ws := range w.substreams {
		select {
		case ws.recvc <- wr:
		case <-ws.donec:
		}
	}
	return true
}

// unicastResponse sends a watch response to a specific watch substream.
func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
	ws, ok := w.substreams[watchId]
	if !ok {
		return false
	}
	select {
	case ws.recvc <- wr:
	case <-ws.donec:
		return false
	}
	return true
}

// serveWatchClient forwards messages from the grpc stream to run()
func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
	for {
		resp, err := wc.Recv()
		if err != nil {
			select {
			case w.errc <- err:
			case <-w.donec:
			}
			return
		}
		select {
		case w.respc <- resp:
		case <-w.donec:
			return
		}
	}
}

// serveSubstream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the minimum expected next revision
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// shutdown from closeSubstream
				return
			}

			if wr.Created {
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// to prevent next write from taking the slot in buffered channel
					// and posting duplicate create events
					ws.initReq.retc = nil

					// send first creation event only if requested
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// once the watch channel is returned, a current revision
					// watch must resume at the store revision. This is necessary
					// for the following case to work as expected:
					//	wch := m1.Watch("a")
					//	m2.Put("a", "b")
					//	<-wch
					// If the revision were only bound on the first observed event
					// and wch disconnected before the Put was issued, then
					// reconnected after it was committed, it would miss the Put.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// current progress of watch; <= store revision
				nextRev = wr.Header.Revision + 1
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// created event is already sent above,
			// watcher should not post duplicate events
			if wr.Created {
				continue
			}

			// TODO pause channel if buffer gets too large
			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			resuming = true
			return
		}
	}
	// lazily send cancel message if events on missing id
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// mark all substreams as resuming
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	for _, ws := range w.substreams {
		ws.id = InvalidWatchID
		w.resuming = append(w.resuming, ws)
	}
	// strip out nils, if any
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// connect to grpc stream while accepting watcher cancellation
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// serve all non-closing streams, even if there's a client error,
	// so that the teardown path can shut down the streams as expected.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// receive data from new grpc stream
	go w.serveWatchClient(wc)
	return wc, nil
}

func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// closed ws will be removed from resuming
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}

// joinSubstreams waits for all substream goroutines to complete.
func (w *watchGrpcStream) joinSubstreams() {
	for _, ws := range w.substreams {
		<-ws.donec
	}
	for _, ws := range w.resuming {
		if ws != nil {
			<-ws.donec
		}
	}
}

var maxBackoff = 100 * time.Millisecond

func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
	if isUnavailableErr(w.ctx, err) {
		// retry, but backoff
		if backoff < maxBackoff {
			// 25% backoff factor
			backoff = backoff + backoff/4
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
		}
		time.Sleep(backoff)
	}
	return backoff
}
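
// Added commentary (not in the upstream source): the backoff starts at 1ms
// (see run() and openWatchClient) and grows by 25% per consecutive
// "unavailable" error: 1ms, 1.25ms, ~1.56ms, ..., reaching the 100ms cap
// after roughly twenty retries, since 1ms * 1.25^21 ≈ 108ms.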

// openWatchClient retries opening a watch client until success or halt.
// It manually retries in case "ws==nil && err==nil".
// TODO: remove FailFast=false
func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
	backoff := time.Millisecond
	for {
		select {
		case <-w.ctx.Done():
			if err == nil {
				return nil, w.ctx.Err()
			}
			return nil, err
		default:
		}
		if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
			break
		}
		if isHaltErr(w.ctx, err) {
			return nil, v3rpc.Error(err)
		}
		backoff = w.backoffIfUnavailable(backoff, err)
	}
	return ws, nil
}

// toPB converts an internal watch request structure to its protobuf WatchRequest structure.
func (wr *watchRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchCreateRequest{
		StartRevision:  wr.rev,
		Key:            []byte(wr.key),
		RangeEnd:       []byte(wr.end),
		ProgressNotify: wr.progressNotify,
		Filters:        wr.filters,
		PrevKv:         wr.prevKV,
		Fragment:       wr.fragment,
	}
	cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

// toPB converts an internal progress request structure to its protobuf WatchRequest structure.
func (pr *progressRequest) toPB() *pb.WatchRequest {
	req := &pb.WatchProgressRequest{}
	cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
	return &pb.WatchRequest{RequestUnion: cr}
}

func streamKeyFromCtx(ctx context.Context) string {
	if md, ok := metadata.FromOutgoingContext(ctx); ok {
		return fmt.Sprintf("%+v", md)
	}
	return ""
}
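
// Added commentary (not in the upstream source): because streams are keyed by
// the context's outgoing gRPC metadata, Watch calls whose contexts carry
// equivalent metadata share one gRPC stream. A hypothetical way to force a
// separate stream is to attach distinct metadata:
//
//	ctx := metadata.AppendToOutgoingContext(context.Background(), "stream-id", "2")
//	wch := cli.Watch(ctx, "foo") // uses its own watchGrpcStream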
