...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc/interceptor.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v3rpc
    16  
    17  import (
    18  	"context"
    19  	"sync"
    20  	"time"
    21  
    22  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    23  	"go.etcd.io/etcd/client/pkg/v3/types"
    24  	"go.etcd.io/etcd/raft/v3"
    25  	"go.etcd.io/etcd/server/v3/etcdserver"
    26  	"go.etcd.io/etcd/server/v3/etcdserver/api"
    27  
    28  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    29  	"go.uber.org/zap"
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/metadata"
    32  	"google.golang.org/grpc/peer"
    33  )
    34  
const (
	// maxNoLeaderCnt is the number of consecutive election-timeout checks
	// without a known leader after which monitorLeader cancels the streams
	// it tracks.
	maxNoLeaderCnt          = 3
	// warnUnaryRequestLatency is the duration above which
	// logUnaryRequestStats treats a unary request as expensive and logs it
	// even when debug logging is disabled.
	warnUnaryRequestLatency = 300 * time.Millisecond
	// snapshotMethod is the full gRPC method name of the one stream RPC that
	// newStreamInterceptor still allows on a learner member.
	snapshotMethod          = "/etcdserverpb.Maintenance/Snapshot"
)
    40  
// streamsMap is a mutex-guarded set of server streams. newStreamInterceptor
// registers and removes streams in it, and monitorLeader cancels every stream
// it contains once the member has been without a leader for too long.
type streamsMap struct {
	// mu must be held while reading or writing streams.
	mu      sync.Mutex
	// streams is the set of currently tracked streams.
	streams map[grpc.ServerStream]struct{}
}
    45  
    46  func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
    47  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    48  		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
    49  			return nil, rpctypes.ErrGRPCNotCapable
    50  		}
    51  
    52  		if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
    53  			return nil, rpctypes.ErrGPRCNotSupportedForLearner
    54  		}
    55  
    56  		md, ok := metadata.FromIncomingContext(ctx)
    57  		if ok {
    58  			ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
    59  			if len(vs) > 0 {
    60  				ver = vs[0]
    61  			}
    62  			clientRequests.WithLabelValues("unary", ver).Inc()
    63  
    64  			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
    65  				if s.Leader() == types.ID(raft.None) {
    66  					return nil, rpctypes.ErrGRPCNoLeader
    67  				}
    68  			}
    69  		}
    70  
    71  		return handler(ctx, req)
    72  	}
    73  }
    74  
    75  func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
    76  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    77  		startTime := time.Now()
    78  		resp, err := handler(ctx, req)
    79  		lg := s.Logger()
    80  		if lg != nil { // acquire stats if debug level is enabled or request is expensive
    81  			defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
    82  		}
    83  		return resp, err
    84  	}
    85  }
    86  
    87  func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
    88  	duration := time.Since(startTime)
    89  	var enabledDebugLevel, expensiveRequest bool
    90  	if lg.Core().Enabled(zap.DebugLevel) {
    91  		enabledDebugLevel = true
    92  	}
    93  	if duration > warnUnaryRequestLatency {
    94  		expensiveRequest = true
    95  	}
    96  	if !enabledDebugLevel && !expensiveRequest {
    97  		return
    98  	}
    99  	remote := "No remote client info."
   100  	peerInfo, ok := peer.FromContext(ctx)
   101  	if ok {
   102  		remote = peerInfo.Addr.String()
   103  	}
   104  	responseType := info.FullMethod
   105  	var reqCount, respCount int64
   106  	var reqSize, respSize int
   107  	var reqContent string
   108  	switch _resp := resp.(type) {
   109  	case *pb.RangeResponse:
   110  		_req, ok := req.(*pb.RangeRequest)
   111  		if ok {
   112  			reqCount = 0
   113  			reqSize = _req.Size()
   114  			reqContent = _req.String()
   115  		}
   116  		if _resp != nil {
   117  			respCount = _resp.GetCount()
   118  			respSize = _resp.Size()
   119  		}
   120  	case *pb.PutResponse:
   121  		_req, ok := req.(*pb.PutRequest)
   122  		if ok {
   123  			reqCount = 1
   124  			reqSize = _req.Size()
   125  			reqContent = pb.NewLoggablePutRequest(_req).String()
   126  			// redact value field from request content, see PR #9821
   127  		}
   128  		if _resp != nil {
   129  			respCount = 0
   130  			respSize = _resp.Size()
   131  		}
   132  	case *pb.DeleteRangeResponse:
   133  		_req, ok := req.(*pb.DeleteRangeRequest)
   134  		if ok {
   135  			reqCount = 0
   136  			reqSize = _req.Size()
   137  			reqContent = _req.String()
   138  		}
   139  		if _resp != nil {
   140  			respCount = _resp.GetDeleted()
   141  			respSize = _resp.Size()
   142  		}
   143  	case *pb.TxnResponse:
   144  		_req, ok := req.(*pb.TxnRequest)
   145  		if ok && _resp != nil {
   146  			if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
   147  				reqCount = int64(len(_req.GetSuccess()))
   148  				reqSize = 0
   149  				for _, r := range _req.GetSuccess() {
   150  					reqSize += r.Size()
   151  				}
   152  			} else {
   153  				reqCount = int64(len(_req.GetFailure()))
   154  				reqSize = 0
   155  				for _, r := range _req.GetFailure() {
   156  					reqSize += r.Size()
   157  				}
   158  			}
   159  			reqContent = pb.NewLoggableTxnRequest(_req).String()
   160  			// redact value field from request content, see PR #9821
   161  		}
   162  		if _resp != nil {
   163  			respCount = 0
   164  			respSize = _resp.Size()
   165  		}
   166  	default:
   167  		reqCount = -1
   168  		reqSize = -1
   169  		respCount = -1
   170  		respSize = -1
   171  	}
   172  
   173  	if enabledDebugLevel {
   174  		logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
   175  	} else if expensiveRequest {
   176  		logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
   177  	}
   178  }
   179  
   180  func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
   181  	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
   182  	lg.Debug("request stats",
   183  		zap.Time("start time", startTime),
   184  		zap.Duration("time spent", duration),
   185  		zap.String("remote", remote),
   186  		zap.String("response type", responseType),
   187  		zap.Int64("request count", reqCount),
   188  		zap.Int("request size", reqSize),
   189  		zap.Int64("response count", respCount),
   190  		zap.Int("response size", respSize),
   191  		zap.String("request content", reqContent),
   192  	)
   193  }
   194  
   195  func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
   196  	reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
   197  	lg.Warn("request stats",
   198  		zap.Time("start time", startTime),
   199  		zap.Duration("time spent", duration),
   200  		zap.String("remote", remote),
   201  		zap.String("response type", responseType),
   202  		zap.Int64("request count", reqCount),
   203  		zap.Int("request size", reqSize),
   204  		zap.Int64("response count", respCount),
   205  		zap.Int("response size", respSize),
   206  		zap.String("request content", reqContent),
   207  	)
   208  }
   209  
   210  func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
   211  	smap := monitorLeader(s)
   212  
   213  	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   214  		if !api.IsCapabilityEnabled(api.V3rpcCapability) {
   215  			return rpctypes.ErrGRPCNotCapable
   216  		}
   217  
   218  		if s.IsMemberExist(s.ID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
   219  			return rpctypes.ErrGPRCNotSupportedForLearner
   220  		}
   221  
   222  		md, ok := metadata.FromIncomingContext(ss.Context())
   223  		if ok {
   224  			ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
   225  			if len(vs) > 0 {
   226  				ver = vs[0]
   227  			}
   228  			clientRequests.WithLabelValues("stream", ver).Inc()
   229  
   230  			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
   231  				if s.Leader() == types.ID(raft.None) {
   232  					return rpctypes.ErrGRPCNoLeader
   233  				}
   234  
   235  				ctx := newCancellableContext(ss.Context())
   236  				ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}
   237  
   238  				smap.mu.Lock()
   239  				smap.streams[ss] = struct{}{}
   240  				smap.mu.Unlock()
   241  
   242  				defer func() {
   243  					smap.mu.Lock()
   244  					delete(smap.streams, ss)
   245  					smap.mu.Unlock()
   246  					// TODO: investigate whether the reason for cancellation here is useful to know
   247  					ctx.Cancel(nil)
   248  				}()
   249  			}
   250  		}
   251  
   252  		return handler(srv, ss)
   253  	}
   254  }
   255  
   256  // cancellableContext wraps a context with new cancellable context that allows a
   257  // specific cancellation error to be preserved and later retrieved using the
   258  // Context.Err() function. This is so downstream context users can disambiguate
   259  // the reason for the cancellation which could be from the client (for example)
   260  // or from this interceptor code.
   261  type cancellableContext struct {
   262  	context.Context
   263  
   264  	lock         sync.RWMutex
   265  	cancel       context.CancelFunc
   266  	cancelReason error
   267  }
   268  
   269  func newCancellableContext(parent context.Context) *cancellableContext {
   270  	ctx, cancel := context.WithCancel(parent)
   271  	return &cancellableContext{
   272  		Context: ctx,
   273  		cancel:  cancel,
   274  	}
   275  }
   276  
   277  // Cancel stores the cancellation reason and then delegates to context.WithCancel
   278  // against the parent context.
   279  func (c *cancellableContext) Cancel(reason error) {
   280  	c.lock.Lock()
   281  	c.cancelReason = reason
   282  	c.lock.Unlock()
   283  	c.cancel()
   284  }
   285  
   286  // Err will return the preserved cancel reason error if present, and will
   287  // otherwise return the underlying error from the parent context.
   288  func (c *cancellableContext) Err() error {
   289  	c.lock.RLock()
   290  	defer c.lock.RUnlock()
   291  	if c.cancelReason != nil {
   292  		return c.cancelReason
   293  	}
   294  	return c.Context.Err()
   295  }
   296  
   297  type serverStreamWithCtx struct {
   298  	grpc.ServerStream
   299  
   300  	// ctx is used so that we can preserve a reason for cancellation.
   301  	ctx *cancellableContext
   302  }
   303  
   304  func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
   305  
   306  func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
   307  	smap := &streamsMap{
   308  		streams: make(map[grpc.ServerStream]struct{}),
   309  	}
   310  
   311  	s.GoAttach(func() {
   312  		election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
   313  		noLeaderCnt := 0
   314  
   315  		for {
   316  			select {
   317  			case <-s.StoppingNotify():
   318  				return
   319  			case <-time.After(election):
   320  				if s.Leader() == types.ID(raft.None) {
   321  					noLeaderCnt++
   322  				} else {
   323  					noLeaderCnt = 0
   324  				}
   325  
   326  				// We are more conservative on canceling existing streams. Reconnecting streams
   327  				// cost much more than just rejecting new requests. So we wait until the member
   328  				// cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
   329  				if noLeaderCnt >= maxNoLeaderCnt {
   330  					smap.mu.Lock()
   331  					for ss := range smap.streams {
   332  						if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
   333  							ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
   334  							<-ss.Context().Done()
   335  						}
   336  					}
   337  					smap.streams = make(map[grpc.ServerStream]struct{})
   338  					smap.mu.Unlock()
   339  				}
   340  			}
   341  		}
   342  	})
   343  
   344  	return smap
   345  }
   346  

View as plain text