1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3rpc
16
17 import (
18 "context"
19 "sync"
20 "time"
21
22 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
23 "go.etcd.io/etcd/client/pkg/v3/types"
24 "go.etcd.io/etcd/raft/v3"
25 "go.etcd.io/etcd/server/v3/etcdserver"
26 "go.etcd.io/etcd/server/v3/etcdserver/api"
27
28 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
29 "go.uber.org/zap"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/peer"
33 )
34
const (
	// snapshotMethod is the full gRPC method name of the Maintenance
	// Snapshot stream. The stream interceptor compares the invoked method
	// against it when the local member is a learner.
	snapshotMethod = "/etcdserverpb.Maintenance/Snapshot"

	// warnUnaryRequestLatency is the duration a unary request must exceed
	// for logUnaryRequestStats to treat it as expensive.
	warnUnaryRequestLatency = 300 * time.Millisecond

	// maxNoLeaderCnt is the number of consecutive leaderless checks that
	// monitorLeader tolerates before it cancels the tracked streams.
	maxNoLeaderCnt = 3
)
40
41 type streamsMap struct {
42 mu sync.Mutex
43 streams map[grpc.ServerStream]struct{}
44 }
45
46 func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
47 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
48 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
49 return nil, rpctypes.ErrGRPCNotCapable
50 }
51
52 if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
53 return nil, rpctypes.ErrGPRCNotSupportedForLearner
54 }
55
56 md, ok := metadata.FromIncomingContext(ctx)
57 if ok {
58 ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
59 if len(vs) > 0 {
60 ver = vs[0]
61 }
62 clientRequests.WithLabelValues("unary", ver).Inc()
63
64 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
65 if s.Leader() == types.ID(raft.None) {
66 return nil, rpctypes.ErrGRPCNoLeader
67 }
68 }
69 }
70
71 return handler(ctx, req)
72 }
73 }
74
75 func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
76 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
77 startTime := time.Now()
78 resp, err := handler(ctx, req)
79 lg := s.Logger()
80 if lg != nil {
81 defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
82 }
83 return resp, err
84 }
85 }
86
87 func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
88 duration := time.Since(startTime)
89 var enabledDebugLevel, expensiveRequest bool
90 if lg.Core().Enabled(zap.DebugLevel) {
91 enabledDebugLevel = true
92 }
93 if duration > warnUnaryRequestLatency {
94 expensiveRequest = true
95 }
96 if !enabledDebugLevel && !expensiveRequest {
97 return
98 }
99 remote := "No remote client info."
100 peerInfo, ok := peer.FromContext(ctx)
101 if ok {
102 remote = peerInfo.Addr.String()
103 }
104 responseType := info.FullMethod
105 var reqCount, respCount int64
106 var reqSize, respSize int
107 var reqContent string
108 switch _resp := resp.(type) {
109 case *pb.RangeResponse:
110 _req, ok := req.(*pb.RangeRequest)
111 if ok {
112 reqCount = 0
113 reqSize = _req.Size()
114 reqContent = _req.String()
115 }
116 if _resp != nil {
117 respCount = _resp.GetCount()
118 respSize = _resp.Size()
119 }
120 case *pb.PutResponse:
121 _req, ok := req.(*pb.PutRequest)
122 if ok {
123 reqCount = 1
124 reqSize = _req.Size()
125 reqContent = pb.NewLoggablePutRequest(_req).String()
126
127 }
128 if _resp != nil {
129 respCount = 0
130 respSize = _resp.Size()
131 }
132 case *pb.DeleteRangeResponse:
133 _req, ok := req.(*pb.DeleteRangeRequest)
134 if ok {
135 reqCount = 0
136 reqSize = _req.Size()
137 reqContent = _req.String()
138 }
139 if _resp != nil {
140 respCount = _resp.GetDeleted()
141 respSize = _resp.Size()
142 }
143 case *pb.TxnResponse:
144 _req, ok := req.(*pb.TxnRequest)
145 if ok && _resp != nil {
146 if _resp.GetSucceeded() {
147 reqCount = int64(len(_req.GetSuccess()))
148 reqSize = 0
149 for _, r := range _req.GetSuccess() {
150 reqSize += r.Size()
151 }
152 } else {
153 reqCount = int64(len(_req.GetFailure()))
154 reqSize = 0
155 for _, r := range _req.GetFailure() {
156 reqSize += r.Size()
157 }
158 }
159 reqContent = pb.NewLoggableTxnRequest(_req).String()
160
161 }
162 if _resp != nil {
163 respCount = 0
164 respSize = _resp.Size()
165 }
166 default:
167 reqCount = -1
168 reqSize = -1
169 respCount = -1
170 respSize = -1
171 }
172
173 if enabledDebugLevel {
174 logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
175 } else if expensiveRequest {
176 logExpensiveRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
177 }
178 }
179
180 func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
181 reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
182 lg.Debug("request stats",
183 zap.Time("start time", startTime),
184 zap.Duration("time spent", duration),
185 zap.String("remote", remote),
186 zap.String("response type", responseType),
187 zap.Int64("request count", reqCount),
188 zap.Int("request size", reqSize),
189 zap.Int64("response count", respCount),
190 zap.Int("response size", respSize),
191 zap.String("request content", reqContent),
192 )
193 }
194
195 func logExpensiveRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
196 reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
197 lg.Warn("request stats",
198 zap.Time("start time", startTime),
199 zap.Duration("time spent", duration),
200 zap.String("remote", remote),
201 zap.String("response type", responseType),
202 zap.Int64("request count", reqCount),
203 zap.Int("request size", reqSize),
204 zap.Int64("response count", respCount),
205 zap.Int("response size", respSize),
206 zap.String("request content", reqContent),
207 )
208 }
209
210 func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
211 smap := monitorLeader(s)
212
213 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
214 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
215 return rpctypes.ErrGRPCNotCapable
216 }
217
218 if s.IsMemberExist(s.ID()) && s.IsLearner() && info.FullMethod != snapshotMethod {
219 return rpctypes.ErrGPRCNotSupportedForLearner
220 }
221
222 md, ok := metadata.FromIncomingContext(ss.Context())
223 if ok {
224 ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
225 if len(vs) > 0 {
226 ver = vs[0]
227 }
228 clientRequests.WithLabelValues("stream", ver).Inc()
229
230 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
231 if s.Leader() == types.ID(raft.None) {
232 return rpctypes.ErrGRPCNoLeader
233 }
234
235 ctx := newCancellableContext(ss.Context())
236 ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}
237
238 smap.mu.Lock()
239 smap.streams[ss] = struct{}{}
240 smap.mu.Unlock()
241
242 defer func() {
243 smap.mu.Lock()
244 delete(smap.streams, ss)
245 smap.mu.Unlock()
246
247 ctx.Cancel(nil)
248 }()
249 }
250 }
251
252 return handler(srv, ss)
253 }
254 }
255
256
257
258
259
260
// cancellableContext wraps a context derived from a parent via
// context.WithCancel and lets whoever cancels it attach a specific error,
// which Err then reports in place of the derived context's own error.
type cancellableContext struct {
	context.Context

	// lock guards cancelReason and serialises Cancel against Err.
	lock sync.RWMutex
	// cancel cancels the embedded context.
	cancel context.CancelFunc
	// cancelReason is the error recorded by the Cancel call that actually
	// ended the context; it stays nil if that call passed nil or if the
	// context was ended by its parent.
	cancelReason error
}

