1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "bytes"
19 "context"
20 "encoding/base64"
21 "encoding/binary"
22 "strconv"
23 "time"
24
25 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
26 "go.etcd.io/etcd/api/v3/membershippb"
27 "go.etcd.io/etcd/pkg/v3/traceutil"
28 "go.etcd.io/etcd/raft/v3"
29 "go.etcd.io/etcd/server/v3/auth"
30 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
31 "go.etcd.io/etcd/server/v3/lease"
32 "go.etcd.io/etcd/server/v3/lease/leasehttp"
33 "go.etcd.io/etcd/server/v3/mvcc"
34
35 "github.com/gogo/protobuf/proto"
36 "go.uber.org/zap"
37 "golang.org/x/crypto/bcrypt"
38 )
39
const (
	// maxGapBetweenApplyAndCommitIndex bounds how far the committed index may
	// run ahead of the applied index. processInternalRaftRequestOnce rejects
	// new proposals with ErrTooManyRequests once the gap exceeds this value.
	maxGapBetweenApplyAndCommitIndex = 5000

	// traceThreshold is the duration handed to the traces' LogIfLong and
	// LogAllStepsIfLong calls made in this file.
	traceThreshold = 100 * time.Millisecond

	// readIndexRetryTime is the interval after which requestCurrentIndex
	// resends a ReadIndex request that has not been answered yet.
	readIndexRetryTime = 500 * time.Millisecond

	// applyTimeout caps how long waitAppliedIndex blocks on s.ApplyWait().
	applyTimeout = time.Second
)
53
54 type RaftKV interface {
55 Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error)
56 Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error)
57 DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
58 Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error)
59 Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error)
60 }
61
62 type Lessor interface {
63
64 LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
65
66 LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)
67
68
69
70 LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error)
71
72
73 LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error)
74
75
76 LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error)
77 }
78
79 type Authenticator interface {
80 AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error)
81 AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error)
82 AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error)
83 Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error)
84 UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
85 UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
86 UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
87 UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
88 UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
89 UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
90 RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
91 RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
92 RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
93 RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
94 RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
95 UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
96 RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
97 }
98
99 func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
100 trace := traceutil.New("range",
101 s.Logger(),
102 traceutil.Field{Key: "range_begin", Value: string(r.Key)},
103 traceutil.Field{Key: "range_end", Value: string(r.RangeEnd)},
104 )
105 ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
106
107 var resp *pb.RangeResponse
108 var err error
109 defer func(start time.Time) {
110 warnOfExpensiveReadOnlyRangeRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
111 if resp != nil {
112 trace.AddField(
113 traceutil.Field{Key: "response_count", Value: len(resp.Kvs)},
114 traceutil.Field{Key: "response_revision", Value: resp.Header.Revision},
115 )
116 }
117 trace.LogIfLong(traceThreshold)
118 }(time.Now())
119
120 if !r.Serializable {
121 err = s.linearizableReadNotify(ctx)
122 trace.Step("agreement among raft nodes before linearized reading")
123 if err != nil {
124 return nil, err
125 }
126 }
127 chk := func(ai *auth.AuthInfo) error {
128 return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
129 }
130
131 get := func() { resp, err = s.applyV3Base.Range(ctx, nil, r) }
132 if serr := s.doSerialize(ctx, chk, get); serr != nil {
133 err = serr
134 return nil, err
135 }
136 return resp, err
137 }
138
139 func (s *EtcdServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
140 ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
141 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Put: r})
142 if err != nil {
143 return nil, err
144 }
145 return resp.(*pb.PutResponse), nil
146 }
147
148 func (s *EtcdServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
149 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DeleteRange: r})
150 if err != nil {
151 return nil, err
152 }
153 return resp.(*pb.DeleteRangeResponse), nil
154 }
155
156 func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
157 if isTxnReadonly(r) {
158 trace := traceutil.New("transaction",
159 s.Logger(),
160 traceutil.Field{Key: "read_only", Value: true},
161 )
162 ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
163 if !isTxnSerializable(r) {
164 err := s.linearizableReadNotify(ctx)
165 trace.Step("agreement among raft nodes before linearized reading")
166 if err != nil {
167 return nil, err
168 }
169 }
170 var resp *pb.TxnResponse
171 var err error
172 chk := func(ai *auth.AuthInfo) error {
173 return checkTxnAuth(s.authStore, ai, r)
174 }
175
176 defer func(start time.Time) {
177 warnOfExpensiveReadOnlyTxnRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, r, resp, err)
178 trace.LogIfLong(traceThreshold)
179 }(time.Now())
180
181 get := func() { resp, _, err = s.applyV3Base.Txn(ctx, r) }
182 if serr := s.doSerialize(ctx, chk, get); serr != nil {
183 return nil, serr
184 }
185 return resp, err
186 }
187
188 ctx = context.WithValue(ctx, traceutil.StartTimeKey, time.Now())
189 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{Txn: r})
190 if err != nil {
191 return nil, err
192 }
193 return resp.(*pb.TxnResponse), nil
194 }
195
196 func isTxnSerializable(r *pb.TxnRequest) bool {
197 for _, u := range r.Success {
198 if r := u.GetRequestRange(); r == nil || !r.Serializable {
199 return false
200 }
201 }
202 for _, u := range r.Failure {
203 if r := u.GetRequestRange(); r == nil || !r.Serializable {
204 return false
205 }
206 }
207 return true
208 }
209
210 func isTxnReadonly(r *pb.TxnRequest) bool {
211 for _, u := range r.Success {
212 if r := u.GetRequestRange(); r == nil {
213 return false
214 }
215 }
216 for _, u := range r.Failure {
217 if r := u.GetRequestRange(); r == nil {
218 return false
219 }
220 }
221 return true
222 }
223
224 func (s *EtcdServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
225 startTime := time.Now()
226 result, err := s.processInternalRaftRequestOnce(ctx, pb.InternalRaftRequest{Compaction: r})
227 trace := traceutil.TODO()
228 if result != nil && result.trace != nil {
229 trace = result.trace
230 defer func() {
231 trace.LogIfLong(traceThreshold)
232 }()
233 applyStart := result.trace.GetStartTime()
234 result.trace.SetStartTime(startTime)
235 trace.InsertStep(0, applyStart, "process raft request")
236 }
237 if r.Physical && result != nil && result.physc != nil {
238 <-result.physc
239
240
241
242
243
244 s.be.ForceCommit()
245 trace.Step("physically apply compaction")
246 }
247 if err != nil {
248 return nil, err
249 }
250 if result.err != nil {
251 return nil, result.err
252 }
253 resp := result.resp.(*pb.CompactionResponse)
254 if resp == nil {
255 resp = &pb.CompactionResponse{}
256 }
257 if resp.Header == nil {
258 resp.Header = &pb.ResponseHeader{}
259 }
260 resp.Header.Revision = s.kv.Rev()
261 trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
262 return resp, nil
263 }
264
265 func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
266
267 for r.ID == int64(lease.NoLease) {
268
269 r.ID = int64(s.reqIDGen.Next() & ((1 << 63) - 1))
270 }
271 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseGrant: r})
272 if err != nil {
273 return nil, err
274 }
275 return resp.(*pb.LeaseGrantResponse), nil
276 }
277
278 func (s *EtcdServer) waitAppliedIndex() error {
279 select {
280 case <-s.ApplyWait():
281 case <-s.stopping:
282 return ErrStopped
283 case <-time.After(applyTimeout):
284 return ErrTimeoutWaitAppliedIndex
285 }
286
287 return nil
288 }
289
290 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
291 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
292 if err != nil {
293 return nil, err
294 }
295 return resp.(*pb.LeaseRevokeResponse), nil
296 }
297
298 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
299 if s.isLeader() {
300
301
302
303
304
305
306
307 if !s.ensureLeadership() {
308 return -1, lease.ErrNotPrimary
309 }
310
311 if err := s.waitAppliedIndex(); err != nil {
312 return 0, err
313 }
314
315 ttl, err := s.lessor.Renew(id)
316 if err == nil {
317 return ttl, nil
318 }
319 if err != lease.ErrNotPrimary {
320 return -1, err
321 }
322 }
323
324 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
325 defer cancel()
326
327
328 for cctx.Err() == nil {
329 leader, lerr := s.waitLeader(cctx)
330 if lerr != nil {
331 return -1, lerr
332 }
333 for _, url := range leader.PeerURLs {
334 lurl := url + leasehttp.LeasePrefix
335 ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
336 if err == nil || err == lease.ErrLeaseNotFound {
337 return ttl, err
338 }
339 }
340
341 time.Sleep(50 * time.Millisecond)
342 }
343
344 if cctx.Err() == context.DeadlineExceeded {
345 return -1, ErrTimeout
346 }
347 return -1, ErrCanceled
348 }
349
350 func (s *EtcdServer) checkLeaseTimeToLive(ctx context.Context, leaseID lease.LeaseID) (uint64, error) {
351 rev := s.AuthStore().Revision()
352 if !s.AuthStore().IsAuthEnabled() {
353 return rev, nil
354 }
355 authInfo, err := s.AuthInfoFromCtx(ctx)
356 if err != nil {
357 return rev, err
358 }
359 if authInfo == nil {
360 return rev, auth.ErrUserEmpty
361 }
362
363 l := s.lessor.Lookup(leaseID)
364 if l != nil {
365 for _, key := range l.Keys() {
366 if err := s.AuthStore().IsRangePermitted(authInfo, []byte(key), []byte{}); err != nil {
367 return 0, err
368 }
369 }
370 }
371
372 return rev, nil
373 }
374
375 func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
376 if s.isLeader() {
377 if err := s.waitAppliedIndex(); err != nil {
378 return nil, err
379 }
380
381 le := s.lessor.Lookup(lease.LeaseID(r.ID))
382 if le == nil {
383 return nil, lease.ErrLeaseNotFound
384 }
385
386 resp := &pb.LeaseTimeToLiveResponse{Header: &pb.ResponseHeader{}, ID: r.ID, TTL: int64(le.Remaining().Seconds()), GrantedTTL: le.TTL()}
387 if r.Keys {
388 ks := le.Keys()
389 kbs := make([][]byte, len(ks))
390 for i := range ks {
391 kbs[i] = []byte(ks[i])
392 }
393 resp.Keys = kbs
394 }
395 return resp, nil
396 }
397
398 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
399 defer cancel()
400
401
402 for cctx.Err() == nil {
403 leader, err := s.waitLeader(cctx)
404 if err != nil {
405 return nil, err
406 }
407 for _, url := range leader.PeerURLs {
408 lurl := url + leasehttp.LeaseInternalPrefix
409 resp, err := leasehttp.TimeToLiveHTTP(cctx, lease.LeaseID(r.ID), r.Keys, lurl, s.peerRt)
410 if err == nil {
411 return resp.LeaseTimeToLiveResponse, nil
412 }
413 if err == lease.ErrLeaseNotFound {
414 return nil, err
415 }
416 }
417 }
418
419 if cctx.Err() == context.DeadlineExceeded {
420 return nil, ErrTimeout
421 }
422 return nil, ErrCanceled
423 }
424
425 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
426 var rev uint64
427 var err error
428 if r.Keys {
429
430 rev, err = s.checkLeaseTimeToLive(ctx, lease.LeaseID(r.ID))
431 if err != nil {
432 return nil, err
433 }
434 }
435
436 resp, err := s.leaseTimeToLive(ctx, r)
437 if err != nil {
438 return nil, err
439 }
440
441 if r.Keys {
442 if s.AuthStore().IsAuthEnabled() && rev != s.AuthStore().Revision() {
443 return nil, auth.ErrAuthOldRevision
444 }
445 }
446 return resp, nil
447 }
448
449
450 func (s *EtcdServer) LeaseLeases(ctx context.Context, r *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
451 ls := s.lessor.Leases()
452 lss := make([]*pb.LeaseStatus, len(ls))
453 for i := range ls {
454 lss[i] = &pb.LeaseStatus{ID: int64(ls[i].ID)}
455 }
456 return &pb.LeaseLeasesResponse{Header: newHeader(s), Leases: lss}, nil
457 }
458
459 func (s *EtcdServer) waitLeader(ctx context.Context) (*membership.Member, error) {
460 leader := s.cluster.Member(s.Leader())
461 for leader == nil {
462
463 dur := time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond
464 select {
465 case <-time.After(dur):
466 leader = s.cluster.Member(s.Leader())
467 case <-s.stopping:
468 return nil, ErrStopped
469 case <-ctx.Done():
470 return nil, ErrNoLeader
471 }
472 }
473 if leader == nil || len(leader.PeerURLs) == 0 {
474 return nil, ErrNoLeader
475 }
476 return leader, nil
477 }
478
479 func (s *EtcdServer) Alarm(ctx context.Context, r *pb.AlarmRequest) (*pb.AlarmResponse, error) {
480 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{Alarm: r})
481 if err != nil {
482 return nil, err
483 }
484 return resp.(*pb.AlarmResponse), nil
485 }
486
487 func (s *EtcdServer) AuthEnable(ctx context.Context, r *pb.AuthEnableRequest) (*pb.AuthEnableResponse, error) {
488 resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{AuthEnable: r})
489 if err != nil {
490 return nil, err
491 }
492 return resp.(*pb.AuthEnableResponse), nil
493 }
494
495 func (s *EtcdServer) AuthDisable(ctx context.Context, r *pb.AuthDisableRequest) (*pb.AuthDisableResponse, error) {
496 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthDisable: r})
497 if err != nil {
498 return nil, err
499 }
500 return resp.(*pb.AuthDisableResponse), nil
501 }
502
503 func (s *EtcdServer) AuthStatus(ctx context.Context, r *pb.AuthStatusRequest) (*pb.AuthStatusResponse, error) {
504 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthStatus: r})
505 if err != nil {
506 return nil, err
507 }
508 return resp.(*pb.AuthStatusResponse), nil
509 }
510
511 func (s *EtcdServer) Authenticate(ctx context.Context, r *pb.AuthenticateRequest) (*pb.AuthenticateResponse, error) {
512 if err := s.linearizableReadNotify(ctx); err != nil {
513 return nil, err
514 }
515
516 lg := s.Logger()
517
518
519 defer func() {
520 if r != nil {
521 r.Password = ""
522 }
523 }()
524
525 var resp proto.Message
526 for {
527 checkedRevision, err := s.AuthStore().CheckPassword(r.Name, r.Password)
528 if err != nil {
529 if err != auth.ErrAuthNotEnabled {
530 lg.Warn(
531 "invalid authentication was requested",
532 zap.String("user", r.Name),
533 zap.Error(err),
534 )
535 }
536 return nil, err
537 }
538
539 st, err := s.AuthStore().GenTokenPrefix()
540 if err != nil {
541 return nil, err
542 }
543
544
545
546 internalReq := &pb.InternalAuthenticateRequest{
547 Name: r.Name,
548 SimpleToken: st,
549 }
550
551 resp, err = s.raftRequestOnce(ctx, pb.InternalRaftRequest{Authenticate: internalReq})
552 if err != nil {
553 return nil, err
554 }
555 if checkedRevision == s.AuthStore().Revision() {
556 break
557 }
558
559 lg.Info("revision when password checked became stale; retrying")
560 }
561
562 return resp.(*pb.AuthenticateResponse), nil
563 }
564
565 func (s *EtcdServer) UserAdd(ctx context.Context, r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
566 if r.Options == nil || !r.Options.NoPassword {
567 hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
568 if err != nil {
569 return nil, err
570 }
571 r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
572 r.Password = ""
573 }
574
575 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserAdd: r})
576 if err != nil {
577 return nil, err
578 }
579 return resp.(*pb.AuthUserAddResponse), nil
580 }
581
582 func (s *EtcdServer) UserDelete(ctx context.Context, r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
583 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserDelete: r})
584 if err != nil {
585 return nil, err
586 }
587 return resp.(*pb.AuthUserDeleteResponse), nil
588 }
589
590 func (s *EtcdServer) UserChangePassword(ctx context.Context, r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
591 if r.Password != "" {
592 hashedPassword, err := bcrypt.GenerateFromPassword([]byte(r.Password), s.authStore.BcryptCost())
593 if err != nil {
594 return nil, err
595 }
596 r.HashedPassword = base64.StdEncoding.EncodeToString(hashedPassword)
597 r.Password = ""
598 }
599
600 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserChangePassword: r})
601 if err != nil {
602 return nil, err
603 }
604 return resp.(*pb.AuthUserChangePasswordResponse), nil
605 }
606
607 func (s *EtcdServer) UserGrantRole(ctx context.Context, r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
608 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGrantRole: r})
609 if err != nil {
610 return nil, err
611 }
612 return resp.(*pb.AuthUserGrantRoleResponse), nil
613 }
614
615 func (s *EtcdServer) UserGet(ctx context.Context, r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
616 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserGet: r})
617 if err != nil {
618 return nil, err
619 }
620 return resp.(*pb.AuthUserGetResponse), nil
621 }
622
623 func (s *EtcdServer) UserList(ctx context.Context, r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
624 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserList: r})
625 if err != nil {
626 return nil, err
627 }
628 return resp.(*pb.AuthUserListResponse), nil
629 }
630
631 func (s *EtcdServer) UserRevokeRole(ctx context.Context, r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
632 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthUserRevokeRole: r})
633 if err != nil {
634 return nil, err
635 }
636 return resp.(*pb.AuthUserRevokeRoleResponse), nil
637 }
638
639 func (s *EtcdServer) RoleAdd(ctx context.Context, r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
640 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleAdd: r})
641 if err != nil {
642 return nil, err
643 }
644 return resp.(*pb.AuthRoleAddResponse), nil
645 }
646
647 func (s *EtcdServer) RoleGrantPermission(ctx context.Context, r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
648 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGrantPermission: r})
649 if err != nil {
650 return nil, err
651 }
652 return resp.(*pb.AuthRoleGrantPermissionResponse), nil
653 }
654
655 func (s *EtcdServer) RoleGet(ctx context.Context, r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
656 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleGet: r})
657 if err != nil {
658 return nil, err
659 }
660 return resp.(*pb.AuthRoleGetResponse), nil
661 }
662
663 func (s *EtcdServer) RoleList(ctx context.Context, r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
664 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleList: r})
665 if err != nil {
666 return nil, err
667 }
668 return resp.(*pb.AuthRoleListResponse), nil
669 }
670
671 func (s *EtcdServer) RoleRevokePermission(ctx context.Context, r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
672 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleRevokePermission: r})
673 if err != nil {
674 return nil, err
675 }
676 return resp.(*pb.AuthRoleRevokePermissionResponse), nil
677 }
678
679 func (s *EtcdServer) RoleDelete(ctx context.Context, r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
680 resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{AuthRoleDelete: r})
681 if err != nil {
682 return nil, err
683 }
684 return resp.(*pb.AuthRoleDeleteResponse), nil
685 }
686
687 func (s *EtcdServer) raftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
688 result, err := s.processInternalRaftRequestOnce(ctx, r)
689 if err != nil {
690 return nil, err
691 }
692 if result.err != nil {
693 return nil, result.err
694 }
695 if startTime, ok := ctx.Value(traceutil.StartTimeKey).(time.Time); ok && result.trace != nil {
696 applyStart := result.trace.GetStartTime()
697
698
699
700 result.trace.SetStartTime(startTime)
701 result.trace.InsertStep(0, applyStart, "process raft request")
702 result.trace.LogIfLong(traceThreshold)
703 }
704 return result.resp, nil
705 }
706
707 func (s *EtcdServer) raftRequest(ctx context.Context, r pb.InternalRaftRequest) (proto.Message, error) {
708 return s.raftRequestOnce(ctx, r)
709 }
710
711
712 func (s *EtcdServer) doSerialize(ctx context.Context, chk func(*auth.AuthInfo) error, get func()) error {
713 trace := traceutil.Get(ctx)
714 ai, err := s.AuthInfoFromCtx(ctx)
715 if err != nil {
716 return err
717 }
718 if ai == nil {
719
720 ai = &auth.AuthInfo{}
721 }
722 if err = chk(ai); err != nil {
723 return err
724 }
725 trace.Step("get authentication metadata")
726
727 get()
728
729
730 if ai.Revision != 0 && ai.Revision != s.authStore.Revision() {
731 return auth.ErrAuthOldRevision
732 }
733 return nil
734 }
735
736 func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
737 ai := s.getAppliedIndex()
738 ci := s.getCommittedIndex()
739 if ci > ai+maxGapBetweenApplyAndCommitIndex {
740 return nil, ErrTooManyRequests
741 }
742
743 r.Header = &pb.RequestHeader{
744 ID: s.reqIDGen.Next(),
745 }
746
747
748 if r.Authenticate == nil {
749 authInfo, err := s.AuthInfoFromCtx(ctx)
750 if err != nil {
751 return nil, err
752 }
753 if authInfo != nil {
754 r.Header.Username = authInfo.Username
755 r.Header.AuthRevision = authInfo.Revision
756 }
757 }
758
759 data, err := r.Marshal()
760 if err != nil {
761 return nil, err
762 }
763
764 if len(data) > int(s.Cfg.MaxRequestBytes) {
765 return nil, ErrRequestTooLarge
766 }
767
768 id := r.ID
769 if id == 0 {
770 id = r.Header.ID
771 }
772 ch := s.w.Register(id)
773
774 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
775 defer cancel()
776
777 start := time.Now()
778 err = s.r.Propose(cctx, data)
779 if err != nil {
780 proposalsFailed.Inc()
781 s.w.Trigger(id, nil)
782 return nil, err
783 }
784 proposalsPending.Inc()
785 defer proposalsPending.Dec()
786
787 select {
788 case x := <-ch:
789 return x.(*applyResult), nil
790 case <-cctx.Done():
791 proposalsFailed.Inc()
792 s.w.Trigger(id, nil)
793 return nil, s.parseProposeCtxErr(cctx.Err(), start)
794 case <-s.done:
795 return nil, ErrStopped
796 }
797 }
798
799
800 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
801
802 func (s *EtcdServer) linearizableReadLoop() {
803 for {
804 requestId := s.reqIDGen.Next()
805 leaderChangedNotifier := s.LeaderChangedNotify()
806 select {
807 case <-leaderChangedNotifier:
808 continue
809 case <-s.readwaitc:
810 case <-s.stopping:
811 return
812 }
813
814
815
816 trace := traceutil.New("linearizableReadLoop", s.Logger())
817
818 nextnr := newNotifier()
819 s.readMu.Lock()
820 nr := s.readNotifier
821 s.readNotifier = nextnr
822 s.readMu.Unlock()
823
824 confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
825 if isStopped(err) {
826 return
827 }
828 if err != nil {
829 nr.notify(err)
830 continue
831 }
832
833 trace.Step("read index received")
834
835 trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
836
837 appliedIndex := s.getAppliedIndex()
838 trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
839
840 if appliedIndex < confirmedIndex {
841 select {
842 case <-s.applyWait.Wait(confirmedIndex):
843 case <-s.stopping:
844 return
845 }
846 }
847
848 nr.notify(nil)
849 trace.Step("applied index is now lower than readState.Index")
850
851 trace.LogAllStepsIfLong(traceThreshold)
852 }
853 }
854
855 func isStopped(err error) bool {
856 return err == raft.ErrStopped || err == ErrStopped
857 }
858
859 func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
860 err := s.sendReadIndex(requestId)
861 if err != nil {
862 return 0, err
863 }
864
865 lg := s.Logger()
866 errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
867 defer errorTimer.Stop()
868 retryTimer := time.NewTimer(readIndexRetryTime)
869 defer retryTimer.Stop()
870
871 firstCommitInTermNotifier := s.FirstCommitInTermNotify()
872
873 for {
874 select {
875 case rs := <-s.r.readStateC:
876 requestIdBytes := uint64ToBigEndianBytes(requestId)
877 gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
878 if !gotOwnResponse {
879
880
881 responseId := uint64(0)
882 if len(rs.RequestCtx) == 8 {
883 responseId = binary.BigEndian.Uint64(rs.RequestCtx)
884 }
885 lg.Warn(
886 "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
887 zap.Uint64("sent-request-id", requestId),
888 zap.Uint64("received-request-id", responseId),
889 )
890 slowReadIndex.Inc()
891 continue
892 }
893 return rs.Index, nil
894 case <-leaderChangedNotifier:
895 readIndexFailed.Inc()
896
897 return 0, ErrLeaderChanged
898 case <-firstCommitInTermNotifier:
899 firstCommitInTermNotifier = s.FirstCommitInTermNotify()
900 lg.Info("first commit in current term: resending ReadIndex request")
901 err := s.sendReadIndex(requestId)
902 if err != nil {
903 return 0, err
904 }
905 retryTimer.Reset(readIndexRetryTime)
906 continue
907 case <-retryTimer.C:
908 lg.Warn(
909 "waiting for ReadIndex response took too long, retrying",
910 zap.Uint64("sent-request-id", requestId),
911 zap.Duration("retry-timeout", readIndexRetryTime),
912 )
913 err := s.sendReadIndex(requestId)
914 if err != nil {
915 return 0, err
916 }
917 retryTimer.Reset(readIndexRetryTime)
918 continue
919 case <-errorTimer.C:
920 lg.Warn(
921 "timed out waiting for read index response (local node might have slow network)",
922 zap.Duration("timeout", s.Cfg.ReqTimeout()),
923 )
924 slowReadIndex.Inc()
925 return 0, ErrTimeout
926 case <-s.stopping:
927 return 0, ErrStopped
928 }
929 }
930 }
931
// uint64ToBigEndianBytes returns number encoded as an 8-byte big-endian
// slice.
func uint64ToBigEndianBytes(number uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], number)
	return buf[:]
}
937
938 func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
939 ctxToSend := uint64ToBigEndianBytes(requestIndex)
940
941 cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
942 err := s.r.ReadIndex(cctx, ctxToSend)
943 cancel()
944 if err == raft.ErrStopped {
945 return err
946 }
947 if err != nil {
948 lg := s.Logger()
949 lg.Warn("failed to get read index from Raft", zap.Error(err))
950 readIndexFailed.Inc()
951 return err
952 }
953 return nil
954 }
955
956 func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
957 return s.linearizableReadNotify(ctx)
958 }
959
960 func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
961 s.readMu.RLock()
962 nc := s.readNotifier
963 s.readMu.RUnlock()
964
965
966 select {
967 case s.readwaitc <- struct{}{}:
968 default:
969 }
970
971
972 select {
973 case <-nc.c:
974 return nc.err
975 case <-ctx.Done():
976 return ctx.Err()
977 case <-s.done:
978 return ErrStopped
979 }
980 }
981
982 func (s *EtcdServer) AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error) {
983 authInfo, err := s.AuthStore().AuthInfoFromCtx(ctx)
984 if authInfo != nil || err != nil {
985 return authInfo, err
986 }
987 if !s.Cfg.ClientCertAuthEnabled {
988 return nil, nil
989 }
990 authInfo = s.AuthStore().AuthInfoFromTLS(ctx)
991 return authInfo, nil
992 }
993
994 func (s *EtcdServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
995 switch r.Action {
996 case pb.DowngradeRequest_VALIDATE:
997 return s.downgradeValidate(ctx, r.Version)
998 case pb.DowngradeRequest_ENABLE:
999 return s.downgradeEnable(ctx, r)
1000 case pb.DowngradeRequest_CANCEL:
1001 return s.downgradeCancel(ctx)
1002 default:
1003 return nil, ErrUnknownMethod
1004 }
1005 }
1006
1007 func (s *EtcdServer) downgradeValidate(ctx context.Context, v string) (*pb.DowngradeResponse, error) {
1008 resp := &pb.DowngradeResponse{}
1009
1010 targetVersion, err := convertToClusterVersion(v)
1011 if err != nil {
1012 return nil, err
1013 }
1014
1015
1016
1017 err = s.linearizableReadNotify(ctx)
1018 if err != nil {
1019 return nil, err
1020 }
1021
1022 cv := s.ClusterVersion()
1023 if cv == nil {
1024 return nil, ErrClusterVersionUnavailable
1025 }
1026 resp.Version = cv.String()
1027
1028 allowedTargetVersion := membership.AllowedDowngradeVersion(cv)
1029 if !targetVersion.Equal(*allowedTargetVersion) {
1030 return nil, ErrInvalidDowngradeTargetVersion
1031 }
1032
1033 downgradeInfo := s.cluster.DowngradeInfo()
1034 if downgradeInfo.Enabled {
1035
1036 return nil, ErrDowngradeInProcess
1037 }
1038 return resp, nil
1039 }
1040
1041 func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
1042
1043 v := r.Version
1044 lg := s.Logger()
1045 if resp, err := s.downgradeValidate(ctx, v); err != nil {
1046 lg.Warn("reject downgrade request", zap.Error(err))
1047 return resp, err
1048 }
1049 targetVersion, err := convertToClusterVersion(v)
1050 if err != nil {
1051 lg.Warn("reject downgrade request", zap.Error(err))
1052 return nil, err
1053 }
1054
1055 raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: true, Ver: targetVersion.String()}
1056 _, err = s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
1057 if err != nil {
1058 lg.Warn("reject downgrade request", zap.Error(err))
1059 return nil, err
1060 }
1061 resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
1062 return &resp, nil
1063 }
1064
1065 func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) {
1066
1067
1068 if err := s.linearizableReadNotify(ctx); err != nil {
1069 return nil, err
1070 }
1071
1072 downgradeInfo := s.cluster.DowngradeInfo()
1073 if !downgradeInfo.Enabled {
1074 return nil, ErrNoInflightDowngrade
1075 }
1076
1077 raftRequest := membershippb.DowngradeInfoSetRequest{Enabled: false}
1078 _, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeInfoSet: &raftRequest})
1079 if err != nil {
1080 return nil, err
1081 }
1082 resp := pb.DowngradeResponse{Version: s.ClusterVersion().String()}
1083 return &resp, nil
1084 }
1085
View as plain text