...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/cindex/cindex.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/cindex

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package cindex
    16  
    17  import (
    18  	"encoding/binary"
    19  	"sync"
    20  	"sync/atomic"
    21  
    22  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    23  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    24  )
    25  
// Backend is the narrow view of the storage backend that this package needs:
// access to its batch transaction and its read transaction.
type Backend interface {
	BatchTx() backend.BatchTx
	ReadTx() backend.ReadTx
}
    30  
// ConsistentIndexer is an interface that wraps the Get/Set/Save methods for consistentIndex.
type ConsistentIndexer interface {

	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64

	// ConsistentApplyingIndex returns the consistent applying index and term
	// (in that order) of the currently executing entry.
	ConsistentApplyingIndex() (uint64, uint64)

	// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
	UnsafeConsistentIndex() uint64

	// SetConsistentIndex sets the consistent index (v) and term of the
	// currently executing entry.
	SetConsistentIndex(v uint64, term uint64)

	// SetConsistentApplyingIndex sets the consistent applying index (v) and
	// term of the currently executing entry.
	SetConsistentApplyingIndex(v uint64, term uint64)

	// UnsafeSave must be called holding the lock on the tx.
	// It saves consistentIndex to the underlying stable storage.
	UnsafeSave(tx backend.BatchTx)

	// SetBackend sets the available backend.BatchTx for ConsistentIndexer.
	SetBackend(be Backend)
}
    56  
// consistentIndex implements the ConsistentIndexer interface.
//
// All four uint64 fields are read and written with sync/atomic, and are kept
// at the start of the struct so they stay 64-bit aligned.
type consistentIndex struct {
	// consistentIndex represents the offset of an entry in a consistent replica log.
	// It caches the "consistent_index" key's value.
	// Accessed through atomics so must be 64-bit aligned.
	// A value of 0 is treated as "not loaded yet" by the accessors.
	consistentIndex uint64
	// term represents the RAFT term of committed entry in a consistent replica log.
	// Accessed through atomics so must be 64-bit aligned.
	// The value is being persisted in the backend since v3.5.
	term uint64

	// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
	// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
	// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
	// Both are accessed through atomics as well.
	applyingIndex uint64
	applyingTerm  uint64

	// be is used for the initial read of consistentIndex.
	be Backend
	// mutex is protecting be.
	mutex sync.Mutex
}
    79  
// NewConsistentIndex creates a new consistent index backed by be.
// The cached index starts at zero, so the first read goes to the backend.
// If `be` is nil, it must be set (SetBackend) before first access using `ConsistentIndex()`.
func NewConsistentIndex(be Backend) ConsistentIndexer {
	return &consistentIndex{be: be}
}
    85  
    86  func (ci *consistentIndex) ConsistentIndex() uint64 {
    87  	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
    88  		return index
    89  	}
    90  	ci.mutex.Lock()
    91  	defer ci.mutex.Unlock()
    92  
    93  	v, term := ReadConsistentIndex(ci.be.ReadTx())
    94  	ci.SetConsistentIndex(v, term)
    95  	return v
    96  }
    97  
    98  func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
    99  	if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
   100  		return index
   101  	}
   102  
   103  	v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
   104  	ci.SetConsistentIndex(v, term)
   105  	return v
   106  }
   107  
// SetConsistentIndex updates the cached consistent index to v and the cached
// term to term. Each value is stored atomically, but the two stores are
// separate operations (index first, then term).
func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
	atomic.StoreUint64(&ci.consistentIndex, v)
	atomic.StoreUint64(&ci.term, term)
}
   112  
   113  func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
   114  	index := atomic.LoadUint64(&ci.consistentIndex)
   115  	term := atomic.LoadUint64(&ci.term)
   116  	UnsafeUpdateConsistentIndex(tx, index, term)
   117  }
   118  
// SetBackend replaces the backend used to load the consistent index and
// clears the cached index and term. The swap happens under ci.mutex.
func (ci *consistentIndex) SetBackend(be Backend) {
	ci.mutex.Lock()
	defer ci.mutex.Unlock()
	ci.be = be
	// After the backend is changed, the first access should re-read it.
	// Zero is the "not loaded" marker checked by ConsistentIndex.
	ci.SetConsistentIndex(0, 0)
}
   126  
   127  func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
   128  	return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
   129  }
   130  