// newCancellableContext derives a cancellable child of parent and wraps it.
func newCancellableContext(parent context.Context) *cancellableContext {
	ctx, cancel := context.WithCancel(parent)
	return &cancellableContext{
		Context: ctx,
		cancel:  cancel,
	}
}

// Cancel cancels the wrapped context. If the context is still live, reason is
// recorded and will be returned by Err (a nil reason leaves Err reporting the
// wrapped context's own error). If the context is already done, whether through
// an earlier Cancel or through the parent, reason is ignored, so the value
// returned by Err never changes once it is non-nil, as the context.Context
// contract requires.
func (c *cancellableContext) Cancel(reason error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.Context.Err() == nil {
		c.cancelReason = reason
	}
	// Cancelling while still holding the lock keeps Err from observing the
	// reason before Done is closed, and keeps a concurrent Cancel from
	// replacing the reason in between.
	c.cancel()
}

// Err returns the reason recorded by Cancel when there is one, and otherwise
// the wrapped context's error (nil while the context is live).
func (c *cancellableContext) Err() error {
	c.lock.RLock()
	defer c.lock.RUnlock()
	if c.cancelReason != nil {
		return c.cancelReason
	}
	return c.Context.Err()
}
296
297 type serverStreamWithCtx struct {
298 grpc.ServerStream
299
300
301 ctx *cancellableContext
302 }
303
304 func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
305
306 func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
307 smap := &streamsMap{
308 streams: make(map[grpc.ServerStream]struct{}),
309 }
310
311 s.GoAttach(func() {
312 election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
313 noLeaderCnt := 0
314
315 for {
316 select {
317 case <-s.StoppingNotify():
318 return
319 case <-time.After(election):
320 if s.Leader() == types.ID(raft.None) {
321 noLeaderCnt++
322 } else {
323 noLeaderCnt = 0
324 }
325
326
327
328
329 if noLeaderCnt >= maxNoLeaderCnt {
330 smap.mu.Lock()
331 for ss := range smap.streams {
332 if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
333 ssWithCtx.ctx.Cancel(rpctypes.ErrGRPCNoLeader)
334 <-ss.Context().Done()
335 }
336 }
337 smap.streams = make(map[grpc.ServerStream]struct{})
338 smap.mu.Unlock()
339 }
340 }
341 }
342 })
343
344 return smap
345 }
346
View as plain text