...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/membership/store.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/membership

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package membership
    16  
    17  import (
    18  	"bytes"
    19  	"encoding/json"
    20  	"fmt"
    21  	"path"
    22  
    23  	"go.etcd.io/etcd/client/pkg/v3/types"
    24  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    25  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    26  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    27  
    28  	"github.com/coreos/go-semver/semver"
    29  	"go.uber.org/zap"
    30  )
    31  
    32  const (
    33  	attributesSuffix     = "attributes"
    34  	raftAttributesSuffix = "raftAttributes"
    35  
    36  	// the prefix for storing membership related information in store provided by store pkg.
    37  	storePrefix = "/0"
    38  )
    39  
    40  var (
    41  	StoreMembersPrefix        = path.Join(storePrefix, "members")
    42  	storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
    43  	errMemberAlreadyExist     = fmt.Errorf("member already exists")
    44  	errMemberNotFound         = fmt.Errorf("member not found")
    45  )
    46  
    47  func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
    48  	mkey := backendMemberKey(m.ID)
    49  	mvalue, err := json.Marshal(m)
    50  	if err != nil {
    51  		lg.Panic("failed to marshal member", zap.Error(err))
    52  	}
    53  
    54  	tx := be.BatchTx()
    55  	tx.LockInsideApply()
    56  	defer tx.Unlock()
    57  	if unsafeMemberExists(tx, mkey) {
    58  		return errMemberAlreadyExist
    59  	}
    60  	tx.UnsafePut(buckets.Members, mkey, mvalue)
    61  	return nil
    62  }
    63  
    64  // TrimClusterFromBackend removes all information about cluster (versions)
    65  // from the v3 backend.
    66  func TrimClusterFromBackend(be backend.Backend) error {
    67  	tx := be.BatchTx()
    68  	tx.LockOutsideApply()
    69  	defer tx.Unlock()
    70  	tx.UnsafeDeleteBucket(buckets.Cluster)
    71  	return nil
    72  }
    73  
    74  func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
    75  	mkey := backendMemberKey(id)
    76  
    77  	tx := be.BatchTx()
    78  	tx.LockInsideApply()
    79  	defer tx.Unlock()
    80  	tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
    81  	if !unsafeMemberExists(tx, mkey) {
    82  		return errMemberNotFound
    83  	}
    84  	tx.UnsafeDelete(buckets.Members, mkey)
    85  	return nil
    86  }
    87  
    88  func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
    89  	var found bool
    90  	tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
    91  		if bytes.Equal(k, mkey) {
    92  			found = true
    93  		}
    94  		return nil
    95  	})
    96  	return found
    97  }
    98  
    99  func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
   100  	members := make(map[types.ID]*Member)
   101  	removed := make(map[types.ID]bool)
   102  
   103  	tx := be.ReadTx()
   104  	tx.RLock()
   105  	defer tx.RUnlock()
   106  	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
   107  		memberId := mustParseMemberIDFromBytes(lg, k)
   108  		m := &Member{ID: memberId}
   109  		if err := json.Unmarshal(v, &m); err != nil {
   110  			return err
   111  		}
   112  		members[memberId] = m
   113  		return nil
   114  	})
   115  	if err != nil {
   116  		return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
   117  	}
   118  
   119  	err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
   120  		memberId := mustParseMemberIDFromBytes(lg, k)
   121  		removed[memberId] = true
   122  		return nil
   123  	})
   124  	if err != nil {
   125  		return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
   126  	}
   127  	return members, removed, nil
   128  }
   129  
   130  func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
   131  	members, removed, err := readMembersFromBackend(lg, be)
   132  	if err != nil {
   133  		lg.Panic("couldn't read members from backend", zap.Error(err))
   134  	}
   135  	return members, removed
   136  }
   137  
   138  // TrimMembershipFromBackend removes all information about members &
   139  // removed_members from the v3 backend.
   140  func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
   141  	lg.Info("Trimming membership information from the backend...")
   142  	tx := be.BatchTx()
   143  	tx.LockOutsideApply()
   144  	defer tx.Unlock()
   145  	err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
   146  		tx.UnsafeDelete(buckets.Members, k)
   147  		lg.Debug("Removed member from the backend",
   148  			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
   149  		return nil
   150  	})
   151  	if err != nil {
   152  		return err
   153  	}
   154  	return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
   155  		tx.UnsafeDelete(buckets.MembersRemoved, k)
   156  		lg.Debug("Removed removed_member from the backend",
   157  			zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
   158  		return nil
   159  	})
   160  }
   161  
   162  // TrimMembershipFromV2Store removes all information about members &
   163  // removed_members from the v2 store.
   164  func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
   165  	members, removed := membersFromStore(lg, s)
   166  
   167  	for mID := range members {
   168  		_, err := s.Delete(MemberStoreKey(mID), true, true)
   169  		if err != nil {
   170  			return err
   171  		}
   172  	}
   173  	for mID := range removed {
   174  		_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
   175  		if err != nil {
   176  			return err
   177  		}
   178  	}
   179  
   180  	return nil
   181  }
   182  
   183  // The field is populated since etcd v3.5.
   184  func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
   185  	ckey := backendClusterVersionKey()
   186  
   187  	tx := be.BatchTx()
   188  	tx.LockInsideApply()
   189  	defer tx.Unlock()
   190  	tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
   191  }
   192  
   193  // The field is populated since etcd v3.5.
   194  func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
   195  	dkey := backendDowngradeKey()
   196  	dvalue, err := json.Marshal(downgrade)
   197  	if err != nil {
   198  		lg.Panic("failed to marshal downgrade information", zap.Error(err))
   199  	}
   200  	tx := be.BatchTx()
   201  	tx.LockInsideApply()
   202  	defer tx.Unlock()
   203  	tx.UnsafePut(buckets.Cluster, dkey, dvalue)
   204  }
   205  
   206  func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
   207  	err := unsafeSaveMemberToStore(lg, s, m)
   208  	if err != nil {
   209  		lg.Panic(
   210  			"failed to save member to store",
   211  			zap.String("member-id", m.ID.String()),
   212  			zap.Error(err),
   213  		)
   214  	}
   215  }
   216  
   217  func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
   218  	b, err := json.Marshal(m.RaftAttributes)
   219  	if err != nil {
   220  		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
   221  	}
   222  	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
   223  	_, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
   224  	return err
   225  }
   226  
   227  func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
   228  	if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
   229  		return err
   230  	}
   231  	if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
   232  		return err
   233  	}
   234  	return nil
   235  }
   236  
   237  func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
   238  	b, err := json.Marshal(m.RaftAttributes)
   239  	if err != nil {
   240  		lg.Panic("failed to marshal raftAttributes", zap.Error(err))
   241  	}
   242  	p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
   243  	if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
   244  		lg.Panic(
   245  			"failed to update raftAttributes",
   246  			zap.String("path", p),
   247  			zap.Error(err),
   248  		)
   249  	}
   250  }
   251  
   252  func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
   253  	b, err := json.Marshal(m.Attributes)
   254  	if err != nil {
   255  		lg.Panic("failed to marshal attributes", zap.Error(err))
   256  	}
   257  	p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
   258  	if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
   259  		lg.Panic(
   260  			"failed to update attributes",
   261  			zap.String("path", p),
   262  			zap.Error(err),
   263  		)
   264  	}
   265  }
   266  
   267  func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
   268  	if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
   269  		lg.Panic(
   270  			"failed to save cluster version to store",
   271  			zap.String("path", StoreClusterVersionKey()),
   272  			zap.Error(err),
   273  		)
   274  	}
   275  }
   276  
   277  // nodeToMember builds member from a key value node.
   278  // the child nodes of the given node MUST be sorted by key.
   279  func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
   280  	m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
   281  	attrs := make(map[string][]byte)
   282  	raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
   283  	attrKey := path.Join(n.Key, attributesSuffix)
   284  	for _, nn := range n.Nodes {
   285  		if nn.Key != raftAttrKey && nn.Key != attrKey {
   286  			return nil, fmt.Errorf("unknown key %q", nn.Key)
   287  		}
   288  		attrs[nn.Key] = []byte(*nn.Value)
   289  	}
   290  	if data := attrs[raftAttrKey]; data != nil {
   291  		if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
   292  			return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
   293  		}
   294  	} else {
   295  		return nil, fmt.Errorf("raftAttributes key doesn't exist")
   296  	}
   297  	if data := attrs[attrKey]; data != nil {
   298  		if err := json.Unmarshal(data, &m.Attributes); err != nil {
   299  			return m, fmt.Errorf("unmarshal attributes error: %v", err)
   300  		}
   301  	}
   302  	return m, nil
   303  }
   304  
   305  func backendMemberKey(id types.ID) []byte {
   306  	return []byte(id.String())
   307  }
   308  
   309  func backendClusterVersionKey() []byte {
   310  	return []byte("clusterVersion")
   311  }
   312  
   313  func backendDowngradeKey() []byte {
   314  	return []byte("downgrade")
   315  }
   316  
   317  func mustCreateBackendBuckets(be backend.Backend) {
   318  	tx := be.BatchTx()
   319  	tx.LockOutsideApply()
   320  	defer tx.Unlock()
   321  	tx.UnsafeCreateBucket(buckets.Members)
   322  	tx.UnsafeCreateBucket(buckets.MembersRemoved)
   323  	tx.UnsafeCreateBucket(buckets.Cluster)
   324  }
   325  
   326  func MemberStoreKey(id types.ID) string {
   327  	return path.Join(StoreMembersPrefix, id.String())
   328  }
   329  
   330  func StoreClusterVersionKey() string {
   331  	return path.Join(storePrefix, "version")
   332  }
   333  
   334  func MemberAttributesStorePath(id types.ID) string {
   335  	return path.Join(MemberStoreKey(id), attributesSuffix)
   336  }
   337  
   338  func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
   339  	id, err := types.IDFromString(string(key))
   340  	if err != nil {
   341  		lg.Panic("failed to parse member id from key", zap.Error(err))
   342  	}
   343  	return id
   344  }
   345  
   346  func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
   347  	id, err := types.IDFromString(path.Base(key))
   348  	if err != nil {
   349  		lg.Panic("failed to parse member id from key", zap.Error(err))
   350  	}
   351  	return id
   352  }
   353  
   354  func RemovedMemberStoreKey(id types.ID) string {
   355  	return path.Join(storeRemovedMembersPrefix, id.String())
   356  }
   357  

View as plain text