...

Source file src/go.etcd.io/etcd/server/v3/proxy/grpcproxy/watch.go

Documentation: go.etcd.io/etcd/server/v3/proxy/grpcproxy

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package grpcproxy
    16  
    17  import (
    18  	"context"
    19  	"sync"
    20  
    21  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    22  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    23  	"go.etcd.io/etcd/client/v3"
    24  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
    25  
    26  	"go.uber.org/zap"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/metadata"
    29  	"google.golang.org/grpc/status"
    30  )
    31  
    32  type watchProxy struct {
    33  	cw  clientv3.Watcher
    34  	ctx context.Context
    35  
    36  	leader *leader
    37  
    38  	ranges *watchRanges
    39  
    40  	// mu protects adding outstanding watch servers through wg.
    41  	mu sync.Mutex
    42  
    43  	// wg waits until all outstanding watch servers quit.
    44  	wg sync.WaitGroup
    45  
    46  	// kv is used for permission checking
    47  	kv clientv3.KV
    48  	lg *zap.Logger
    49  }
    50  
    51  func NewWatchProxy(ctx context.Context, lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
    52  	cctx, cancel := context.WithCancel(ctx)
    53  	wp := &watchProxy{
    54  		cw:     c.Watcher,
    55  		ctx:    cctx,
    56  		leader: newLeader(cctx, c.Watcher),
    57  
    58  		kv: c.KV, // for permission checking
    59  		lg: lg,
    60  	}
    61  	wp.ranges = newWatchRanges(wp)
    62  	ch := make(chan struct{})
    63  	go func() {
    64  		defer close(ch)
    65  		<-wp.leader.stopNotify()
    66  		wp.mu.Lock()
    67  		select {
    68  		case <-wp.ctx.Done():
    69  		case <-wp.leader.disconnectNotify():
    70  			cancel()
    71  		}
    72  		<-wp.ctx.Done()
    73  		wp.mu.Unlock()
    74  		wp.wg.Wait()
    75  		wp.ranges.stop()
    76  	}()
    77  	return wp, ch
    78  }
    79  
    80  func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
    81  	wp.mu.Lock()
    82  	select {
    83  	case <-wp.ctx.Done():
    84  		wp.mu.Unlock()
    85  		select {
    86  		case <-wp.leader.disconnectNotify():
    87  			return status.Error(codes.Canceled, "the client connection is closing")
    88  		default:
    89  			return wp.ctx.Err()
    90  		}
    91  	default:
    92  		wp.wg.Add(1)
    93  	}
    94  	wp.mu.Unlock()
    95  
    96  	ctx, cancel := context.WithCancel(stream.Context())
    97  	wps := &watchProxyStream{
    98  		ranges:   wp.ranges,
    99  		watchers: make(map[int64]*watcher),
   100  		stream:   stream,
   101  		watchCh:  make(chan *pb.WatchResponse, 1024),
   102  		ctx:      ctx,
   103  		cancel:   cancel,
   104  		kv:       wp.kv,
   105  		lg:       wp.lg,
   106  	}
   107  
   108  	var lostLeaderC <-chan struct{}
   109  	if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
   110  		v := md[rpctypes.MetadataRequireLeaderKey]
   111  		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
   112  			lostLeaderC = wp.leader.lostNotify()
   113  			// if leader is known to be lost at creation time, avoid
   114  			// letting events through at all
   115  			select {
   116  			case <-lostLeaderC:
   117  				wp.wg.Done()
   118  				return rpctypes.ErrNoLeader
   119  			default:
   120  			}
   121  		}
   122  	}
   123  
   124  	// post to stopc => terminate server stream; can't use a waitgroup
   125  	// since all goroutines will only terminate after Watch() exits.
   126  	stopc := make(chan struct{}, 3)
   127  	go func() {
   128  		defer func() { stopc <- struct{}{} }()
   129  		wps.recvLoop()
   130  	}()
   131  	go func() {
   132  		defer func() { stopc <- struct{}{} }()
   133  		wps.sendLoop()
   134  	}()
   135  	// tear down watch if leader goes down or entire watch proxy is terminated
   136  	go func() {
   137  		defer func() { stopc <- struct{}{} }()
   138  		select {
   139  		case <-lostLeaderC:
   140  		case <-ctx.Done():
   141  		case <-wp.ctx.Done():
   142  		}
   143  	}()
   144  
   145  	<-stopc
   146  	cancel()
   147  
   148  	// recv/send may only shutdown after function exits;
   149  	// goroutine notifies proxy that stream is through
   150  	go func() {
   151  		<-stopc
   152  		<-stopc
   153  		wps.close()
   154  		wp.wg.Done()
   155  	}()
   156  
   157  	select {
   158  	case <-lostLeaderC:
   159  		return rpctypes.ErrNoLeader
   160  	case <-wp.leader.disconnectNotify():
   161  		return status.Error(codes.Canceled, "the client connection is closing")
   162  	default:
   163  		return wps.ctx.Err()
   164  	}
   165  }
   166  
   167  // watchProxyStream forwards etcd watch events to a proxied client stream.
   168  type watchProxyStream struct {
   169  	ranges *watchRanges
   170  
   171  	// mu protects watchers and nextWatcherID
   172  	mu sync.Mutex
   173  	// watchers receive events from watch broadcast.
   174  	watchers map[int64]*watcher
   175  	// nextWatcherID is the id to assign the next watcher on this stream.
   176  	nextWatcherID int64
   177  
   178  	stream pb.Watch_WatchServer
   179  
   180  	// watchCh receives watch responses from the watchers.
   181  	watchCh chan *pb.WatchResponse
   182  
   183  	ctx    context.Context
   184  	cancel context.CancelFunc
   185  
   186  	// kv is used for permission checking
   187  	kv clientv3.KV
   188  	lg *zap.Logger
   189  }
   190  
   191  func (wps *watchProxyStream) close() {
   192  	var wg sync.WaitGroup
   193  	wps.cancel()
   194  	wps.mu.Lock()
   195  	wg.Add(len(wps.watchers))
   196  	for _, wpsw := range wps.watchers {
   197  		go func(w *watcher) {
   198  			wps.ranges.delete(w)
   199  			wg.Done()
   200  		}(wpsw)
   201  	}
   202  	wps.watchers = nil
   203  	wps.mu.Unlock()
   204  
   205  	wg.Wait()
   206  
   207  	close(wps.watchCh)
   208  }
   209  
   210  func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
   211  	if len(key) == 0 {
   212  		// If the length of the key is 0, we need to obtain full range.
   213  		// look at clientv3.WithPrefix()
   214  		key = []byte{0}
   215  		rangeEnd = []byte{0}
   216  	}
   217  	req := &pb.RangeRequest{
   218  		Serializable: true,
   219  		Key:          key,
   220  		RangeEnd:     rangeEnd,
   221  		CountOnly:    true,
   222  		Limit:        1,
   223  	}
   224  	_, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
   225  	return err
   226  }
   227  
   228  func (wps *watchProxyStream) recvLoop() error {
   229  	for {
   230  		req, err := wps.stream.Recv()
   231  		if err != nil {
   232  			return err
   233  		}
   234  		switch uv := req.RequestUnion.(type) {
   235  		case *pb.WatchRequest_CreateRequest:
   236  			cr := uv.CreateRequest
   237  
   238  			if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
   239  				wps.watchCh <- &pb.WatchResponse{
   240  					Header:       &pb.ResponseHeader{},
   241  					WatchId:      clientv3.InvalidWatchID,
   242  					Created:      true,
   243  					Canceled:     true,
   244  					CancelReason: err.Error(),
   245  				}
   246  				continue
   247  			}
   248  
   249  			wps.mu.Lock()
   250  			w := &watcher{
   251  				wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
   252  				id:  wps.nextWatcherID,
   253  				wps: wps,
   254  
   255  				nextrev:  cr.StartRevision,
   256  				progress: cr.ProgressNotify,
   257  				prevKV:   cr.PrevKv,
   258  				filters:  v3rpc.FiltersFromRequest(cr),
   259  			}
   260  			if !w.wr.valid() {
   261  				w.post(&pb.WatchResponse{WatchId: clientv3.InvalidWatchID, Created: true, Canceled: true})
   262  				wps.mu.Unlock()
   263  				continue
   264  			}
   265  			wps.nextWatcherID++
   266  			w.nextrev = cr.StartRevision
   267  			wps.watchers[w.id] = w
   268  			wps.ranges.add(w)
   269  			wps.mu.Unlock()
   270  			wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID))
   271  		case *pb.WatchRequest_CancelRequest:
   272  			wps.delete(uv.CancelRequest.WatchId)
   273  			wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
   274  		default:
   275  			// Panic or Fatalf would allow to network clients to crash the serve remotely.
   276  			wps.lg.Error("not supported request type by gRPC proxy", zap.Stringer("request", req))
   277  		}
   278  	}
   279  }
   280  
   281  func (wps *watchProxyStream) sendLoop() {
   282  	for {
   283  		select {
   284  		case wresp, ok := <-wps.watchCh:
   285  			if !ok {
   286  				return
   287  			}
   288  			if err := wps.stream.Send(wresp); err != nil {
   289  				return
   290  			}
   291  		case <-wps.ctx.Done():
   292  			return
   293  		}
   294  	}
   295  }
   296  
   297  func (wps *watchProxyStream) delete(id int64) {
   298  	wps.mu.Lock()
   299  	defer wps.mu.Unlock()
   300  
   301  	w, ok := wps.watchers[id]
   302  	if !ok {
   303  		return
   304  	}
   305  	wps.ranges.delete(w)
   306  	delete(wps.watchers, id)
   307  	resp := &pb.WatchResponse{
   308  		Header:   &w.lastHeader,
   309  		WatchId:  id,
   310  		Canceled: true,
   311  	}
   312  	wps.watchCh <- resp
   313  }
   314  

View as plain text