1
2
3
4
5
6
7
8
9
10
11
12
13
14 package nflog
15
16 import (
17 "bytes"
18 "io"
19 "os"
20 "path/filepath"
21 "sync"
22 "testing"
23 "time"
24
25 pb "github.com/prometheus/alertmanager/nflog/nflogpb"
26
27 "github.com/prometheus/client_golang/prometheus"
28 "github.com/stretchr/testify/require"
29 )
30
31 func TestLogGC(t *testing.T) {
32 now := utcNow()
33
34 newEntry := func(ts time.Time) *pb.MeshEntry {
35 return &pb.MeshEntry{
36 ExpiresAt: ts,
37 }
38 }
39
40 l := &Log{
41 st: state{
42 "a1": newEntry(now),
43 "a2": newEntry(now.Add(time.Second)),
44 "a3": newEntry(now.Add(-time.Second)),
45 },
46 now: func() time.Time { return now },
47 metrics: newMetrics(nil),
48 }
49 n, err := l.GC()
50 require.NoError(t, err, "unexpected error in garbage collection")
51 require.Equal(t, 2, n, "unexpected number of removed entries")
52
53 expected := state{
54 "a2": newEntry(now.Add(time.Second)),
55 }
56 require.Equal(t, l.st, expected, "unexpected state after garbage collection")
57 }
58
59 func TestLogSnapshot(t *testing.T) {
60
61 now := utcNow()
62
63 cases := []struct {
64 entries []*pb.MeshEntry
65 }{
66 {
67 entries: []*pb.MeshEntry{
68 {
69 Entry: &pb.Entry{
70 GroupKey: []byte("d8e8fca2dc0f896fd7cb4cb0031ba249"),
71 Receiver: &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
72 GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
73 Resolved: false,
74 Timestamp: now,
75 },
76 ExpiresAt: now,
77 }, {
78 Entry: &pb.Entry{
79 GroupKey: []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
80 Receiver: &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
81 GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
82 Resolved: true,
83 Timestamp: now,
84 },
85 ExpiresAt: now,
86 }, {
87 Entry: &pb.Entry{
88 GroupKey: []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
89 Receiver: &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
90 GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
91 Resolved: false,
92 Timestamp: now,
93 },
94 ExpiresAt: now,
95 },
96 },
97 },
98 }
99
100 for _, c := range cases {
101 f, err := os.CreateTemp("", "snapshot")
102 require.NoError(t, err, "creating temp file failed")
103
104 l1 := &Log{
105 st: state{},
106 metrics: newMetrics(nil),
107 }
108
109 for _, e := range c.entries {
110 l1.st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
111 }
112 _, err = l1.Snapshot(f)
113 require.NoError(t, err, "creating snapshot failed")
114 require.NoError(t, f.Close(), "closing snapshot file failed")
115
116 f, err = os.Open(f.Name())
117 require.NoError(t, err, "opening snapshot file failed")
118
119
120 l2 := &Log{}
121 err = l2.loadSnapshot(f)
122 require.NoError(t, err, "error loading snapshot")
123 require.Equal(t, l1.st, l2.st, "state after loading snapshot did not match snapshotted state")
124
125 require.NoError(t, f.Close(), "closing snapshot file failed")
126 }
127 }
128
129 func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
130 f, err := os.CreateTemp("", "snapshot")
131 require.NoError(t, err, "creating temp file failed")
132
133 stopc := make(chan struct{})
134 var mtx sync.Mutex
135 var mc int
136 l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) {
137 mtx.Lock()
138 mc++
139 mtx.Unlock()
140
141 return 0, nil
142 }))
143 require.NoError(t, err)
144
145 go l.run()
146 time.Sleep(200 * time.Millisecond)
147 close(stopc)
148
149 require.Eventually(t, func() bool {
150 mtx.Lock()
151 defer mtx.Unlock()
152 return mc >= 2
153 }, 500*time.Millisecond, 100*time.Millisecond)
154 }
155
156 func TestReplaceFile(t *testing.T) {
157 dir, err := os.MkdirTemp("", "replace_file")
158 require.NoError(t, err, "creating temp dir failed")
159
160 origFilename := filepath.Join(dir, "testfile")
161
162 of, err := os.Create(origFilename)
163 require.NoError(t, err, "creating file failed")
164
165 nf, err := openReplace(origFilename)
166 require.NoError(t, err, "opening replacement file failed")
167
168 _, err = nf.Write([]byte("test"))
169 require.NoError(t, err, "writing replace file failed")
170
171 require.NotEqual(t, nf.Name(), of.Name(), "replacement file must have different name while editing")
172 require.NoError(t, nf.Close(), "closing replacement file failed")
173 require.NoError(t, of.Close(), "closing original file failed")
174
175 ofr, err := os.Open(origFilename)
176 require.NoError(t, err, "opening original file failed")
177 defer ofr.Close()
178
179 res, err := io.ReadAll(ofr)
180 require.NoError(t, err, "reading original file failed")
181 require.Equal(t, "test", string(res), "unexpected file contents")
182 }
183
184 func TestStateMerge(t *testing.T) {
185 now := utcNow()
186
187
188
189 newEntry := func(name string, ts, exp time.Time) *pb.MeshEntry {
190 return &pb.MeshEntry{
191 Entry: &pb.Entry{
192 Timestamp: ts,
193 GroupKey: []byte("key"),
194 Receiver: &pb.Receiver{
195 GroupName: name,
196 Idx: 1,
197 Integration: "integr",
198 },
199 },
200 ExpiresAt: exp,
201 }
202 }
203
204 exp := now.Add(time.Minute)
205
206 cases := []struct {
207 a, b state
208 final state
209 }{
210 {
211 a: state{
212 "key:a1/integr/1": newEntry("a1", now, exp),
213 "key:a2/integr/1": newEntry("a2", now, exp),
214 "key:a3/integr/1": newEntry("a3", now, exp),
215 },
216 b: state{
217 "key:b1/integr/1": newEntry("b1", now, exp),
218 "key:b2/integr/1": newEntry("b2", now.Add(-time.Minute), now.Add(-time.Millisecond)),
219 "key:a2/integr/1": newEntry("a2", now.Add(-time.Minute), exp),
220 "key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp),
221 },
222 final: state{
223 "key:a1/integr/1": newEntry("a1", now, exp),
224 "key:a2/integr/1": newEntry("a2", now, exp),
225 "key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp),
226 "key:b1/integr/1": newEntry("b1", now, exp),
227 },
228 },
229 }
230
231 for _, c := range cases {
232 ca, cb := c.a.clone(), c.b.clone()
233
234 res := c.a.clone()
235 for _, e := range cb {
236 res.merge(e, now)
237 }
238 require.Equal(t, c.final, res, "Merge result should match expectation")
239 require.Equal(t, c.b, cb, "Merged state should remain unmodified")
240 require.NotEqual(t, c.final, ca, "Merge should not change original state")
241 }
242 }
243
244 func TestStateDataCoding(t *testing.T) {
245
246 now := utcNow()
247
248 cases := []struct {
249 entries []*pb.MeshEntry
250 }{
251 {
252 entries: []*pb.MeshEntry{
253 {
254 Entry: &pb.Entry{
255 GroupKey: []byte("d8e8fca2dc0f896fd7cb4cb0031ba249"),
256 Receiver: &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
257 GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
258 Resolved: false,
259 Timestamp: now,
260 },
261 ExpiresAt: now,
262 }, {
263 Entry: &pb.Entry{
264 GroupKey: []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
265 Receiver: &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
266 GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
267 Resolved: true,
268 Timestamp: now,
269 },
270 ExpiresAt: now,
271 }, {
272 Entry: &pb.Entry{
273 GroupKey: []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
274 Receiver: &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
275 GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
276 Resolved: false,
277 Timestamp: now,
278 },
279 ExpiresAt: now,
280 },
281 },
282 },
283 }
284
285 for _, c := range cases {
286
287 in := state{}
288 for _, e := range c.entries {
289 in[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
290 }
291 msg, err := in.MarshalBinary()
292 require.NoError(t, err)
293
294 out, err := decodeState(bytes.NewReader(msg))
295 require.NoError(t, err, "decoding message failed")
296
297 require.Equal(t, in, out, "decoded data doesn't match encoded data")
298 }
299 }
300
301 func TestQuery(t *testing.T) {
302 nl, err := New(WithRetention(time.Second))
303 if err != nil {
304 require.NoError(t, err, "constructing nflog failed")
305 }
306
307 recv := new(pb.Receiver)
308
309
310 _, err = nl.Query(QGroupKey("key"))
311 require.EqualError(t, err, "no query parameters specified")
312
313
314 _, err = nl.Query(QReceiver(recv))
315 require.EqualError(t, err, "no query parameters specified")
316
317
318 _, err = nl.Query(QGroupKey("nonexistentkey"), QReceiver(recv))
319 require.EqualError(t, err, "not found")
320
321
322 firingAlerts := []uint64{1, 2, 3}
323 resolvedAlerts := []uint64{4, 5}
324
325 err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
326 require.NoError(t, err, "logging notification failed")
327
328 entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
329 require.NoError(t, err, "querying nflog failed")
330 entry := entries[0]
331 require.EqualValues(t, firingAlerts, entry.FiringAlerts)
332 require.EqualValues(t, resolvedAlerts, entry.ResolvedAlerts)
333 }
334
335 func TestStateDecodingError(t *testing.T) {
336
337 s := state{"": &pb.MeshEntry{}}
338
339 msg, err := s.MarshalBinary()
340 require.NoError(t, err)
341
342 _, err = decodeState(bytes.NewReader(msg))
343 require.Equal(t, ErrInvalidState, err)
344 }
345
View as plain text