1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "bytes"
19 "context"
20 "fmt"
21 "sort"
22 "strconv"
23 "time"
24
25 "github.com/coreos/go-semver/semver"
26 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
27 "go.etcd.io/etcd/api/v3/membershippb"
28 "go.etcd.io/etcd/api/v3/mvccpb"
29 "go.etcd.io/etcd/client/pkg/v3/types"
30 "go.etcd.io/etcd/pkg/v3/traceutil"
31 "go.etcd.io/etcd/server/v3/auth"
32 "go.etcd.io/etcd/server/v3/etcdserver/api"
33 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
34 "go.etcd.io/etcd/server/v3/lease"
35 "go.etcd.io/etcd/server/v3/mvcc"
36
37 "github.com/gogo/protobuf/proto"
38 "go.uber.org/zap"
39 )
40
// v3Version is the API-version label attached to the applySec latency metric.
const v3Version = "v3"
44
// applyResult carries the outcome of applying one InternalRaftRequest.
type applyResult struct {
	// resp is the response message produced by the applier.
	resp proto.Message
	// err is the error returned by the applier, if any.
	err error
	// physc is the channel returned by Compaction; Apply only sets it for
	// compaction requests. NOTE(review): presumably it signals that the
	// physical compaction has completed — confirm against mvcc Compact.
	physc <-chan struct{}
	// trace is the trace recorded while applying Put, Txn or Compaction.
	trace *traceutil.Trace
}
54
55
// applierV3Internal applies the internal, cluster-level raft requests that
// Apply forwards regardless of shouldApplyV3 (version, member attributes,
// downgrade info). shouldApplyV3 is passed through to the cluster so it can
// decide whether to also persist to the v3 backend.
type applierV3Internal interface {
	ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3)
	ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3)
	DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3)
}
61
62
// applierV3 applies client-facing v3 requests. Apply dispatches one
// InternalRaftRequest to the matching method below. In this file it is
// implemented by applierV3backend and by the wrappers applierV3Capped and
// quotaApplierV3, which embed another applierV3 and override a few methods.
type applierV3 interface {
	// Apply dispatches r to the matching method and returns its result.
	Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult

	// KV operations. In applierV3backend a nil txn makes the method open
	// (and end) its own txn; a non-nil txn is used as-is, as in applyTxn.
	Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
	Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error)
	DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error)
	Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
	Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

	// Lease operations.
	LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
	LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error)

	LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error)

	// Alarm handles GET / ACTIVATE / DEACTIVATE alarm requests.
	Alarm(*pb.AlarmRequest) (*pb.AlarmResponse, error)

	Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error)

	// Auth on/off and status.
	AuthEnable() (*pb.AuthEnableResponse, error)
	AuthDisable() (*pb.AuthDisableResponse, error)
	AuthStatus() (*pb.AuthStatusResponse, error)

	// User and role management, forwarded to the auth store.
	UserAdd(ua *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error)
	UserDelete(ua *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error)
	UserChangePassword(ua *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error)
	UserGrantRole(ua *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error)
	UserGet(ua *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error)
	UserRevokeRole(ua *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error)
	RoleAdd(ua *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error)
	RoleGrantPermission(ua *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error)
	RoleGet(ua *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error)
	RoleRevokePermission(ua *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error)
	RoleDelete(ua *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error)
	UserList(ua *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error)
	RoleList(ua *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error)
}
99
// checkReqFunc validates one RequestOp of a txn against a read view before the
// txn is applied; a non-nil error aborts the txn (see checkRequests).
type checkReqFunc func(mvcc.ReadView, *pb.RequestOp) error
101
// applierV3backend applies requests directly against the server's state: the
// mvcc store, the lessor, the alarm store, the auth store and the cluster.
type applierV3backend struct {
	s *EtcdServer

	// checkPut and checkRange validate Put and Range ops inside a Txn before
	// it is applied. newApplierV3Backend wires them to checkRequestPut and
	// checkRequestRange; they are left nil by newApplierV3Internal.
	checkPut   checkReqFunc
	checkRange checkReqFunc
}
108
109 func (s *EtcdServer) newApplierV3Backend() applierV3 {
110 base := &applierV3backend{s: s}
111 base.checkPut = func(rv mvcc.ReadView, req *pb.RequestOp) error {
112 return base.checkRequestPut(rv, req)
113 }
114 base.checkRange = func(rv mvcc.ReadView, req *pb.RequestOp) error {
115 return base.checkRequestRange(rv, req)
116 }
117 return base
118 }
119
120 func (s *EtcdServer) newApplierV3Internal() applierV3Internal {
121 base := &applierV3backend{s: s}
122 return base
123 }
124
125 func (s *EtcdServer) newApplierV3() applierV3 {
126 return newAuthApplierV3(
127 s.AuthStore(),
128 newQuotaApplierV3(s, s.newApplierV3Backend()),
129 s.lessor,
130 )
131 }
132
133 func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
134 op := "unknown"
135 ar := &applyResult{}
136 defer func(start time.Time) {
137 success := ar.err == nil || ar.err == mvcc.ErrCompacted
138 applySec.WithLabelValues(v3Version, op, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
139 warnOfExpensiveRequest(a.s.Logger(), a.s.Cfg.WarningApplyDuration, start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
140 if !success {
141 warnOfFailedRequest(a.s.Logger(), start, &pb.InternalRaftStringer{Request: r}, ar.resp, ar.err)
142 }
143 }(time.Now())
144
145 switch {
146 case r.ClusterVersionSet != nil:
147 op = "ClusterVersionSet"
148 a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet, shouldApplyV3)
149 return nil
150 case r.ClusterMemberAttrSet != nil:
151 op = "ClusterMemberAttrSet"
152 a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet, shouldApplyV3)
153 return nil
154 case r.DowngradeInfoSet != nil:
155 op = "DowngradeInfoSet"
156 a.s.applyV3Internal.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3)
157 return nil
158 }
159
160 if !shouldApplyV3 {
161 return nil
162 }
163
164
165 switch {
166 case r.Range != nil:
167 op = "Range"
168 ar.resp, ar.err = a.s.applyV3.Range(context.TODO(), nil, r.Range)
169 case r.Put != nil:
170 op = "Put"
171 ar.resp, ar.trace, ar.err = a.s.applyV3.Put(context.TODO(), nil, r.Put)
172 case r.DeleteRange != nil:
173 op = "DeleteRange"
174 ar.resp, ar.err = a.s.applyV3.DeleteRange(nil, r.DeleteRange)
175 case r.Txn != nil:
176 op = "Txn"
177 ar.resp, ar.trace, ar.err = a.s.applyV3.Txn(context.TODO(), r.Txn)
178 case r.Compaction != nil:
179 op = "Compaction"
180 ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
181 case r.LeaseGrant != nil:
182 op = "LeaseGrant"
183 ar.resp, ar.err = a.s.applyV3.LeaseGrant(r.LeaseGrant)
184 case r.LeaseRevoke != nil:
185 op = "LeaseRevoke"
186 ar.resp, ar.err = a.s.applyV3.LeaseRevoke(r.LeaseRevoke)
187 case r.LeaseCheckpoint != nil:
188 op = "LeaseCheckpoint"
189 ar.resp, ar.err = a.s.applyV3.LeaseCheckpoint(r.LeaseCheckpoint)
190 case r.Alarm != nil:
191 op = "Alarm"
192 ar.resp, ar.err = a.s.applyV3.Alarm(r.Alarm)
193 case r.Authenticate != nil:
194 op = "Authenticate"
195 ar.resp, ar.err = a.s.applyV3.Authenticate(r.Authenticate)
196 case r.AuthEnable != nil:
197 op = "AuthEnable"
198 ar.resp, ar.err = a.s.applyV3.AuthEnable()
199 case r.AuthDisable != nil:
200 op = "AuthDisable"
201 ar.resp, ar.err = a.s.applyV3.AuthDisable()
202 case r.AuthStatus != nil:
203 ar.resp, ar.err = a.s.applyV3.AuthStatus()
204 case r.AuthUserAdd != nil:
205 op = "AuthUserAdd"
206 ar.resp, ar.err = a.s.applyV3.UserAdd(r.AuthUserAdd)
207 case r.AuthUserDelete != nil:
208 op = "AuthUserDelete"
209 ar.resp, ar.err = a.s.applyV3.UserDelete(r.AuthUserDelete)
210 case r.AuthUserChangePassword != nil:
211 op = "AuthUserChangePassword"
212 ar.resp, ar.err = a.s.applyV3.UserChangePassword(r.AuthUserChangePassword)
213 case r.AuthUserGrantRole != nil:
214 op = "AuthUserGrantRole"
215 ar.resp, ar.err = a.s.applyV3.UserGrantRole(r.AuthUserGrantRole)
216 case r.AuthUserGet != nil:
217 op = "AuthUserGet"
218 ar.resp, ar.err = a.s.applyV3.UserGet(r.AuthUserGet)
219 case r.AuthUserRevokeRole != nil:
220 op = "AuthUserRevokeRole"
221 ar.resp, ar.err = a.s.applyV3.UserRevokeRole(r.AuthUserRevokeRole)
222 case r.AuthRoleAdd != nil:
223 op = "AuthRoleAdd"
224 ar.resp, ar.err = a.s.applyV3.RoleAdd(r.AuthRoleAdd)
225 case r.AuthRoleGrantPermission != nil:
226 op = "AuthRoleGrantPermission"
227 ar.resp, ar.err = a.s.applyV3.RoleGrantPermission(r.AuthRoleGrantPermission)
228 case r.AuthRoleGet != nil:
229 op = "AuthRoleGet"
230 ar.resp, ar.err = a.s.applyV3.RoleGet(r.AuthRoleGet)
231 case r.AuthRoleRevokePermission != nil:
232 op = "AuthRoleRevokePermission"
233 ar.resp, ar.err = a.s.applyV3.RoleRevokePermission(r.AuthRoleRevokePermission)
234 case r.AuthRoleDelete != nil:
235 op = "AuthRoleDelete"
236 ar.resp, ar.err = a.s.applyV3.RoleDelete(r.AuthRoleDelete)
237 case r.AuthUserList != nil:
238 op = "AuthUserList"
239 ar.resp, ar.err = a.s.applyV3.UserList(r.AuthUserList)
240 case r.AuthRoleList != nil:
241 op = "AuthRoleList"
242 ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
243 default:
244 a.s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r))
245 }
246 return ar
247 }
248
249 func (a *applierV3backend) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
250 resp = &pb.PutResponse{}
251 resp.Header = &pb.ResponseHeader{}
252 trace = traceutil.Get(ctx)
253
254 if trace.IsEmpty() {
255 trace = traceutil.New("put",
256 a.s.Logger(),
257 traceutil.Field{Key: "key", Value: string(p.Key)},
258 traceutil.Field{Key: "req_size", Value: p.Size()},
259 )
260 }
261 val, leaseID := p.Value, lease.LeaseID(p.Lease)
262 if txn == nil {
263 if leaseID != lease.NoLease {
264 if l := a.s.lessor.Lookup(leaseID); l == nil {
265 return nil, nil, lease.ErrLeaseNotFound
266 }
267 }
268 txn = a.s.KV().Write(trace)
269 defer txn.End()
270 }
271
272 var rr *mvcc.RangeResult
273 if p.IgnoreValue || p.IgnoreLease || p.PrevKv {
274 trace.StepWithFunction(func() {
275 rr, err = txn.Range(context.TODO(), p.Key, nil, mvcc.RangeOptions{})
276 }, "get previous kv pair")
277
278 if err != nil {
279 return nil, nil, err
280 }
281 }
282 if p.IgnoreValue || p.IgnoreLease {
283 if rr == nil || len(rr.KVs) == 0 {
284
285 return nil, nil, ErrKeyNotFound
286 }
287 }
288 if p.IgnoreValue {
289 val = rr.KVs[0].Value
290 }
291 if p.IgnoreLease {
292 leaseID = lease.LeaseID(rr.KVs[0].Lease)
293 }
294 if p.PrevKv {
295 if rr != nil && len(rr.KVs) != 0 {
296 resp.PrevKv = &rr.KVs[0]
297 }
298 }
299
300 resp.Header.Revision = txn.Put(p.Key, val, leaseID)
301 trace.AddField(traceutil.Field{Key: "response_revision", Value: resp.Header.Revision})
302 return resp, trace, nil
303 }
304
305 func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
306 resp := &pb.DeleteRangeResponse{}
307 resp.Header = &pb.ResponseHeader{}
308 end := mkGteRange(dr.RangeEnd)
309
310 if txn == nil {
311 txn = a.s.kv.Write(traceutil.TODO())
312 defer txn.End()
313 }
314
315 if dr.PrevKv {
316 rr, err := txn.Range(context.TODO(), dr.Key, end, mvcc.RangeOptions{})
317 if err != nil {
318 return nil, err
319 }
320 if rr != nil {
321 resp.PrevKvs = make([]*mvccpb.KeyValue, len(rr.KVs))
322 for i := range rr.KVs {
323 resp.PrevKvs[i] = &rr.KVs[i]
324 }
325 }
326 }
327
328 resp.Deleted, resp.Header.Revision = txn.DeleteRange(dr.Key, end)
329 return resp, nil
330 }
331
332 func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
333 trace := traceutil.Get(ctx)
334
335 resp := &pb.RangeResponse{}
336 resp.Header = &pb.ResponseHeader{}
337
338 if txn == nil {
339 txn = a.s.kv.Read(mvcc.ConcurrentReadTxMode, trace)
340 defer txn.End()
341 }
342
343 limit := r.Limit
344 if r.SortOrder != pb.RangeRequest_NONE ||
345 r.MinModRevision != 0 || r.MaxModRevision != 0 ||
346 r.MinCreateRevision != 0 || r.MaxCreateRevision != 0 {
347
348 limit = 0
349 }
350 if limit > 0 {
351
352 limit = limit + 1
353 }
354
355 ro := mvcc.RangeOptions{
356 Limit: limit,
357 Rev: r.Revision,
358 Count: r.CountOnly,
359 }
360
361 rr, err := txn.Range(ctx, r.Key, mkGteRange(r.RangeEnd), ro)
362 if err != nil {
363 return nil, err
364 }
365
366 if r.MaxModRevision != 0 {
367 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision > r.MaxModRevision }
368 pruneKVs(rr, f)
369 }
370 if r.MinModRevision != 0 {
371 f := func(kv *mvccpb.KeyValue) bool { return kv.ModRevision < r.MinModRevision }
372 pruneKVs(rr, f)
373 }
374 if r.MaxCreateRevision != 0 {
375 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision > r.MaxCreateRevision }
376 pruneKVs(rr, f)
377 }
378 if r.MinCreateRevision != 0 {
379 f := func(kv *mvccpb.KeyValue) bool { return kv.CreateRevision < r.MinCreateRevision }
380 pruneKVs(rr, f)
381 }
382
383 sortOrder := r.SortOrder
384 if r.SortTarget != pb.RangeRequest_KEY && sortOrder == pb.RangeRequest_NONE {
385
386
387
388 sortOrder = pb.RangeRequest_ASCEND
389 }
390 if sortOrder != pb.RangeRequest_NONE {
391 var sorter sort.Interface
392 switch {
393 case r.SortTarget == pb.RangeRequest_KEY:
394 sorter = &kvSortByKey{&kvSort{rr.KVs}}
395 case r.SortTarget == pb.RangeRequest_VERSION:
396 sorter = &kvSortByVersion{&kvSort{rr.KVs}}
397 case r.SortTarget == pb.RangeRequest_CREATE:
398 sorter = &kvSortByCreate{&kvSort{rr.KVs}}
399 case r.SortTarget == pb.RangeRequest_MOD:
400 sorter = &kvSortByMod{&kvSort{rr.KVs}}
401 case r.SortTarget == pb.RangeRequest_VALUE:
402 sorter = &kvSortByValue{&kvSort{rr.KVs}}
403 }
404 switch {
405 case sortOrder == pb.RangeRequest_ASCEND:
406 sort.Sort(sorter)
407 case sortOrder == pb.RangeRequest_DESCEND:
408 sort.Sort(sort.Reverse(sorter))
409 }
410 }
411
412 if r.Limit > 0 && len(rr.KVs) > int(r.Limit) {
413 rr.KVs = rr.KVs[:r.Limit]
414 resp.More = true
415 }
416 trace.Step("filter and sort the key-value pairs")
417 resp.Header.Revision = rr.Rev
418 resp.Count = int64(rr.Count)
419 resp.Kvs = make([]*mvccpb.KeyValue, len(rr.KVs))
420 for i := range rr.KVs {
421 if r.KeysOnly {
422 rr.KVs[i].Value = nil
423 }
424 resp.Kvs[i] = &rr.KVs[i]
425 }
426 trace.Step("assemble the response")
427 return resp, nil
428 }
429
// Txn applies a transaction request and returns its response, the trace it
// recorded into, and any error from applying the chosen branch.
//
// Compares are evaluated and the selected operations are validated on a
// read-only view first; validation errors are returned before anything is
// written. For transactions containing writes, that read view is then ended
// and the operations are applied inside a write txn. A failure while applying
// a write txn panics; a failure in a read-only txn is logged and returned.
func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
	lg := a.s.Logger()
	trace := traceutil.Get(ctx)
	if trace.IsEmpty() {
		// No trace on ctx: start one and attach it so that applyTxn and the
		// per-op calls it makes record into the same trace.
		trace = traceutil.New("transaction", a.s.Logger())
		ctx = context.WithValue(ctx, traceutil.TraceKey, trace)
	}
	isWrite := !isTxnReadonly(rt)

	// Compares and checks run on a read-only view. Write txns use the
	// shared-buffer read mode when ExperimentalTxnModeWriteWithSharedBuffer is
	// enabled; every other case uses the concurrent read mode.
	var txn mvcc.TxnWrite
	if isWrite && a.s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer {
		txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.SharedBufReadTxMode, trace))
	} else {
		txn = mvcc.NewReadOnlyTxnWrite(a.s.KV().Read(mvcc.ConcurrentReadTxMode, trace))
	}

	// txnPath holds, in pre-order, the compare outcome (success branch or
	// not) of this txn and of every nested txn on the chosen branches.
	var txnPath []bool
	trace.StepWithFunction(
		func() {
			txnPath = compareToPath(txn, rt)
		},
		"compare",
	)

	if isWrite {
		trace.AddField(traceutil.Field{Key: "read_only", Value: false})
		if _, err := checkRequests(txn, rt, txnPath, a.checkPut); err != nil {
			txn.End()
			return nil, nil, err
		}
	}
	if _, err := checkRequests(txn, rt, txnPath, a.checkRange); err != nil {
		txn.End()
		return nil, nil, err
	}
	trace.Step("check requests")
	txnResp, _ := newTxnResp(rt, txnPath)

	// Write txns: end the read view and apply the ops in a write txn.
	// NOTE(review): presumably the write txn holds the store's txn lock so
	// readers never observe intermediate results — confirm in mvcc.
	if isWrite {
		txn.End()
		txn = a.s.KV().Write(trace)
	}
	_, err := a.applyTxn(ctx, txn, rt, txnPath, txnResp)
	if err != nil {
		if isWrite {
			// End the txn before panicking so it is not left open.
			txn.End()
			// There is no recovery path here for a partially applied write
			// txn, so the failure is treated as fatal.
			lg.Panic("unexpected error during txn with writes", zap.Error(err))
		} else {
			lg.Error("unexpected error during readonly txn", zap.Error(err))
		}
	}
	// The reported revision is txn.Rev(), plus one when the txn has changes.
	// NOTE(review): presumably Rev() is the revision before this txn's writes
	// take effect — confirm against mvcc's TxnWrite.
	rev := txn.Rev()
	if len(txn.Changes()) != 0 {
		rev++
	}
	txn.End()

	txnResp.Header.Revision = rev
	trace.AddField(
		traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)},
		traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision},
	)
	return txnResp, trace, err
}
503
504
505 func newTxnResp(rt *pb.TxnRequest, txnPath []bool) (txnResp *pb.TxnResponse, txnCount int) {
506 reqs := rt.Success
507 if !txnPath[0] {
508 reqs = rt.Failure
509 }
510 resps := make([]*pb.ResponseOp, len(reqs))
511 txnResp = &pb.TxnResponse{
512 Responses: resps,
513 Succeeded: txnPath[0],
514 Header: &pb.ResponseHeader{},
515 }
516 for i, req := range reqs {
517 switch tv := req.Request.(type) {
518 case *pb.RequestOp_RequestRange:
519 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseRange{}}
520 case *pb.RequestOp_RequestPut:
521 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponsePut{}}
522 case *pb.RequestOp_RequestDeleteRange:
523 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseDeleteRange{}}
524 case *pb.RequestOp_RequestTxn:
525 resp, txns := newTxnResp(tv.RequestTxn, txnPath[1:])
526 resps[i] = &pb.ResponseOp{Response: &pb.ResponseOp_ResponseTxn{ResponseTxn: resp}}
527 txnPath = txnPath[1+txns:]
528 txnCount += txns + 1
529 default:
530 }
531 }
532 return txnResp, txnCount
533 }
534
535 func compareToPath(rv mvcc.ReadView, rt *pb.TxnRequest) []bool {
536 txnPath := make([]bool, 1)
537 ops := rt.Success
538 if txnPath[0] = applyCompares(rv, rt.Compare); !txnPath[0] {
539 ops = rt.Failure
540 }
541 for _, op := range ops {
542 tv, ok := op.Request.(*pb.RequestOp_RequestTxn)
543 if !ok || tv.RequestTxn == nil {
544 continue
545 }
546 txnPath = append(txnPath, compareToPath(rv, tv.RequestTxn)...)
547 }
548 return txnPath
549 }
550
551 func applyCompares(rv mvcc.ReadView, cmps []*pb.Compare) bool {
552 for _, c := range cmps {
553 if !applyCompare(rv, c) {
554 return false
555 }
556 }
557 return true
558 }
559
560
561
562 func applyCompare(rv mvcc.ReadView, c *pb.Compare) bool {
563
564
565
566
567
568 rr, err := rv.Range(context.TODO(), c.Key, mkGteRange(c.RangeEnd), mvcc.RangeOptions{})
569 if err != nil {
570 return false
571 }
572 if len(rr.KVs) == 0 {
573 if c.Target == pb.Compare_VALUE {
574
575
576 return false
577 }
578 return compareKV(c, mvccpb.KeyValue{})
579 }
580 for _, kv := range rr.KVs {
581 if !compareKV(c, kv) {
582 return false
583 }
584 }
585 return true
586 }
587
588 func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool {
589 var result int
590 rev := int64(0)
591 switch c.Target {
592 case pb.Compare_VALUE:
593 v := []byte{}
594 if tv, _ := c.TargetUnion.(*pb.Compare_Value); tv != nil {
595 v = tv.Value
596 }
597 result = bytes.Compare(ckv.Value, v)
598 case pb.Compare_CREATE:
599 if tv, _ := c.TargetUnion.(*pb.Compare_CreateRevision); tv != nil {
600 rev = tv.CreateRevision
601 }
602 result = compareInt64(ckv.CreateRevision, rev)
603 case pb.Compare_MOD:
604 if tv, _ := c.TargetUnion.(*pb.Compare_ModRevision); tv != nil {
605 rev = tv.ModRevision
606 }
607 result = compareInt64(ckv.ModRevision, rev)
608 case pb.Compare_VERSION:
609 if tv, _ := c.TargetUnion.(*pb.Compare_Version); tv != nil {
610 rev = tv.Version
611 }
612 result = compareInt64(ckv.Version, rev)
613 case pb.Compare_LEASE:
614 if tv, _ := c.TargetUnion.(*pb.Compare_Lease); tv != nil {
615 rev = tv.Lease
616 }
617 result = compareInt64(ckv.Lease, rev)
618 }
619 switch c.Result {
620 case pb.Compare_EQUAL:
621 return result == 0
622 case pb.Compare_NOT_EQUAL:
623 return result != 0
624 case pb.Compare_GREATER:
625 return result > 0
626 case pb.Compare_LESS:
627 return result < 0
628 }
629 return true
630 }
631
// applyTxn applies the operations of the branch selected by txnPath[0] to
// txn, storing each result in the pre-allocated response slot of tresp
// (built by newTxnResp). Nested txns are applied recursively.
//
// It returns the number of nested txns consumed from txnPath, which the
// caller uses to advance its own txnPath, or the first error, wrapped with
// the failing operation type.
func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) {
	trace := traceutil.Get(ctx)
	reqs := rt.Success
	if !txnPath[0] {
		reqs = rt.Failure
	}

	for i, req := range reqs {
		// respi is the empty union value newTxnResp placed for this op.
		respi := tresp.Responses[i].Response
		switch tv := req.Request.(type) {
		case *pb.RequestOp_RequestRange:
			trace.StartSubTrace(
				traceutil.Field{Key: "req_type", Value: "range"},
				traceutil.Field{Key: "range_begin", Value: string(tv.RequestRange.Key)},
				traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)})
			resp, err := a.Range(ctx, txn, tv.RequestRange)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed Range: %w", err)
			}
			respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp
			trace.StopSubTrace()
		case *pb.RequestOp_RequestPut:
			trace.StartSubTrace(
				traceutil.Field{Key: "req_type", Value: "put"},
				traceutil.Field{Key: "key", Value: string(tv.RequestPut.Key)},
				traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()})
			resp, _, err := a.Put(ctx, txn, tv.RequestPut)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed Put: %w", err)
			}
			respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp
			trace.StopSubTrace()
		case *pb.RequestOp_RequestDeleteRange:
			resp, err := a.DeleteRange(txn, tv.RequestDeleteRange)
			if err != nil {
				return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err)
			}
			respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp
		case *pb.RequestOp_RequestTxn:
			resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn
			applyTxns, err := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp)
			if err != nil {
				// Not wrapped again: the recursive call already wrapped it.
				return 0, err
			}
			txns += applyTxns + 1
			// Skip past the nested txn and everything it consumed.
			txnPath = txnPath[applyTxns+1:]
		default:
			// Empty union: nothing to apply.
		}
	}
	return txns, nil
}
685
686 func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
687 resp := &pb.CompactionResponse{}
688 resp.Header = &pb.ResponseHeader{}
689 trace := traceutil.New("compact",
690 a.s.Logger(),
691 traceutil.Field{Key: "revision", Value: compaction.Revision},
692 )
693
694 ch, err := a.s.KV().Compact(trace, compaction.Revision)
695 if err != nil {
696 return nil, ch, nil, err
697 }
698
699 rr, _ := a.s.KV().Range(context.TODO(), []byte("compaction"), nil, mvcc.RangeOptions{})
700 resp.Header.Revision = rr.Rev
701 return resp, ch, trace, err
702 }
703
704 func (a *applierV3backend) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
705 l, err := a.s.lessor.Grant(lease.LeaseID(lc.ID), lc.TTL)
706 resp := &pb.LeaseGrantResponse{}
707 if err == nil {
708 resp.ID = int64(l.ID)
709 resp.TTL = l.TTL()
710 resp.Header = newHeader(a.s)
711 }
712 return resp, err
713 }
714
715 func (a *applierV3backend) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
716 err := a.s.lessor.Revoke(lease.LeaseID(lc.ID))
717 return &pb.LeaseRevokeResponse{Header: newHeader(a.s)}, err
718 }
719
720 func (a *applierV3backend) LeaseCheckpoint(lc *pb.LeaseCheckpointRequest) (*pb.LeaseCheckpointResponse, error) {
721 for _, c := range lc.Checkpoints {
722 err := a.s.lessor.Checkpoint(lease.LeaseID(c.ID), c.Remaining_TTL)
723 if err != nil {
724 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, err
725 }
726 }
727 return &pb.LeaseCheckpointResponse{Header: newHeader(a.s)}, nil
728 }
729
// Alarm handles alarm GET, ACTIVATE and DEACTIVATE requests.
//
// GET returns the alarms of type ar.Alarm. ACTIVATE and DEACTIVATE update the
// alarm store and return the affected alarm. When an alarm type goes from
// having no members to one (activation), the server's applier is swapped for
// the corrupt or capped applier. When it goes back to having none
// (deactivation), the default applier chain is reinstalled. An unknown action
// returns (nil, nil).
func (a *applierV3backend) Alarm(ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
	resp := &pb.AlarmResponse{}
	// Count before the change, used to detect first activation / last
	// deactivation of this alarm type.
	oldCount := len(a.s.alarmStore.Get(ar.Alarm))

	lg := a.s.Logger()
	switch ar.Action {
	case pb.AlarmRequest_GET:
		resp.Alarms = a.s.alarmStore.Get(ar.Alarm)
	case pb.AlarmRequest_ACTIVATE:
		if ar.Alarm == pb.AlarmType_NONE {
			break
		}
		m := a.s.alarmStore.Activate(types.ID(ar.MemberID), ar.Alarm)
		if m == nil {
			break
		}
		resp.Alarms = append(resp.Alarms, m)
		// Only swap the applier on the transition from zero alarms of this
		// type to exactly one.
		activated := oldCount == 0 && len(a.s.alarmStore.Get(m.Alarm)) == 1
		if !activated {
			break
		}

		lg.Warn("alarm raised", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
		switch m.Alarm {
		case pb.AlarmType_CORRUPT:
			a.s.applyV3 = newApplierV3Corrupt(a)
		case pb.AlarmType_NOSPACE:
			a.s.applyV3 = newApplierV3Capped(a)
		default:
			lg.Panic("unimplemented alarm activation", zap.String("alarm", fmt.Sprintf("%+v", m)))
		}
	case pb.AlarmRequest_DEACTIVATE:
		m := a.s.alarmStore.Deactivate(types.ID(ar.MemberID), ar.Alarm)
		if m == nil {
			break
		}
		resp.Alarms = append(resp.Alarms, m)
		// Only restore the applier once the last alarm of this type is gone.
		deactivated := oldCount > 0 && len(a.s.alarmStore.Get(ar.Alarm)) == 0
		if !deactivated {
			break
		}

		switch m.Alarm {
		case pb.AlarmType_NOSPACE, pb.AlarmType_CORRUPT:
			// NOTE(review): CORRUPT is disarmed without re-checking the kv
			// hash here — confirm that is intended.
			lg.Warn("alarm disarmed", zap.String("alarm", m.Alarm.String()), zap.String("from", types.ID(m.MemberID).String()))
			a.s.applyV3 = a.s.newApplierV3()
		default:
			lg.Warn("unimplemented alarm deactivation", zap.String("alarm", fmt.Sprintf("%+v", m)))
		}
	default:
		return nil, nil
	}
	return resp, nil
}
785
// applierV3Capped is installed by Alarm when a NOSPACE alarm is raised. It
// rejects Put, LeaseGrant and any Txn with a positive quota cost with
// ErrNoSpace, and delegates every other request to the embedded applierV3.
type applierV3Capped struct {
	applierV3
	// q is used here only for its Cost estimate; newApplierV3Capped leaves it
	// as the zero value.
	q backendQuota
}
790
791
792
793 func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }
794
795 func (a *applierV3Capped) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
796 return nil, nil, ErrNoSpace
797 }
798
799 func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
800 if a.q.Cost(r) > 0 {
801 return nil, nil, ErrNoSpace
802 }
803 return a.applierV3.Txn(ctx, r)
804 }
805
806 func (a *applierV3Capped) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
807 return nil, ErrNoSpace
808 }
809
810 func (a *applierV3backend) AuthEnable() (*pb.AuthEnableResponse, error) {
811 err := a.s.AuthStore().AuthEnable()
812 if err != nil {
813 return nil, err
814 }
815 return &pb.AuthEnableResponse{Header: newHeader(a.s)}, nil
816 }
817
818 func (a *applierV3backend) AuthDisable() (*pb.AuthDisableResponse, error) {
819 a.s.AuthStore().AuthDisable()
820 return &pb.AuthDisableResponse{Header: newHeader(a.s)}, nil
821 }
822
823 func (a *applierV3backend) AuthStatus() (*pb.AuthStatusResponse, error) {
824 enabled := a.s.AuthStore().IsAuthEnabled()
825 authRevision := a.s.AuthStore().Revision()
826 return &pb.AuthStatusResponse{Header: newHeader(a.s), Enabled: enabled, AuthRevision: authRevision}, nil
827 }
828
829 func (a *applierV3backend) Authenticate(r *pb.InternalAuthenticateRequest) (*pb.AuthenticateResponse, error) {
830 ctx := context.WithValue(context.WithValue(a.s.ctx, auth.AuthenticateParamIndex{}, a.s.consistIndex.ConsistentIndex()), auth.AuthenticateParamSimpleTokenPrefix{}, r.SimpleToken)
831 resp, err := a.s.AuthStore().Authenticate(ctx, r.Name, r.Password)
832 if resp != nil {
833 resp.Header = newHeader(a.s)
834 }
835 return resp, err
836 }
837
838 func (a *applierV3backend) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, error) {
839 resp, err := a.s.AuthStore().UserAdd(r)
840 if resp != nil {
841 resp.Header = newHeader(a.s)
842 }
843 return resp, err
844 }
845
846 func (a *applierV3backend) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDeleteResponse, error) {
847 resp, err := a.s.AuthStore().UserDelete(r)
848 if resp != nil {
849 resp.Header = newHeader(a.s)
850 }
851 return resp, err
852 }
853
854 func (a *applierV3backend) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) {
855 resp, err := a.s.AuthStore().UserChangePassword(r)
856 if resp != nil {
857 resp.Header = newHeader(a.s)
858 }
859 return resp, err
860 }
861
862 func (a *applierV3backend) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) {
863 resp, err := a.s.AuthStore().UserGrantRole(r)
864 if resp != nil {
865 resp.Header = newHeader(a.s)
866 }
867 return resp, err
868 }
869
870 func (a *applierV3backend) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) {
871 resp, err := a.s.AuthStore().UserGet(r)
872 if resp != nil {
873 resp.Header = newHeader(a.s)
874 }
875 return resp, err
876 }
877
878 func (a *applierV3backend) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUserRevokeRoleResponse, error) {
879 resp, err := a.s.AuthStore().UserRevokeRole(r)
880 if resp != nil {
881 resp.Header = newHeader(a.s)
882 }
883 return resp, err
884 }
885
886 func (a *applierV3backend) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, error) {
887 resp, err := a.s.AuthStore().RoleAdd(r)
888 if resp != nil {
889 resp.Header = newHeader(a.s)
890 }
891 return resp, err
892 }
893
894 func (a *applierV3backend) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) (*pb.AuthRoleGrantPermissionResponse, error) {
895 resp, err := a.s.AuthStore().RoleGrantPermission(r)
896 if resp != nil {
897 resp.Header = newHeader(a.s)
898 }
899 return resp, err
900 }
901
902 func (a *applierV3backend) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) {
903 resp, err := a.s.AuthStore().RoleGet(r)
904 if resp != nil {
905 resp.Header = newHeader(a.s)
906 }
907 return resp, err
908 }
909
910 func (a *applierV3backend) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) {
911 resp, err := a.s.AuthStore().RoleRevokePermission(r)
912 if resp != nil {
913 resp.Header = newHeader(a.s)
914 }
915 return resp, err
916 }
917
918 func (a *applierV3backend) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDeleteResponse, error) {
919 resp, err := a.s.AuthStore().RoleDelete(r)
920 if resp != nil {
921 resp.Header = newHeader(a.s)
922 }
923 return resp, err
924 }
925
926 func (a *applierV3backend) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) {
927 resp, err := a.s.AuthStore().UserList(r)
928 if resp != nil {
929 resp.Header = newHeader(a.s)
930 }
931 return resp, err
932 }
933
934 func (a *applierV3backend) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) {
935 resp, err := a.s.AuthStore().RoleList(r)
936 if resp != nil {
937 resp.Header = newHeader(a.s)
938 }
939 return resp, err
940 }
941
942 func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
943 a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability, shouldApplyV3)
944 }
945
946 func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
947 a.s.cluster.UpdateAttributes(
948 types.ID(r.Member_ID),
949 membership.Attributes{
950 Name: r.MemberAttributes.Name,
951 ClientURLs: r.MemberAttributes.ClientUrls,
952 },
953 shouldApplyV3,
954 )
955 }
956
957 func (a *applierV3backend) DowngradeInfoSet(r *membershippb.DowngradeInfoSetRequest, shouldApplyV3 membership.ShouldApplyV3) {
958 d := membership.DowngradeInfo{Enabled: false}
959 if r.Enabled {
960 d = membership.DowngradeInfo{Enabled: true, TargetVersion: r.Ver}
961 }
962 a.s.cluster.SetDowngradeInfo(&d, shouldApplyV3)
963 }
964
// quotaApplierV3 wraps an applierV3 and turns a successful Put, Txn or
// LeaseGrant into ErrNoSpace when the quota reported no available space for
// it before it was applied. Other requests go straight to the embedded applier.
type quotaApplierV3 struct {
	applierV3
	q Quota
}
969
970 func newQuotaApplierV3(s *EtcdServer, app applierV3) applierV3 {
971 return "aApplierV3{app, NewBackendQuota(s, "v3-applier")}
972 }
973
974 func (a *quotaApplierV3) Put(ctx context.Context, txn mvcc.TxnWrite, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
975 ok := a.q.Available(p)
976 resp, trace, err := a.applierV3.Put(ctx, txn, p)
977 if err == nil && !ok {
978 err = ErrNoSpace
979 }
980 return resp, trace, err
981 }
982
983 func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
984 ok := a.q.Available(rt)
985 resp, trace, err := a.applierV3.Txn(ctx, rt)
986 if err == nil && !ok {
987 err = ErrNoSpace
988 }
989 return resp, trace, err
990 }
991
992 func (a *quotaApplierV3) LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
993 ok := a.q.Available(lc)
994 resp, err := a.applierV3.LeaseGrant(lc)
995 if err == nil && !ok {
996 err = ErrNoSpace
997 }
998 return resp, err
999 }
1000
1001 type kvSort struct{ kvs []mvccpb.KeyValue }
1002
1003 func (s *kvSort) Swap(i, j int) {
1004 t := s.kvs[i]
1005 s.kvs[i] = s.kvs[j]
1006 s.kvs[j] = t
1007 }
1008 func (s *kvSort) Len() int { return len(s.kvs) }
1009
1010 type kvSortByKey struct{ *kvSort }
1011
1012 func (s *kvSortByKey) Less(i, j int) bool {
1013 return bytes.Compare(s.kvs[i].Key, s.kvs[j].Key) < 0
1014 }
1015
1016 type kvSortByVersion struct{ *kvSort }
1017
1018 func (s *kvSortByVersion) Less(i, j int) bool {
1019 return (s.kvs[i].Version - s.kvs[j].Version) < 0
1020 }
1021
1022 type kvSortByCreate struct{ *kvSort }
1023
1024 func (s *kvSortByCreate) Less(i, j int) bool {
1025 return (s.kvs[i].CreateRevision - s.kvs[j].CreateRevision) < 0
1026 }
1027
1028 type kvSortByMod struct{ *kvSort }
1029
1030 func (s *kvSortByMod) Less(i, j int) bool {
1031 return (s.kvs[i].ModRevision - s.kvs[j].ModRevision) < 0
1032 }
1033
1034 type kvSortByValue struct{ *kvSort }
1035
1036 func (s *kvSortByValue) Less(i, j int) bool {
1037 return bytes.Compare(s.kvs[i].Value, s.kvs[j].Value) < 0
1038 }
1039
1040 func checkRequests(rv mvcc.ReadView, rt *pb.TxnRequest, txnPath []bool, f checkReqFunc) (int, error) {
1041 txnCount := 0
1042 reqs := rt.Success
1043 if !txnPath[0] {
1044 reqs = rt.Failure
1045 }
1046 for _, req := range reqs {
1047 if tv, ok := req.Request.(*pb.RequestOp_RequestTxn); ok && tv.RequestTxn != nil {
1048 txns, err := checkRequests(rv, tv.RequestTxn, txnPath[1:], f)
1049 if err != nil {
1050 return 0, err
1051 }
1052 txnCount += txns + 1
1053 txnPath = txnPath[txns+1:]
1054 continue
1055 }
1056 if err := f(rv, req); err != nil {
1057 return 0, err
1058 }
1059 }
1060 return txnCount, nil
1061 }
1062
1063 func (a *applierV3backend) checkRequestPut(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1064 tv, ok := reqOp.Request.(*pb.RequestOp_RequestPut)
1065 if !ok || tv.RequestPut == nil {
1066 return nil
1067 }
1068 req := tv.RequestPut
1069 if req.IgnoreValue || req.IgnoreLease {
1070
1071 rr, err := rv.Range(context.TODO(), req.Key, nil, mvcc.RangeOptions{})
1072 if err != nil {
1073 return err
1074 }
1075 if rr == nil || len(rr.KVs) == 0 {
1076 return ErrKeyNotFound
1077 }
1078 }
1079 if lease.LeaseID(req.Lease) != lease.NoLease {
1080 if l := a.s.lessor.Lookup(lease.LeaseID(req.Lease)); l == nil {
1081 return lease.ErrLeaseNotFound
1082 }
1083 }
1084 return nil
1085 }
1086
1087 func (a *applierV3backend) checkRequestRange(rv mvcc.ReadView, reqOp *pb.RequestOp) error {
1088 tv, ok := reqOp.Request.(*pb.RequestOp_RequestRange)
1089 if !ok || tv.RequestRange == nil {
1090 return nil
1091 }
1092 req := tv.RequestRange
1093 switch {
1094 case req.Revision == 0:
1095 return nil
1096 case req.Revision > rv.Rev():
1097 return mvcc.ErrFutureRev
1098 case req.Revision < rv.FirstRev():
1099 return mvcc.ErrCompacted
1100 }
1101 return nil
1102 }
1103
// compareInt64 returns -1 if a < b, 1 if a > b and 0 if they are equal.
func compareInt64(a, b int64) int {
	if a == b {
		return 0
	}
	if a < b {
		return -1
	}
	return 1
}
1114
1115
1116
1117
1118
// mkGteRange rewrites the single-byte range end "\x00" to an empty, non-nil
// slice, which is the form passed on to the mvcc range calls in this file.
// Every other range end, including nil, is returned unchanged.
func mkGteRange(rangeEnd []byte) []byte {
	if len(rangeEnd) != 1 || rangeEnd[0] != 0 {
		return rangeEnd
	}
	return []byte{}
}
1125
1126 func noSideEffect(r *pb.InternalRaftRequest) bool {
1127 return r.Range != nil || r.AuthUserGet != nil || r.AuthRoleGet != nil || r.AuthStatus != nil
1128 }
1129
1130 func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
1131 f := func(ops []*pb.RequestOp) []*pb.RequestOp {
1132 j := 0
1133 for i := 0; i < len(ops); i++ {
1134 if _, ok := ops[i].Request.(*pb.RequestOp_RequestRange); ok {
1135 continue
1136 }
1137 ops[j] = ops[i]
1138 j++
1139 }
1140
1141 return ops[:j]
1142 }
1143
1144 txn.Success = f(txn.Success)
1145 txn.Failure = f(txn.Failure)
1146 }
1147
1148 func pruneKVs(rr *mvcc.RangeResult, isPrunable func(*mvccpb.KeyValue) bool) {
1149 j := 0
1150 for i := range rr.KVs {
1151 rr.KVs[j] = rr.KVs[i]
1152 if !isPrunable(&rr.KVs[i]) {
1153 j++
1154 }
1155 }
1156 rr.KVs = rr.KVs[:j]
1157 }
1158
1159 func newHeader(s *EtcdServer) *pb.ResponseHeader {
1160 return &pb.ResponseHeader{
1161 ClusterId: uint64(s.Cluster().ID()),
1162 MemberId: uint64(s.ID()),
1163 Revision: s.KV().Rev(),
1164 RaftTerm: s.Term(),
1165 }
1166 }
1167
View as plain text