...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/storage.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"errors"
    19  	"io"
    20  
    21  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    22  	"go.etcd.io/etcd/client/pkg/v3/types"
    23  	"go.etcd.io/etcd/pkg/v3/pbutil"
    24  	"go.etcd.io/etcd/raft/v3/raftpb"
    25  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    26  	"go.etcd.io/etcd/server/v3/wal"
    27  	"go.etcd.io/etcd/server/v3/wal/walpb"
    28  
    29  	"go.uber.org/zap"
    30  )
    31  
    32  type Storage interface {
    33  	// Save function saves ents and state to the underlying stable storage.
    34  	// Save MUST block until st and ents are on stable storage.
    35  	Save(st raftpb.HardState, ents []raftpb.Entry) error
    36  	// SaveSnap function saves snapshot to the underlying stable storage.
    37  	SaveSnap(snap raftpb.Snapshot) error
    38  	// Close closes the Storage and performs finalization.
    39  	Close() error
    40  	// Release releases the locked wal files older than the provided snapshot.
    41  	Release(snap raftpb.Snapshot) error
    42  	// Sync WAL
    43  	Sync() error
    44  }
    45  
    46  type storage struct {
    47  	*wal.WAL
    48  	*snap.Snapshotter
    49  }
    50  
    51  func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
    52  	return &storage{w, s}
    53  }
    54  
    55  // SaveSnap saves the snapshot file to disk and writes the WAL snapshot entry.
    56  func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
    57  	walsnap := walpb.Snapshot{
    58  		Index:     snap.Metadata.Index,
    59  		Term:      snap.Metadata.Term,
    60  		ConfState: &snap.Metadata.ConfState,
    61  	}
    62  	// save the snapshot file before writing the snapshot to the wal.
    63  	// This makes it possible for the snapshot file to become orphaned, but prevents
    64  	// a WAL snapshot entry from having no corresponding snapshot file.
    65  	err := st.Snapshotter.SaveSnap(snap)
    66  	if err != nil {
    67  		return err
    68  	}
    69  	// gofail: var raftBeforeWALSaveSnaphot struct{}
    70  
    71  	return st.WAL.SaveSnapshot(walsnap)
    72  }
    73  
    74  // Release releases resources older than the given snap and are no longer needed:
    75  // - releases the locks to the wal files that are older than the provided wal for the given snap.
    76  // - deletes any .snap.db files that are older than the given snap.
    77  func (st *storage) Release(snap raftpb.Snapshot) error {
    78  	if err := st.WAL.ReleaseLockTo(snap.Metadata.Index); err != nil {
    79  		return err
    80  	}
    81  	return st.Snapshotter.ReleaseSnapDBs(snap)
    82  }
    83  
    84  // readWAL reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
    85  // after the position of the given snap in the WAL.
    86  // The snap must have been previously saved to the WAL, or this call will panic.
    87  func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot, unsafeNoFsync bool) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
    88  	var (
    89  		err       error
    90  		wmetadata []byte
    91  	)
    92  
    93  	repaired := false
    94  	for {
    95  		if w, err = wal.Open(lg, waldir, snap); err != nil {
    96  			lg.Fatal("failed to open WAL", zap.Error(err))
    97  		}
    98  		if unsafeNoFsync {
    99  			w.SetUnsafeNoFsync()
   100  		}
   101  		if wmetadata, st, ents, err = w.ReadAll(); err != nil {
   102  			w.Close()
   103  			// we can only repair ErrUnexpectedEOF and we never repair twice.
   104  			if repaired || !errors.Is(err, io.ErrUnexpectedEOF) {
   105  				lg.Fatal("failed to read WAL, cannot be repaired", zap.Error(err))
   106  			}
   107  			if !wal.Repair(lg, waldir) {
   108  				lg.Fatal("failed to repair WAL", zap.Error(err))
   109  			} else {
   110  				lg.Info("repaired WAL", zap.Error(err))
   111  				repaired = true
   112  			}
   113  			continue
   114  		}
   115  		break
   116  	}
   117  	var metadata pb.Metadata
   118  	pbutil.MustUnmarshal(&metadata, wmetadata)
   119  	id = types.ID(metadata.NodeID)
   120  	cid = types.ID(metadata.ClusterID)
   121  	return w, id, cid, st, ents
   122  }
   123  

View as plain text