...

Source file src/go.etcd.io/etcd/raft/v3/log.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"fmt"
    19  	"log"
    20  
    21  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    22  )
    23  
    // raftLog combines the entries held by stable storage with the entries and
    // snapshot that are still unstable, and tracks the committed and applied
    // positions of the log.
    24  type raftLog struct {
    25  	// storage contains all stable entries since the last snapshot.
    26  	storage Storage
    27  
    28  	// unstable contains all unstable entries and the unstable snapshot.
    29  	// They will be saved into storage.
    30  	unstable unstable
    31  
    32  	// committed is the highest log position that is known to be in
    33  	// stable storage on a quorum of nodes.
    34  	committed uint64
    35  	// applied is the highest log position that the application has
    36  	// been instructed to apply to its state machine.
    37  	// Invariant: applied <= committed
    38  	applied uint64
    39  
    	// logger receives the informational, warning and panic messages
    	// emitted by the raftLog methods.
    40  	logger Logger
    41  
    42  	// maxNextEntsSize is the maximum aggregate byte size of the entries
    43  	// returned from a single call to nextEnts.
    44  	maxNextEntsSize uint64
    45  }
    46  
    47  // newLog returns log using the given storage and default options. It
    48  // recovers the log to the state that it just commits and applies the
    49  // latest snapshot.
    50  func newLog(storage Storage, logger Logger) *raftLog {
    51  	return newLogWithSize(storage, logger, noLimit)
    52  }
    53  
    54  // newLogWithSize returns a log using the given storage and max
    55  // message size.
    56  func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
    57  	if storage == nil {
    58  		log.Panic("storage must not be nil")
    59  	}
    60  	log := &raftLog{
    61  		storage:         storage,
    62  		logger:          logger,
    63  		maxNextEntsSize: maxNextEntsSize,
    64  	}
    65  	firstIndex, err := storage.FirstIndex()
    66  	if err != nil {
    67  		panic(err) // TODO(bdarnell)
    68  	}
    69  	lastIndex, err := storage.LastIndex()
    70  	if err != nil {
    71  		panic(err) // TODO(bdarnell)
    72  	}
    73  	log.unstable.offset = lastIndex + 1
    74  	log.unstable.logger = logger
    75  	// Initialize our committed and applied pointers to the time of the last compaction.
    76  	log.committed = firstIndex - 1
    77  	log.applied = firstIndex - 1
    78  
    79  	return log
    80  }
    81  
    82  func (l *raftLog) String() string {
    83  	return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
    84  }
    85  
    86  // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
    87  // it returns (last index of new entries, true).
    88  func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
    89  	if l.matchTerm(index, logTerm) {
    90  		lastnewi = index + uint64(len(ents))
    91  		ci := l.findConflict(ents)
    92  		switch {
    93  		case ci == 0:
    94  		case ci <= l.committed:
    95  			l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
    96  		default:
    97  			offset := index + 1
    98  			l.append(ents[ci-offset:]...)
    99  		}
   100  		l.commitTo(min(committed, lastnewi))
   101  		return lastnewi, true
   102  	}
   103  	return 0, false
   104  }
   105  
   106  func (l *raftLog) append(ents ...pb.Entry) uint64 {
   107  	if len(ents) == 0 {
   108  		return l.lastIndex()
   109  	}
   110  	if after := ents[0].Index - 1; after < l.committed {
   111  		l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
   112  	}
   113  	l.unstable.truncateAndAppend(ents)
   114  	return l.lastIndex()
   115  }
   116  
   117  // findConflict finds the index of the conflict.
   118  // It returns the first pair of conflicting entries between the existing
   119  // entries and the given entries, if there are any.
   120  // If there is no conflicting entries, and the existing entries contains
   121  // all the given entries, zero will be returned.
   122  // If there is no conflicting entries, but the given entries contains new
   123  // entries, the index of the first new entry will be returned.
   124  // An entry is considered to be conflicting if it has the same index but
   125  // a different term.
   126  // The index of the given entries MUST be continuously increasing.
   127  func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
   128  	for _, ne := range ents {
   129  		if !l.matchTerm(ne.Index, ne.Term) {
   130  			if ne.Index <= l.lastIndex() {
   131  				l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
   132  					ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
   133  			}
   134  			return ne.Index
   135  		}
   136  	}
   137  	return 0
   138  }
   139  
   140  // findConflictByTerm takes an (index, term) pair (indicating a conflicting log
   141  // entry on a leader/follower during an append) and finds the largest index in
   142  // log l with a term <= `term` and an index <= `index`. If no such index exists
   143  // in the log, the log's first index is returned.
   144  //
   145  // The index provided MUST be equal to or less than l.lastIndex(). Invalid
   146  // inputs log a warning and the input index is returned.
   147  func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
   148  	if li := l.lastIndex(); index > li {
   149  		// NB: such calls should not exist, but since there is a straightfoward
   150  		// way to recover, do it.
   151  		//
   152  		// It is tempting to also check something about the first index, but
   153  		// there is odd behavior with peers that have no log, in which case
   154  		// lastIndex will return zero and firstIndex will return one, which
   155  		// leads to calls with an index of zero into this method.
   156  		l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
   157  			index, li)
   158  		return index
   159  	}
   160  	for {
   161  		logTerm, err := l.term(index)
   162  		if logTerm <= term || err != nil {
   163  			break
   164  		}
   165  		index--
   166  	}
   167  	return index
   168  }
   169  
   170  func (l *raftLog) unstableEntries() []pb.Entry {
   171  	if len(l.unstable.entries) == 0 {
   172  		return nil
   173  	}
   174  	return l.unstable.entries
   175  }
   176  
   177  // nextEnts returns all the available entries for execution.
   178  // If applied is smaller than the index of snapshot, it returns all committed
   179  // entries after the index of snapshot.
   180  func (l *raftLog) nextEnts() (ents []pb.Entry) {
   181  	off := max(l.applied+1, l.firstIndex())
   182  	if l.committed+1 > off {
   183  		ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
   184  		if err != nil {
   185  			l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
   186  		}
   187  		return ents
   188  	}
   189  	return nil
   190  }
   191  
   192  // hasNextEnts returns if there is any available entries for execution. This
   193  // is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
   194  func (l *raftLog) hasNextEnts() bool {
   195  	off := max(l.applied+1, l.firstIndex())
   196  	return l.committed+1 > off
   197  }
   198  
   199  // hasPendingSnapshot returns if there is pending snapshot waiting for applying.
   200  func (l *raftLog) hasPendingSnapshot() bool {
   201  	return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot)
   202  }
   203  
   204  func (l *raftLog) snapshot() (pb.Snapshot, error) {
   205  	if l.unstable.snapshot != nil {
   206  		return *l.unstable.snapshot, nil
   207  	}
   208  	return l.storage.Snapshot()
   209  }
   210  
   211  func (l *raftLog) firstIndex() uint64 {
   212  	if i, ok := l.unstable.maybeFirstIndex(); ok {
   213  		return i
   214  	}
   215  	index, err := l.storage.FirstIndex()
   216  	if err != nil {
   217  		panic(err) // TODO(bdarnell)
   218  	}
   219  	return index
   220  }
   221  
   222  func (l *raftLog) lastIndex() uint64 {
   223  	if i, ok := l.unstable.maybeLastIndex(); ok {
   224  		return i
   225  	}
   226  	i, err := l.storage.LastIndex()
   227  	if err != nil {
   228  		panic(err) // TODO(bdarnell)
   229  	}
   230  	return i
   231  }
   232  
   233  func (l *raftLog) commitTo(tocommit uint64) {
   234  	// never decrease commit
   235  	if l.committed < tocommit {
   236  		if l.lastIndex() < tocommit {
   237  			l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
   238  		}
   239  		l.committed = tocommit
   240  	}
   241  }
   242  
   243  func (l *raftLog) appliedTo(i uint64) {
   244  	if i == 0 {
   245  		return
   246  	}
   247  	if l.committed < i || i < l.applied {
   248  		l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
   249  	}
   250  	l.applied = i
   251  }
   252  
   253  func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
   254  
   255  func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
   256  
   257  func (l *raftLog) lastTerm() uint64 {
   258  	t, err := l.term(l.lastIndex())
   259  	if err != nil {
   260  		l.logger.Panicf("unexpected error when getting the last term (%v)", err)
   261  	}
   262  	return t
   263  }
   264  
   265  func (l *raftLog) term(i uint64) (uint64, error) {
   266  	// the valid term range is [index of dummy entry, last index]
   267  	dummyIndex := l.firstIndex() - 1
   268  	if i < dummyIndex || i > l.lastIndex() {
   269  		// TODO: return an error instead?
   270  		return 0, nil
   271  	}
   272  
   273  	if t, ok := l.unstable.maybeTerm(i); ok {
   274  		return t, nil
   275  	}
   276  
   277  	t, err := l.storage.Term(i)
   278  	if err == nil {
   279  		return t, nil
   280  	}
   281  	if err == ErrCompacted || err == ErrUnavailable {
   282  		return 0, err
   283  	}
   284  	panic(err) // TODO(bdarnell)
   285  }
   286  
   287  func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
   288  	if i > l.lastIndex() {
   289  		return nil, nil
   290  	}
   291  	return l.slice(i, l.lastIndex()+1, maxsize)
   292  }
   293  
   294  // allEntries returns all entries in the log.
   295  func (l *raftLog) allEntries() []pb.Entry {
   296  	ents, err := l.entries(l.firstIndex(), noLimit)
   297  	if err == nil {
   298  		return ents
   299  	}
   300  	if err == ErrCompacted { // try again if there was a racing compaction
   301  		return l.allEntries()
   302  	}
   303  	// TODO (xiangli): handle error?
   304  	panic(err)
   305  }
   306  
   307  // isUpToDate determines if the given (lastIndex,term) log is more up-to-date
   308  // by comparing the index and term of the last entries in the existing logs.
   309  // If the logs have last entries with different terms, then the log with the
   310  // later term is more up-to-date. If the logs end with the same term, then
   311  // whichever log has the larger lastIndex is more up-to-date. If the logs are
   312  // the same, the given log is up-to-date.
   313  func (l *raftLog) isUpToDate(lasti, term uint64) bool {
   314  	return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
   315  }
   316  
   317  func (l *raftLog) matchTerm(i, term uint64) bool {
   318  	t, err := l.term(i)
   319  	if err != nil {
   320  		return false
   321  	}
   322  	return t == term
   323  }
   324  
   325  func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
   326  	if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
   327  		l.commitTo(maxIndex)
   328  		return true
   329  	}
   330  	return false
   331  }
   332  
   333  func (l *raftLog) restore(s pb.Snapshot) {
   334  	l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
   335  	l.committed = s.Metadata.Index
   336  	l.unstable.restore(s)
   337  }
   338  
   339  // slice returns a slice of log entries from lo through hi-1, inclusive.
   340  func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
   341  	err := l.mustCheckOutOfBounds(lo, hi)
   342  	if err != nil {
   343  		return nil, err
   344  	}
   345  	if lo == hi {
   346  		return nil, nil
   347  	}
   348  	var ents []pb.Entry
   349  	if lo < l.unstable.offset {
   350  		storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
   351  		if err == ErrCompacted {
   352  			return nil, err
   353  		} else if err == ErrUnavailable {
   354  			l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
   355  		} else if err != nil {
   356  			panic(err) // TODO(bdarnell)
   357  		}
   358  
   359  		// check if ents has reached the size limitation
   360  		if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
   361  			return storedEnts, nil
   362  		}
   363  
   364  		ents = storedEnts
   365  	}
   366  	if hi > l.unstable.offset {
   367  		unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
   368  		if len(ents) > 0 {
   369  			combined := make([]pb.Entry, len(ents)+len(unstable))
   370  			n := copy(combined, ents)
   371  			copy(combined[n:], unstable)
   372  			ents = combined
   373  		} else {
   374  			ents = unstable
   375  		}
   376  	}
   377  	return limitSize(ents, maxSize), nil
   378  }
   379  
   380  // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
   381  func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
   382  	if lo > hi {
   383  		l.logger.Panicf("invalid slice %d > %d", lo, hi)
   384  	}
   385  	fi := l.firstIndex()
   386  	if lo < fi {
   387  		return ErrCompacted
   388  	}
   389  
   390  	length := l.lastIndex() + 1 - fi
   391  	if hi > fi+length {
   392  		l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
   393  	}
   394  	return nil
   395  }
   396  
   397  func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
   398  	if err == nil {
   399  		return t
   400  	}
   401  	if err == ErrCompacted {
   402  		return 0
   403  	}
   404  	l.logger.Panicf("unexpected error (%v)", err)
   405  	return 0
   406  }
   407  

View as plain text