1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
import (
	"encoding/json"
	"fmt"
	"path"
	"strconv"
	"time"
	"unicode/utf8"

	"github.com/coreos/go-semver/semver"
	"go.etcd.io/etcd/pkg/v3/pbutil"
	"go.etcd.io/etcd/server/v3/etcdserver/api"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"

	"go.uber.org/zap"
)
32
const (
	// v2Version is the version label attached to the apply-duration metric
	// recorded for every v2 request.
	v2Version = "v2"
)
34
35
36 type ApplierV2 interface {
37 Delete(r *RequestV2) Response
38 Post(r *RequestV2) Response
39 Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
40 QGet(r *RequestV2) Response
41 Sync(r *RequestV2) Response
42 }
43
44 func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
45 if lg == nil {
46 lg = zap.NewNop()
47 }
48 return &applierV2store{lg: lg, store: s, cluster: c}
49 }
50
51 type applierV2store struct {
52 lg *zap.Logger
53 store v2store.Store
54 cluster *membership.RaftCluster
55 }
56
57 func (a *applierV2store) Delete(r *RequestV2) Response {
58 switch {
59 case r.PrevIndex > 0 || r.PrevValue != "":
60 return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
61 default:
62 return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
63 }
64 }
65
66 func (a *applierV2store) Post(r *RequestV2) Response {
67 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
68 }
69
70 func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
71 ttlOptions := r.TTLOptions()
72 exists, existsSet := pbutil.GetBool(r.PrevExist)
73 switch {
74 case existsSet:
75 if exists {
76 if r.PrevIndex == 0 && r.PrevValue == "" {
77 return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
78 }
79 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
80 }
81 return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
82 case r.PrevIndex > 0 || r.PrevValue != "":
83 return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
84 default:
85 if storeMemberAttributeRegexp.MatchString(r.Path) {
86 id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
87 var attr membership.Attributes
88 if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
89 a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
90 }
91 if a.cluster != nil {
92 a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
93 }
94
95 return Response{}
96 }
97
98 if r.Path == membership.StoreClusterVersionKey() {
99 if a.cluster != nil {
100
101 a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
102 }
103 return Response{}
104 }
105 return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
106 }
107 }
108
109 func (a *applierV2store) QGet(r *RequestV2) Response {
110 return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
111 }
112
113 func (a *applierV2store) Sync(r *RequestV2) Response {
114 a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
115 return Response{}
116 }
117
118
119
120 func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
121 stringer := panicAlternativeStringer{
122 stringer: r,
123 alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
124 }
125 defer func(start time.Time) {
126 success := resp.Err == nil
127 applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
128 warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
129 }(time.Now())
130
131 switch r.Method {
132 case "POST":
133 return s.applyV2.Post(r)
134 case "PUT":
135 return s.applyV2.Put(r, shouldApplyV3)
136 case "DELETE":
137 return s.applyV2.Delete(r)
138 case "QGET":
139 return s.applyV2.QGet(r)
140 case "SYNC":
141 return s.applyV2.Sync(r)
142 default:
143
144 return Response{Err: ErrUnknownMethod}
145 }
146 }
147
148 func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
149 refresh, _ := pbutil.GetBool(r.Refresh)
150 ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
151 if r.Expiration != 0 {
152 ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
153 }
154 return ttlOptions
155 }
156
157 func toResponse(ev *v2store.Event, err error) Response {
158 return Response{Event: ev, Err: err}
159 }
160
View as plain text