...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/snap/snapshotter.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/snap

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package snap
    16  
    17  import (
    18  	"errors"
    19  	"fmt"
    20  	"hash/crc32"
    21  	"io/ioutil"
    22  	"os"
    23  	"path/filepath"
    24  	"sort"
    25  	"strconv"
    26  	"strings"
    27  	"time"
    28  
    29  	pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
    30  	"go.etcd.io/etcd/pkg/v3/pbutil"
    31  	"go.etcd.io/etcd/raft/v3"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap/snappb"
    34  	"go.etcd.io/etcd/server/v3/wal/walpb"
    35  
    36  	"go.uber.org/zap"
    37  )
    38  
    39  const snapSuffix = ".snap"
    40  
    41  var (
    42  	ErrNoSnapshot    = errors.New("snap: no available snapshot")
    43  	ErrEmptySnapshot = errors.New("snap: empty snapshot")
    44  	ErrCRCMismatch   = errors.New("snap: crc mismatch")
    45  	crcTable         = crc32.MakeTable(crc32.Castagnoli)
    46  
    47  	// A map of valid files that can be present in the snap folder.
    48  	validFiles = map[string]bool{
    49  		"db": true,
    50  	}
    51  )
    52  
// Snapshotter saves raft snapshots to, and loads them from, files kept in a
// single directory.
type Snapshotter struct {
	// lg receives the warnings and informational messages emitted by the methods.
	lg  *zap.Logger
	// dir is the directory in which snapshot files are written and looked up.
	dir string
}
    57  
    58  func New(lg *zap.Logger, dir string) *Snapshotter {
    59  	if lg == nil {
    60  		lg = zap.NewNop()
    61  	}
    62  	return &Snapshotter{
    63  		lg:  lg,
    64  		dir: dir,
    65  	}
    66  }
    67  
    68  func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error {
    69  	if raft.IsEmptySnap(snapshot) {
    70  		return nil
    71  	}
    72  	return s.save(&snapshot)
    73  }
    74  
    75  func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
    76  	start := time.Now()
    77  
    78  	fname := fmt.Sprintf("%016x-%016x%s", snapshot.Metadata.Term, snapshot.Metadata.Index, snapSuffix)
    79  	b := pbutil.MustMarshal(snapshot)
    80  	crc := crc32.Update(0, crcTable, b)
    81  	snap := snappb.Snapshot{Crc: crc, Data: b}
    82  	d, err := snap.Marshal()
    83  	if err != nil {
    84  		return err
    85  	}
    86  	snapMarshallingSec.Observe(time.Since(start).Seconds())
    87  
    88  	spath := filepath.Join(s.dir, fname)
    89  
    90  	fsyncStart := time.Now()
    91  	err = pioutil.WriteAndSyncFile(spath, d, 0666)
    92  	snapFsyncSec.Observe(time.Since(fsyncStart).Seconds())
    93  
    94  	if err != nil {
    95  		s.lg.Warn("failed to write a snap file", zap.String("path", spath), zap.Error(err))
    96  		rerr := os.Remove(spath)
    97  		if rerr != nil {
    98  			s.lg.Warn("failed to remove a broken snap file", zap.String("path", spath), zap.Error(err))
    99  		}
   100  		return err
   101  	}
   102  
   103  	snapSaveSec.Observe(time.Since(start).Seconds())
   104  	return nil
   105  }
   106  
   107  // Load returns the newest snapshot.
   108  func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
   109  	return s.loadMatching(func(*raftpb.Snapshot) bool { return true })
   110  }
   111  
   112  // LoadNewestAvailable loads the newest snapshot available that is in walSnaps.
   113  func (s *Snapshotter) LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) {
   114  	return s.loadMatching(func(snapshot *raftpb.Snapshot) bool {
   115  		m := snapshot.Metadata
   116  		for i := len(walSnaps) - 1; i >= 0; i-- {
   117  			if m.Term == walSnaps[i].Term && m.Index == walSnaps[i].Index {
   118  				return true
   119  			}
   120  		}
   121  		return false
   122  	})
   123  }
   124  
   125  // loadMatching returns the newest snapshot where matchFn returns true.
   126  func (s *Snapshotter) loadMatching(matchFn func(*raftpb.Snapshot) bool) (*raftpb.Snapshot, error) {
   127  	names, err := s.snapNames()
   128  	if err != nil {
   129  		return nil, err
   130  	}
   131  	var snap *raftpb.Snapshot
   132  	for _, name := range names {
   133  		if snap, err = loadSnap(s.lg, s.dir, name); err == nil && matchFn(snap) {
   134  			return snap, nil
   135  		}
   136  	}
   137  	return nil, ErrNoSnapshot
   138  }
   139  
   140  func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
   141  	fpath := filepath.Join(dir, name)
   142  	snap, err := Read(lg, fpath)
   143  	if err != nil {
   144  		brokenPath := fpath + ".broken"
   145  		if lg != nil {
   146  			lg.Warn("failed to read a snap file", zap.String("path", fpath), zap.Error(err))
   147  		}
   148  		if rerr := os.Rename(fpath, brokenPath); rerr != nil {
   149  			if lg != nil {
   150  				lg.Warn("failed to rename a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath), zap.Error(rerr))
   151  			}
   152  		} else {
   153  			if lg != nil {
   154  				lg.Warn("renamed to a broken snap file", zap.String("path", fpath), zap.String("broken-path", brokenPath))
   155  			}
   156  		}
   157  	}
   158  	return snap, err
   159  }
   160  
   161  // Read reads the snapshot named by snapname and returns the snapshot.
   162  func Read(lg *zap.Logger, snapname string) (*raftpb.Snapshot, error) {
   163  	b, err := ioutil.ReadFile(snapname)
   164  	if err != nil {
   165  		if lg != nil {
   166  			lg.Warn("failed to read a snap file", zap.String("path", snapname), zap.Error(err))
   167  		}
   168  		return nil, err
   169  	}
   170  
   171  	if len(b) == 0 {
   172  		if lg != nil {
   173  			lg.Warn("failed to read empty snapshot file", zap.String("path", snapname))
   174  		}
   175  		return nil, ErrEmptySnapshot
   176  	}
   177  
   178  	var serializedSnap snappb.Snapshot
   179  	if err = serializedSnap.Unmarshal(b); err != nil {
   180  		if lg != nil {
   181  			lg.Warn("failed to unmarshal snappb.Snapshot", zap.String("path", snapname), zap.Error(err))
   182  		}
   183  		return nil, err
   184  	}
   185  
   186  	if len(serializedSnap.Data) == 0 || serializedSnap.Crc == 0 {
   187  		if lg != nil {
   188  			lg.Warn("failed to read empty snapshot data", zap.String("path", snapname))
   189  		}
   190  		return nil, ErrEmptySnapshot
   191  	}
   192  
   193  	crc := crc32.Update(0, crcTable, serializedSnap.Data)
   194  	if crc != serializedSnap.Crc {
   195  		if lg != nil {
   196  			lg.Warn("snap file is corrupt",
   197  				zap.String("path", snapname),
   198  				zap.Uint32("prev-crc", serializedSnap.Crc),
   199  				zap.Uint32("new-crc", crc),
   200  			)
   201  		}
   202  		return nil, ErrCRCMismatch
   203  	}
   204  
   205  	var snap raftpb.Snapshot
   206  	if err = snap.Unmarshal(serializedSnap.Data); err != nil {
   207  		if lg != nil {
   208  			lg.Warn("failed to unmarshal raftpb.Snapshot", zap.String("path", snapname), zap.Error(err))
   209  		}
   210  		return nil, err
   211  	}
   212  	return &snap, nil
   213  }
   214  
   215  // snapNames returns the filename of the snapshots in logical time order (from newest to oldest).
   216  // If there is no available snapshots, an ErrNoSnapshot will be returned.
   217  func (s *Snapshotter) snapNames() ([]string, error) {
   218  	dir, err := os.Open(s.dir)
   219  	if err != nil {
   220  		return nil, err
   221  	}
   222  	defer dir.Close()
   223  	names, err := dir.Readdirnames(-1)
   224  	if err != nil {
   225  		return nil, err
   226  	}
   227  	filenames, err := s.cleanupSnapdir(names)
   228  	if err != nil {
   229  		return nil, err
   230  	}
   231  	snaps := checkSuffix(s.lg, filenames)
   232  	if len(snaps) == 0 {
   233  		return nil, ErrNoSnapshot
   234  	}
   235  	sort.Sort(sort.Reverse(sort.StringSlice(snaps)))
   236  	return snaps, nil
   237  }
   238  
   239  func checkSuffix(lg *zap.Logger, names []string) []string {
   240  	snaps := []string{}
   241  	for i := range names {
   242  		if strings.HasSuffix(names[i], snapSuffix) {
   243  			snaps = append(snaps, names[i])
   244  		} else {
   245  			// If we find a file which is not a snapshot then check if it's
   246  			// a vaild file. If not throw out a warning.
   247  			if _, ok := validFiles[names[i]]; !ok {
   248  				if lg != nil {
   249  					lg.Warn("found unexpected non-snap file; skipping", zap.String("path", names[i]))
   250  				}
   251  			}
   252  		}
   253  	}
   254  	return snaps
   255  }
   256  
   257  // cleanupSnapdir removes any files that should not be in the snapshot directory:
   258  // - db.tmp prefixed files that can be orphaned by defragmentation
   259  func (s *Snapshotter) cleanupSnapdir(filenames []string) (names []string, err error) {
   260  	names = make([]string, 0, len(filenames))
   261  	for _, filename := range filenames {
   262  		if strings.HasPrefix(filename, "db.tmp") {
   263  			s.lg.Info("found orphaned defragmentation file; deleting", zap.String("path", filename))
   264  			if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
   265  				return names, fmt.Errorf("failed to remove orphaned .snap.db file %s: %v", filename, rmErr)
   266  			}
   267  		} else {
   268  			names = append(names, filename)
   269  		}
   270  	}
   271  	return names, nil
   272  }
   273  
   274  func (s *Snapshotter) ReleaseSnapDBs(snap raftpb.Snapshot) error {
   275  	dir, err := os.Open(s.dir)
   276  	if err != nil {
   277  		return err
   278  	}
   279  	defer dir.Close()
   280  	filenames, err := dir.Readdirnames(-1)
   281  	if err != nil {
   282  		return err
   283  	}
   284  	for _, filename := range filenames {
   285  		if strings.HasSuffix(filename, ".snap.db") {
   286  			hexIndex := strings.TrimSuffix(filepath.Base(filename), ".snap.db")
   287  			index, err := strconv.ParseUint(hexIndex, 16, 64)
   288  			if err != nil {
   289  				s.lg.Error("failed to parse index from filename", zap.String("path", filename), zap.String("error", err.Error()))
   290  				continue
   291  			}
   292  			if index < snap.Metadata.Index {
   293  				s.lg.Info("found orphaned .snap.db file; deleting", zap.String("path", filename))
   294  				if rmErr := os.Remove(filepath.Join(s.dir, filename)); rmErr != nil && !os.IsNotExist(rmErr) {
   295  					s.lg.Error("failed to remove orphaned .snap.db file", zap.String("path", filename), zap.String("error", rmErr.Error()))
   296  				}
   297  			}
   298  		}
   299  	}
   300  	return nil
   301  }
   302  

View as plain text