...

Source file src/go.etcd.io/etcd/server/v3/mvcc/kvstore_compaction.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"encoding/binary"
    19  	"fmt"
    20  	"time"
    21  
    22  	humanize "github.com/dustin/go-humanize"
    23  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    24  	"go.uber.org/zap"
    25  )
    26  
    // scheduleCompaction compacts the in-memory index at compactMainRev, then
    // deletes the matching stale entries from the backend key bucket and
    // returns a hash computed over the key/value pairs it scanned.
    //
    // s.kvindex.Compact(compactMainRev) yields keep, which is looked up by
    // revision below: a revision present in keep survives, any other scanned
    // key is deleted. The scan walks buckets.Key in batches of
    // s.cfg.CompactionBatchLimit keys, bounded above by the encoding of
    // compactMainRev+1, and feeds every scanned pair (kept or deleted) to a
    // hasher built from prevCompactRev, compactMainRev and keep.
    //
    // A batch shorter than the batch limit ends the scan: compactMainRev is
    // stored in buckets.Meta under finishedCompactKeyName and the hash is
    // returned with a nil error. After each full batch the function forces a
    // backend commit and waits 10ms; if s.stopc becomes ready during that wait
    // it returns a zero KeyValueHash and a non-nil error.
    27  func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
        	// Phase 1: compact the index and record how long that took. The
        	// Duration/Millisecond division is integer division, so the observed
        	// value is truncated to whole milliseconds.
    28  	totalStart := time.Now()
    29  	keep := s.kvindex.Compact(compactMainRev)
    30  	indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
    31  
        	// Phase 2: restart the clock so totalStart now times the backend work.
        	// The deferred closures read totalStart and keyCompactions when the
        	// function returns, so they fire on the success path and on the
        	// interrupted path alike.
    32  	totalStart = time.Now()
    33  	defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
    34  	keyCompactions := 0
    35  	defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
        	// NOTE(review): because this is deferred, dbCompactionLast is also
        	// updated when the compaction is interrupted via s.stopc - confirm
        	// that is the intended meaning of the metric.
    36  	defer func() { dbCompactionLast.Set(float64(time.Now().Unix())) }()
    37  
        	// end is the 8-byte big-endian encoding of compactMainRev+1; it is
        	// passed to UnsafeRange as the upper bound of every scan.
    38  	end := make([]byte, 8)
    39  	binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
    40  
    41  	batchNum := s.cfg.CompactionBatchLimit
    42  	h := newKVHasher(prevCompactRev, compactMainRev, keep)
        	// last is the lower bound of the next scan. It starts zero-filled and
        	// is overwritten after each full batch. The 8+1+8 size presumably
        	// mirrors the encoded revision layout used by revToBytes (not shown
        	// here) - TODO confirm.
    43  	last := make([]byte, 8+1+8)
    44  	for {
        		// rev ends each batch holding the revision of the last key scanned.
    45  		var rev revision
    46  
        		// start times a single batch, including its forced commit.
    47  		start := time.Now()
    48  
        		// Hold the batch transaction lock while this batch is read and
        		// its deletes are issued.
    49  		tx := s.b.BatchTx()
    50  		tx.LockOutsideApply()
    51  		keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
    52  		for i := range keys {
    53  			rev = bytesToRev(keys[i])
        			// Only revisions absent from keep are deleted and counted.
    54  			if _, ok := keep[rev]; !ok {
    55  				tx.UnsafeDelete(buckets.Key, keys[i])
    56  				keyCompactions++
    57  			}
        			// Every scanned pair is hashed, whether or not it was deleted.
    58  			h.WriteKeyValue(keys[i], values[i])
    59  		}
    60  
        		// A short batch is treated as the end of the range. Note that the
        		// limit is re-read from s.cfg here rather than taken from batchNum.
    61  		if len(keys) < s.cfg.CompactionBatchLimit {
        			// Record the finished compact revision in the meta bucket
        			// while the lock is still held, then release it before
        			// hashing, sizing and logging.
    62  			rbytes := make([]byte, 8+1+8)
    63  			revToBytes(revision{main: compactMainRev}, rbytes)
    64  			tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
    65  			tx.Unlock()
    66  			hash := h.Hash()
    67  			size, sizeInUse := s.b.Size(), s.b.SizeInUse()
    68  			s.lg.Info(
    69  				"finished scheduled compaction",
    70  				zap.Int64("compact-revision", compactMainRev),
    71  				zap.Duration("took", time.Since(totalStart)),
    72  				zap.Uint32("hash", hash.Hash),
    73  				zap.Int64("current-db-size-bytes", size),
    74  				zap.String("current-db-size", humanize.Bytes(uint64(size))),
    75  				zap.Int64("current-db-size-in-use-bytes", sizeInUse),
    76  				zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
    77  			)
    78  			return hash, nil
    79  		}
    80  
        		// Full batch: resume the next scan just after the last revision
        		// seen (same main, sub+1). Reaching this point means the batch was
        		// not short, so with a positive batch limit rev was assigned above.
    81  		// update last
    82  		revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
    83  		tx.Unlock()
    84  		// Immediately commit the compaction deletes instead of letting them accumulate in the write buffer
    85  		s.b.ForceCommit()
    86  		dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
    87  
        		// Wait 10ms before the next batch, or abandon the compaction as
        		// soon as s.stopc is ready. The deferred metric updates above
        		// still run on that error return.
        		// NOTE(review): time.After creates a new timer on every batch; a
        		// reusable timer or ticker would avoid that if it ever matters.
    88  		select {
    89  		case <-time.After(10 * time.Millisecond):
    90  		case <-s.stopc:
    91  			return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
    92  		}
    93  	}
    94  }
    95  

View as plain text