...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor/periodic.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor

     1  // Copyright 2017 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v3compactor
    16  
    17  import (
    18  	"context"
    19  	"sync"
    20  	"time"
    21  
    22  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    23  	"go.etcd.io/etcd/server/v3/mvcc"
    24  
    25  	"github.com/jonboulle/clockwork"
    26  	"go.uber.org/zap"
    27  )
    28  
// Periodic compacts the log by purging revisions older than
// the configured retention time.
//
// It samples the current revision from rg once per retry interval, keeps the
// samples in revs as a sliding window, and asks c to compact at the oldest
// sample (revs[0]). lg receives the start/completion/failure log entries and
// clock supplies both the current time and the retry timer. ctx and cancel
// bound the lifetime of the goroutine started by Run; ctx is also passed to
// every compaction request.
type Periodic struct {
	lg     *zap.Logger
	clock  clockwork.Clock
	period time.Duration

	rg RevGetter
	c  Compactable

	revs   []int64
	ctx    context.Context
	cancel context.CancelFunc

	// mu protects paused, which is written by Pause/Resume and read by the
	// goroutine started in Run.
	mu     sync.RWMutex
	paused bool
}
    47  
    48  // newPeriodic creates a new instance of Periodic compactor that purges
    49  // the log older than h Duration.
    50  func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
    51  	pc := &Periodic{
    52  		lg:     lg,
    53  		clock:  clock,
    54  		period: h,
    55  		rg:     rg,
    56  		c:      c,
    57  		revs:   make([]int64, 0),
    58  	}
    59  	pc.ctx, pc.cancel = context.WithCancel(context.Background())
    60  	return pc
    61  }
    62  
    63  /*
    64  Compaction period 1-hour:
    65    1. compute compaction period, which is 1-hour
    66    2. record revisions for every 1/10 of 1-hour (6-minute)
    67    3. keep recording revisions with no compaction for first 1-hour
    68    4. do compact with revs[0]
    69  	- success? continue on for-loop and move sliding window; revs = revs[1:]
    70  	- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
    71  
    72  Compaction period 24-hour:
    73    1. compute compaction period, which is 1-hour
    74    2. record revisions for every 1/10 of 1-hour (6-minute)
    75    3. keep recording revisions with no compaction for first 24-hour
    76    4. do compact with revs[0]
    77  	- success? continue on for-loop and move sliding window; revs = revs[1:]
    78  	- failure? update revs, and retry after 1/10 of 1-hour (6-minute)
    79  
    80  Compaction period 59-min:
    81    1. compute compaction period, which is 59-min
    82    2. record revisions for every 1/10 of 59-min (5.9-min)
    83    3. keep recording revisions with no compaction for first 59-min
    84    4. do compact with revs[0]
    85  	- success? continue on for-loop and move sliding window; revs = revs[1:]
    86  	- failure? update revs, and retry after 1/10 of 59-min (5.9-min)
    87  
    88  Compaction period 5-sec:
    89    1. compute compaction period, which is 5-sec
    90    2. record revisions for every 1/10 of 5-sec (0.5-sec)
    91    3. keep recording revisions with no compaction for first 5-sec
    92    4. do compact with revs[0]
    93  	- success? continue on for-loop and move sliding window; revs = revs[1:]
    94  	- failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
    95  */
    96  
    97  // Run runs periodic compactor.
    98  func (pc *Periodic) Run() {
    99  	compactInterval := pc.getCompactInterval()
   100  	retryInterval := pc.getRetryInterval()
   101  	retentions := pc.getRetentions()
   102  
   103  	go func() {
   104  		lastSuccess := pc.clock.Now()
   105  		baseInterval := pc.period
   106  		for {
   107  			pc.revs = append(pc.revs, pc.rg.Rev())
   108  			if len(pc.revs) > retentions {
   109  				pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
   110  			}
   111  
   112  			select {
   113  			case <-pc.ctx.Done():
   114  				return
   115  			case <-pc.clock.After(retryInterval):
   116  				pc.mu.Lock()
   117  				p := pc.paused
   118  				pc.mu.Unlock()
   119  				if p {
   120  					continue
   121  				}
   122  			}
   123  
   124  			if pc.clock.Now().Sub(lastSuccess) < baseInterval {
   125  				continue
   126  			}
   127  
   128  			// wait up to initial given period
   129  			if baseInterval == pc.period {
   130  				baseInterval = compactInterval
   131  			}
   132  			rev := pc.revs[0]
   133  
   134  			pc.lg.Info(
   135  				"starting auto periodic compaction",
   136  				zap.Int64("revision", rev),
   137  				zap.Duration("compact-period", pc.period),
   138  			)
   139  			startTime := pc.clock.Now()
   140  			_, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
   141  			if err == nil || err == mvcc.ErrCompacted {
   142  				pc.lg.Info(
   143  					"completed auto periodic compaction",
   144  					zap.Int64("revision", rev),
   145  					zap.Duration("compact-period", pc.period),
   146  					zap.Duration("took", pc.clock.Now().Sub(startTime)),
   147  				)
   148  				lastSuccess = pc.clock.Now()
   149  			} else {
   150  				pc.lg.Warn(
   151  					"failed auto periodic compaction",
   152  					zap.Int64("revision", rev),
   153  					zap.Duration("compact-period", pc.period),
   154  					zap.Duration("retry-interval", retryInterval),
   155  					zap.Error(err),
   156  				)
   157  			}
   158  		}
   159  	}()
   160  }
   161  
   162  // if given compaction period x is <1-hour, compact every x duration.
   163  // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
   164  // if given compaction period x is >1-hour, compact every hour.
   165  // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
   166  func (pc *Periodic) getCompactInterval() time.Duration {
   167  	itv := pc.period
   168  	if itv > time.Hour {
   169  		itv = time.Hour
   170  	}
   171  	return itv
   172  }
   173  
   174  func (pc *Periodic) getRetentions() int {
   175  	return int(pc.period/pc.getRetryInterval()) + 1
   176  }
   177  
// retryDivisor is the divisor getRetryInterval applies to the hour-capped
// compaction period; with 10, the retry interval is 1/10 of that period.
const retryDivisor = 10
   179  
   180  func (pc *Periodic) getRetryInterval() time.Duration {
   181  	itv := pc.period
   182  	if itv > time.Hour {
   183  		itv = time.Hour
   184  	}
   185  	return itv / retryDivisor
   186  }
   187  
// Stop stops periodic compactor.
// It cancels the compactor's context, which makes the goroutine started by
// Run return the next time it waits on its select, and which is also the
// context handed to compaction requests. Stop only signals cancellation; it
// does not wait for the goroutine to exit.
func (pc *Periodic) Stop() {
	pc.cancel()
}
   192  
   193  // Pause pauses periodic compactor.
   194  func (pc *Periodic) Pause() {
   195  	pc.mu.Lock()
   196  	pc.paused = true
   197  	pc.mu.Unlock()
   198  }
   199  
   200  // Resume resumes periodic compactor.
   201  func (pc *Periodic) Resume() {
   202  	pc.mu.Lock()
   203  	pc.paused = false
   204  	pc.mu.Unlock()
   205  }
   206  

View as plain text