// SetConsistentApplyingIndex records v and term as the applying index and
// applying term. Only the in-memory fields are updated; nothing is written to
// the backend here.
func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
	atomic.StoreUint64(&ci.applyingIndex, v)
	atomic.StoreUint64(&ci.applyingTerm, term)
}
   135  
// NewFakeConsistentIndex returns an in-memory ConsistentIndexer that starts
// at the given index (term 0) and never touches a backend.
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
	return &fakeConsistentIndex{index: index}
}
   139  
// fakeConsistentIndex is a ConsistentIndexer that keeps a single index/term
// pair in memory. The "applying" and the regular accessors share the same two
// fields, so setting one is visible through the other.
type fakeConsistentIndex struct {
	index uint64
	term  uint64
}

// ConsistentIndex returns the stored index.
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
	return atomic.LoadUint64(&f.index)
}

// ConsistentApplyingIndex returns the stored index and term.
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
	return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
}

// UnsafeConsistentIndex returns the stored index; identical to ConsistentIndex.
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
	return atomic.LoadUint64(&f.index)
}

// SetConsistentIndex overwrites the stored index and term.
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
	atomic.StoreUint64(&f.index, index)
	atomic.StoreUint64(&f.term, term)
}

// SetConsistentApplyingIndex overwrites the same stored index and term as
// SetConsistentIndex.
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
	atomic.StoreUint64(&f.index, index)
	atomic.StoreUint64(&f.term, term)
}

// UnsafeSave and SetBackend are no-ops: the fake has no backend.
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBackend(_ Backend)         {}
   166  
// UnsafeCreateMetaBucket creates the `meta` bucket (if it does not exist yet).
// It does not lock tx.
func UnsafeCreateMetaBucket(tx backend.BatchTx) {
	tx.UnsafeCreateBucket(buckets.Meta)
}
   171  
// CreateMetaBucket creates the `meta` bucket (if it does not exist yet).
// It takes the tx lock via LockOutsideApply and releases it on return.
func CreateMetaBucket(tx backend.BatchTx) {
	tx.LockOutsideApply()
	defer tx.Unlock()
	tx.UnsafeCreateBucket(buckets.Meta)
}
   178  
   179  // unsafeGetConsistentIndex loads consistent index & term from given transaction.
   180  // returns 0,0 if the data are not found.
   181  // Term is persisted since v3.5.
   182  func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
   183  	_, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0)
   184  	if len(vs) == 0 {
   185  		return 0, 0
   186  	}
   187  	v := binary.BigEndian.Uint64(vs[0])
   188  	_, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
   189  	if len(ts) == 0 {
   190  		return v, 0
   191  	}
   192  	t := binary.BigEndian.Uint64(ts[0])
   193  	return v, t
   194  }
   195  
// ReadConsistentIndex loads consistent index and term from given transaction.
// It locks tx for the duration of the read and returns (index, term);
// 0, 0 is returned if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
	tx.Lock()
	defer tx.Unlock()
	return unsafeReadConsistentIndex(tx)
}
   203  
   204  func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
   205  	if index == 0 {
   206  		// Never save 0 as it means that we didn't loaded the real index yet.
   207  		return
   208  	}
   209  
   210  	bs1 := make([]byte, 8)
   211  	binary.BigEndian.PutUint64(bs1, index)
   212  	// put the index into the underlying backend
   213  	// tx has been locked in TxnBegin, so there is no need to lock it again
   214  	tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1)
   215  	if term > 0 {
   216  		bs2 := make([]byte, 8)
   217  		binary.BigEndian.PutUint64(bs2, term)
   218  		tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2)
   219  	}
   220  }
   221  
// UpdateConsistentIndex is the locking variant of UnsafeUpdateConsistentIndex:
// it takes the tx lock via LockOutsideApply, writes index and term, and
// releases the lock on return.
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
	tx.LockOutsideApply()
	defer tx.Unlock()
	UnsafeUpdateConsistentIndex(tx, index, term)
}
   227  

View as plain text