...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor/periodic_test.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v3compactor
    16  
    17  import (
    18  	"reflect"
    19  	"testing"
    20  	"time"
    21  
    22  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    23  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    24  
    25  	"github.com/jonboulle/clockwork"
    26  	"go.uber.org/zap"
    27  )
    28  
    29  func TestPeriodicHourly(t *testing.T) {
    30  	retentionHours := 2
    31  	retentionDuration := time.Duration(retentionHours) * time.Hour
    32  
    33  	fc := clockwork.NewFakeClock()
    34  	// TODO: Do not depand or real time (Recorder.Wait) in unit tests.
    35  	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
    36  	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
    37  	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
    38  
    39  	tb.Run()
    40  	defer tb.Stop()
    41  
    42  	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
    43  
    44  	// compaction doesn't happen til 2 hours elapse
    45  	for i := 0; i < initialIntervals; i++ {
    46  		rg.Wait(1)
    47  		fc.Advance(tb.getRetryInterval())
    48  	}
    49  
    50  	// very first compaction
    51  	a, err := compactable.Wait(1)
    52  	if err != nil {
    53  		t.Fatal(err)
    54  	}
    55  	expectedRevision := int64(1)
    56  	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
    57  		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
    58  	}
    59  
    60  	// simulate 3 hours
    61  	// now compactor kicks in, every hour
    62  	for i := 0; i < 3; i++ {
    63  		// advance one hour, one revision for each interval
    64  		for j := 0; j < intervalsPerPeriod; j++ {
    65  			rg.Wait(1)
    66  			fc.Advance(tb.getRetryInterval())
    67  		}
    68  
    69  		a, err = compactable.Wait(1)
    70  		if err != nil {
    71  			t.Fatal(err)
    72  		}
    73  
    74  		expectedRevision = int64((i + 1) * 10)
    75  		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
    76  			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
    77  		}
    78  	}
    79  }
    80  
    81  func TestPeriodicMinutes(t *testing.T) {
    82  	retentionMinutes := 5
    83  	retentionDuration := time.Duration(retentionMinutes) * time.Minute
    84  
    85  	fc := clockwork.NewFakeClock()
    86  	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
    87  	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
    88  	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
    89  
    90  	tb.Run()
    91  	defer tb.Stop()
    92  
    93  	initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
    94  
    95  	// compaction doesn't happen til 5 minutes elapse
    96  	for i := 0; i < initialIntervals; i++ {
    97  		rg.Wait(1)
    98  		fc.Advance(tb.getRetryInterval())
    99  	}
   100  
   101  	// very first compaction
   102  	a, err := compactable.Wait(1)
   103  	if err != nil {
   104  		t.Fatal(err)
   105  	}
   106  	expectedRevision := int64(1)
   107  	if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
   108  		t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
   109  	}
   110  
   111  	// compaction happens at every interval
   112  	for i := 0; i < 5; i++ {
   113  		// advance 5-minute, one revision for each interval
   114  		for j := 0; j < intervalsPerPeriod; j++ {
   115  			rg.Wait(1)
   116  			fc.Advance(tb.getRetryInterval())
   117  		}
   118  
   119  		a, err := compactable.Wait(1)
   120  		if err != nil {
   121  			t.Fatal(err)
   122  		}
   123  
   124  		expectedRevision = int64((i + 1) * 10)
   125  		if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
   126  			t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
   127  		}
   128  	}
   129  }
   130  
   131  func TestPeriodicPause(t *testing.T) {
   132  	fc := clockwork.NewFakeClock()
   133  	retentionDuration := time.Hour
   134  	rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
   135  	compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
   136  	tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
   137  
   138  	tb.Run()
   139  	tb.Pause()
   140  
   141  	n := tb.getRetentions()
   142  
   143  	// tb will collect 3 hours of revisions but not compact since paused
   144  	for i := 0; i < n*3; i++ {
   145  		rg.Wait(1)
   146  		fc.Advance(tb.getRetryInterval())
   147  	}
   148  	// t.revs = [21 22 23 24 25 26 27 28 29 30]
   149  
   150  	select {
   151  	case a := <-compactable.Chan():
   152  		t.Fatalf("unexpected action %v", a)
   153  	case <-time.After(10 * time.Millisecond):
   154  	}
   155  
   156  	// tb resumes to being blocked on the clock
   157  	tb.Resume()
   158  	rg.Wait(1)
   159  
   160  	// unblock clock, will kick off a compaction at T=3h6m by retry
   161  	fc.Advance(tb.getRetryInterval())
   162  
   163  	// T=3h6m
   164  	a, err := compactable.Wait(1)
   165  	if err != nil {
   166  		t.Fatal(err)
   167  	}
   168  
   169  	// compact the revision from hour 2:06
   170  	wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
   171  	if !reflect.DeepEqual(a[0].Params[0], wreq) {
   172  		t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
   173  	}
   174  }
   175  

View as plain text