1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package membership
16
17 import (
18 "bytes"
19 "encoding/json"
20 "fmt"
21 "path"
22
23 "go.etcd.io/etcd/client/pkg/v3/types"
24 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
25 "go.etcd.io/etcd/server/v3/mvcc/backend"
26 "go.etcd.io/etcd/server/v3/mvcc/buckets"
27
28 "github.com/coreos/go-semver/semver"
29 "go.uber.org/zap"
30 )
31
32 const (
33 attributesSuffix = "attributes"
34 raftAttributesSuffix = "raftAttributes"
35
36
37 storePrefix = "/0"
38 )
39
40 var (
41 StoreMembersPrefix = path.Join(storePrefix, "members")
42 storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
43 errMemberAlreadyExist = fmt.Errorf("member already exists")
44 errMemberNotFound = fmt.Errorf("member not found")
45 )
46
47 func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
48 mkey := backendMemberKey(m.ID)
49 mvalue, err := json.Marshal(m)
50 if err != nil {
51 lg.Panic("failed to marshal member", zap.Error(err))
52 }
53
54 tx := be.BatchTx()
55 tx.LockInsideApply()
56 defer tx.Unlock()
57 if unsafeMemberExists(tx, mkey) {
58 return errMemberAlreadyExist
59 }
60 tx.UnsafePut(buckets.Members, mkey, mvalue)
61 return nil
62 }
63
64
65
66 func TrimClusterFromBackend(be backend.Backend) error {
67 tx := be.BatchTx()
68 tx.LockOutsideApply()
69 defer tx.Unlock()
70 tx.UnsafeDeleteBucket(buckets.Cluster)
71 return nil
72 }
73
74 func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
75 mkey := backendMemberKey(id)
76
77 tx := be.BatchTx()
78 tx.LockInsideApply()
79 defer tx.Unlock()
80 tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
81 if !unsafeMemberExists(tx, mkey) {
82 return errMemberNotFound
83 }
84 tx.UnsafeDelete(buckets.Members, mkey)
85 return nil
86 }
87
88 func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
89 var found bool
90 tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
91 if bytes.Equal(k, mkey) {
92 found = true
93 }
94 return nil
95 })
96 return found
97 }
98
99 func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
100 members := make(map[types.ID]*Member)
101 removed := make(map[types.ID]bool)
102
103 tx := be.ReadTx()
104 tx.RLock()
105 defer tx.RUnlock()
106 err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
107 memberId := mustParseMemberIDFromBytes(lg, k)
108 m := &Member{ID: memberId}
109 if err := json.Unmarshal(v, &m); err != nil {
110 return err
111 }
112 members[memberId] = m
113 return nil
114 })
115 if err != nil {
116 return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
117 }
118
119 err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
120 memberId := mustParseMemberIDFromBytes(lg, k)
121 removed[memberId] = true
122 return nil
123 })
124 if err != nil {
125 return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
126 }
127 return members, removed, nil
128 }
129
130 func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
131 members, removed, err := readMembersFromBackend(lg, be)
132 if err != nil {
133 lg.Panic("couldn't read members from backend", zap.Error(err))
134 }
135 return members, removed
136 }
137
138
139
140 func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
141 lg.Info("Trimming membership information from the backend...")
142 tx := be.BatchTx()
143 tx.LockOutsideApply()
144 defer tx.Unlock()
145 err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
146 tx.UnsafeDelete(buckets.Members, k)
147 lg.Debug("Removed member from the backend",
148 zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
149 return nil
150 })
151 if err != nil {
152 return err
153 }
154 return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
155 tx.UnsafeDelete(buckets.MembersRemoved, k)
156 lg.Debug("Removed removed_member from the backend",
157 zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
158 return nil
159 })
160 }
161
162
163
164 func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
165 members, removed := membersFromStore(lg, s)
166
167 for mID := range members {
168 _, err := s.Delete(MemberStoreKey(mID), true, true)
169 if err != nil {
170 return err
171 }
172 }
173 for mID := range removed {
174 _, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
175 if err != nil {
176 return err
177 }
178 }
179
180 return nil
181 }
182
183
184 func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
185 ckey := backendClusterVersionKey()
186
187 tx := be.BatchTx()
188 tx.LockInsideApply()
189 defer tx.Unlock()
190 tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
191 }
192
193
194 func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
195 dkey := backendDowngradeKey()
196 dvalue, err := json.Marshal(downgrade)
197 if err != nil {
198 lg.Panic("failed to marshal downgrade information", zap.Error(err))
199 }
200 tx := be.BatchTx()
201 tx.LockInsideApply()
202 defer tx.Unlock()
203 tx.UnsafePut(buckets.Cluster, dkey, dvalue)
204 }
205
206 func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
207 err := unsafeSaveMemberToStore(lg, s, m)
208 if err != nil {
209 lg.Panic(
210 "failed to save member to store",
211 zap.String("member-id", m.ID.String()),
212 zap.Error(err),
213 )
214 }
215 }
216
217 func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
218 b, err := json.Marshal(m.RaftAttributes)
219 if err != nil {
220 lg.Panic("failed to marshal raftAttributes", zap.Error(err))
221 }
222 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
223 _, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
224 return err
225 }
226
227 func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
228 if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
229 return err
230 }
231 if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
232 return err
233 }
234 return nil
235 }
236
237 func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
238 b, err := json.Marshal(m.RaftAttributes)
239 if err != nil {
240 lg.Panic("failed to marshal raftAttributes", zap.Error(err))
241 }
242 p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
243 if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
244 lg.Panic(
245 "failed to update raftAttributes",
246 zap.String("path", p),
247 zap.Error(err),
248 )
249 }
250 }
251
252 func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
253 b, err := json.Marshal(m.Attributes)
254 if err != nil {
255 lg.Panic("failed to marshal attributes", zap.Error(err))
256 }
257 p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
258 if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
259 lg.Panic(
260 "failed to update attributes",
261 zap.String("path", p),
262 zap.Error(err),
263 )
264 }
265 }
266
267 func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
268 if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
269 lg.Panic(
270 "failed to save cluster version to store",
271 zap.String("path", StoreClusterVersionKey()),
272 zap.Error(err),
273 )
274 }
275 }
276
277
278
279 func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
280 m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
281 attrs := make(map[string][]byte)
282 raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
283 attrKey := path.Join(n.Key, attributesSuffix)
284 for _, nn := range n.Nodes {
285 if nn.Key != raftAttrKey && nn.Key != attrKey {
286 return nil, fmt.Errorf("unknown key %q", nn.Key)
287 }
288 attrs[nn.Key] = []byte(*nn.Value)
289 }
290 if data := attrs[raftAttrKey]; data != nil {
291 if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
292 return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
293 }
294 } else {
295 return nil, fmt.Errorf("raftAttributes key doesn't exist")
296 }
297 if data := attrs[attrKey]; data != nil {
298 if err := json.Unmarshal(data, &m.Attributes); err != nil {
299 return m, fmt.Errorf("unmarshal attributes error: %v", err)
300 }
301 }
302 return m, nil
303 }
304
305 func backendMemberKey(id types.ID) []byte {
306 return []byte(id.String())
307 }
308
309 func backendClusterVersionKey() []byte {
310 return []byte("clusterVersion")
311 }
312
313 func backendDowngradeKey() []byte {
314 return []byte("downgrade")
315 }
316
317 func mustCreateBackendBuckets(be backend.Backend) {
318 tx := be.BatchTx()
319 tx.LockOutsideApply()
320 defer tx.Unlock()
321 tx.UnsafeCreateBucket(buckets.Members)
322 tx.UnsafeCreateBucket(buckets.MembersRemoved)
323 tx.UnsafeCreateBucket(buckets.Cluster)
324 }
325
326 func MemberStoreKey(id types.ID) string {
327 return path.Join(StoreMembersPrefix, id.String())
328 }
329
330 func StoreClusterVersionKey() string {
331 return path.Join(storePrefix, "version")
332 }
333
334 func MemberAttributesStorePath(id types.ID) string {
335 return path.Join(MemberStoreKey(id), attributesSuffix)
336 }
337
338 func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
339 id, err := types.IDFromString(string(key))
340 if err != nil {
341 lg.Panic("failed to parse member id from key", zap.Error(err))
342 }
343 return id
344 }
345
346 func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
347 id, err := types.IDFromString(path.Base(key))
348 if err != nil {
349 lg.Panic("failed to parse member id from key", zap.Error(err))
350 }
351 return id
352 }
353
354 func RemovedMemberStoreKey(id types.ID) string {
355 return path.Join(storeRemovedMembersPrefix, id.String())
356 }
357
View as plain text