1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3compactor
16
17 import (
18 "reflect"
19 "testing"
20 "time"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 "go.etcd.io/etcd/client/pkg/v3/testutil"
24
25 "github.com/jonboulle/clockwork"
26 "go.uber.org/zap"
27 )
28
29 func TestPeriodicHourly(t *testing.T) {
30 retentionHours := 2
31 retentionDuration := time.Duration(retentionHours) * time.Hour
32
33 fc := clockwork.NewFakeClock()
34
35 rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
36 compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
37 tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
38
39 tb.Run()
40 defer tb.Stop()
41
42 initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
43
44
45 for i := 0; i < initialIntervals; i++ {
46 rg.Wait(1)
47 fc.Advance(tb.getRetryInterval())
48 }
49
50
51 a, err := compactable.Wait(1)
52 if err != nil {
53 t.Fatal(err)
54 }
55 expectedRevision := int64(1)
56 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
57 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
58 }
59
60
61
62 for i := 0; i < 3; i++ {
63
64 for j := 0; j < intervalsPerPeriod; j++ {
65 rg.Wait(1)
66 fc.Advance(tb.getRetryInterval())
67 }
68
69 a, err = compactable.Wait(1)
70 if err != nil {
71 t.Fatal(err)
72 }
73
74 expectedRevision = int64((i + 1) * 10)
75 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
76 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
77 }
78 }
79 }
80
81 func TestPeriodicMinutes(t *testing.T) {
82 retentionMinutes := 5
83 retentionDuration := time.Duration(retentionMinutes) * time.Minute
84
85 fc := clockwork.NewFakeClock()
86 rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
87 compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
88 tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
89
90 tb.Run()
91 defer tb.Stop()
92
93 initialIntervals, intervalsPerPeriod := tb.getRetentions(), 10
94
95
96 for i := 0; i < initialIntervals; i++ {
97 rg.Wait(1)
98 fc.Advance(tb.getRetryInterval())
99 }
100
101
102 a, err := compactable.Wait(1)
103 if err != nil {
104 t.Fatal(err)
105 }
106 expectedRevision := int64(1)
107 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
108 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
109 }
110
111
112 for i := 0; i < 5; i++ {
113
114 for j := 0; j < intervalsPerPeriod; j++ {
115 rg.Wait(1)
116 fc.Advance(tb.getRetryInterval())
117 }
118
119 a, err := compactable.Wait(1)
120 if err != nil {
121 t.Fatal(err)
122 }
123
124 expectedRevision = int64((i + 1) * 10)
125 if !reflect.DeepEqual(a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision}) {
126 t.Errorf("compact request = %v, want %v", a[0].Params[0], &pb.CompactionRequest{Revision: expectedRevision})
127 }
128 }
129 }
130
131 func TestPeriodicPause(t *testing.T) {
132 fc := clockwork.NewFakeClock()
133 retentionDuration := time.Hour
134 rg := &fakeRevGetter{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond), 0}
135 compactable := &fakeCompactable{testutil.NewRecorderStreamWithWaitTimout(10 * time.Millisecond)}
136 tb := newPeriodic(zap.NewExample(), fc, retentionDuration, rg, compactable)
137
138 tb.Run()
139 tb.Pause()
140
141 n := tb.getRetentions()
142
143
144 for i := 0; i < n*3; i++ {
145 rg.Wait(1)
146 fc.Advance(tb.getRetryInterval())
147 }
148
149
150 select {
151 case a := <-compactable.Chan():
152 t.Fatalf("unexpected action %v", a)
153 case <-time.After(10 * time.Millisecond):
154 }
155
156
157 tb.Resume()
158 rg.Wait(1)
159
160
161 fc.Advance(tb.getRetryInterval())
162
163
164 a, err := compactable.Wait(1)
165 if err != nil {
166 t.Fatal(err)
167 }
168
169
170 wreq := &pb.CompactionRequest{Revision: int64(1 + 2*n + 1)}
171 if !reflect.DeepEqual(a[0].Params[0], wreq) {
172 t.Errorf("compact request = %v, want %v", a[0].Params[0], wreq.Revision)
173 }
174 }
175
View as plain text