...

Source file src/github.com/prometheus/alertmanager/nflog/nflog_test.go

Documentation: github.com/prometheus/alertmanager/nflog

     1  // Copyright 2016 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package nflog
    15  
    16  import (
    17  	"bytes"
    18  	"io"
    19  	"os"
    20  	"path/filepath"
    21  	"sync"
    22  	"testing"
    23  	"time"
    24  
    25  	pb "github.com/prometheus/alertmanager/nflog/nflogpb"
    26  
    27  	"github.com/prometheus/client_golang/prometheus"
    28  	"github.com/stretchr/testify/require"
    29  )
    30  
    31  func TestLogGC(t *testing.T) {
    32  	now := utcNow()
    33  	// We only care about key names and expiration timestamps.
    34  	newEntry := func(ts time.Time) *pb.MeshEntry {
    35  		return &pb.MeshEntry{
    36  			ExpiresAt: ts,
    37  		}
    38  	}
    39  
    40  	l := &Log{
    41  		st: state{
    42  			"a1": newEntry(now),
    43  			"a2": newEntry(now.Add(time.Second)),
    44  			"a3": newEntry(now.Add(-time.Second)),
    45  		},
    46  		now:     func() time.Time { return now },
    47  		metrics: newMetrics(nil),
    48  	}
    49  	n, err := l.GC()
    50  	require.NoError(t, err, "unexpected error in garbage collection")
    51  	require.Equal(t, 2, n, "unexpected number of removed entries")
    52  
    53  	expected := state{
    54  		"a2": newEntry(now.Add(time.Second)),
    55  	}
    56  	require.Equal(t, l.st, expected, "unexpected state after garbage collection")
    57  }
    58  
    59  func TestLogSnapshot(t *testing.T) {
    60  	// Check whether storing and loading the snapshot is symmetric.
    61  	now := utcNow()
    62  
    63  	cases := []struct {
    64  		entries []*pb.MeshEntry
    65  	}{
    66  		{
    67  			entries: []*pb.MeshEntry{
    68  				{
    69  					Entry: &pb.Entry{
    70  						GroupKey:  []byte("d8e8fca2dc0f896fd7cb4cb0031ba249"),
    71  						Receiver:  &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
    72  						GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
    73  						Resolved:  false,
    74  						Timestamp: now,
    75  					},
    76  					ExpiresAt: now,
    77  				}, {
    78  					Entry: &pb.Entry{
    79  						GroupKey:  []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
    80  						Receiver:  &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
    81  						GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
    82  						Resolved:  true,
    83  						Timestamp: now,
    84  					},
    85  					ExpiresAt: now,
    86  				}, {
    87  					Entry: &pb.Entry{
    88  						GroupKey:  []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
    89  						Receiver:  &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
    90  						GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
    91  						Resolved:  false,
    92  						Timestamp: now,
    93  					},
    94  					ExpiresAt: now,
    95  				},
    96  			},
    97  		},
    98  	}
    99  
   100  	for _, c := range cases {
   101  		f, err := os.CreateTemp("", "snapshot")
   102  		require.NoError(t, err, "creating temp file failed")
   103  
   104  		l1 := &Log{
   105  			st:      state{},
   106  			metrics: newMetrics(nil),
   107  		}
   108  		// Setup internal state manually.
   109  		for _, e := range c.entries {
   110  			l1.st[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
   111  		}
   112  		_, err = l1.Snapshot(f)
   113  		require.NoError(t, err, "creating snapshot failed")
   114  		require.NoError(t, f.Close(), "closing snapshot file failed")
   115  
   116  		f, err = os.Open(f.Name())
   117  		require.NoError(t, err, "opening snapshot file failed")
   118  
   119  		// Check again against new nlog instance.
   120  		l2 := &Log{}
   121  		err = l2.loadSnapshot(f)
   122  		require.NoError(t, err, "error loading snapshot")
   123  		require.Equal(t, l1.st, l2.st, "state after loading snapshot did not match snapshotted state")
   124  
   125  		require.NoError(t, f.Close(), "closing snapshot file failed")
   126  	}
   127  }
   128  
   129  func TestWithMaintenance_SupportsCustomCallback(t *testing.T) {
   130  	f, err := os.CreateTemp("", "snapshot")
   131  	require.NoError(t, err, "creating temp file failed")
   132  
   133  	stopc := make(chan struct{})
   134  	var mtx sync.Mutex
   135  	var mc int
   136  	l, err := New(WithMetrics(prometheus.NewPedanticRegistry()), WithSnapshot(f.Name()), WithMaintenance(100*time.Millisecond, stopc, nil, func() (int64, error) {
   137  		mtx.Lock()
   138  		mc++
   139  		mtx.Unlock()
   140  
   141  		return 0, nil
   142  	}))
   143  	require.NoError(t, err)
   144  
   145  	go l.run()
   146  	time.Sleep(200 * time.Millisecond)
   147  	close(stopc)
   148  
   149  	require.Eventually(t, func() bool {
   150  		mtx.Lock()
   151  		defer mtx.Unlock()
   152  		return mc >= 2
   153  	}, 500*time.Millisecond, 100*time.Millisecond)
   154  }
   155  
   156  func TestReplaceFile(t *testing.T) {
   157  	dir, err := os.MkdirTemp("", "replace_file")
   158  	require.NoError(t, err, "creating temp dir failed")
   159  
   160  	origFilename := filepath.Join(dir, "testfile")
   161  
   162  	of, err := os.Create(origFilename)
   163  	require.NoError(t, err, "creating file failed")
   164  
   165  	nf, err := openReplace(origFilename)
   166  	require.NoError(t, err, "opening replacement file failed")
   167  
   168  	_, err = nf.Write([]byte("test"))
   169  	require.NoError(t, err, "writing replace file failed")
   170  
   171  	require.NotEqual(t, nf.Name(), of.Name(), "replacement file must have different name while editing")
   172  	require.NoError(t, nf.Close(), "closing replacement file failed")
   173  	require.NoError(t, of.Close(), "closing original file failed")
   174  
   175  	ofr, err := os.Open(origFilename)
   176  	require.NoError(t, err, "opening original file failed")
   177  	defer ofr.Close()
   178  
   179  	res, err := io.ReadAll(ofr)
   180  	require.NoError(t, err, "reading original file failed")
   181  	require.Equal(t, "test", string(res), "unexpected file contents")
   182  }
   183  
   184  func TestStateMerge(t *testing.T) {
   185  	now := utcNow()
   186  
   187  	// We only care about key names and timestamps for the
   188  	// merging logic.
   189  	newEntry := func(name string, ts, exp time.Time) *pb.MeshEntry {
   190  		return &pb.MeshEntry{
   191  			Entry: &pb.Entry{
   192  				Timestamp: ts,
   193  				GroupKey:  []byte("key"),
   194  				Receiver: &pb.Receiver{
   195  					GroupName:   name,
   196  					Idx:         1,
   197  					Integration: "integr",
   198  				},
   199  			},
   200  			ExpiresAt: exp,
   201  		}
   202  	}
   203  
   204  	exp := now.Add(time.Minute)
   205  
   206  	cases := []struct {
   207  		a, b  state
   208  		final state
   209  	}{
   210  		{
   211  			a: state{
   212  				"key:a1/integr/1": newEntry("a1", now, exp),
   213  				"key:a2/integr/1": newEntry("a2", now, exp),
   214  				"key:a3/integr/1": newEntry("a3", now, exp),
   215  			},
   216  			b: state{
   217  				"key:b1/integr/1": newEntry("b1", now, exp),                                          // new key, should be added
   218  				"key:b2/integr/1": newEntry("b2", now.Add(-time.Minute), now.Add(-time.Millisecond)), // new key, expired, should not be added
   219  				"key:a2/integr/1": newEntry("a2", now.Add(-time.Minute), exp),                        // older timestamp, should be dropped
   220  				"key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp),                         // newer timestamp, should overwrite
   221  			},
   222  			final: state{
   223  				"key:a1/integr/1": newEntry("a1", now, exp),
   224  				"key:a2/integr/1": newEntry("a2", now, exp),
   225  				"key:a3/integr/1": newEntry("a3", now.Add(time.Minute), exp),
   226  				"key:b1/integr/1": newEntry("b1", now, exp),
   227  			},
   228  		},
   229  	}
   230  
   231  	for _, c := range cases {
   232  		ca, cb := c.a.clone(), c.b.clone()
   233  
   234  		res := c.a.clone()
   235  		for _, e := range cb {
   236  			res.merge(e, now)
   237  		}
   238  		require.Equal(t, c.final, res, "Merge result should match expectation")
   239  		require.Equal(t, c.b, cb, "Merged state should remain unmodified")
   240  		require.NotEqual(t, c.final, ca, "Merge should not change original state")
   241  	}
   242  }
   243  
   244  func TestStateDataCoding(t *testing.T) {
   245  	// Check whether encoding and decoding the data is symmetric.
   246  	now := utcNow()
   247  
   248  	cases := []struct {
   249  		entries []*pb.MeshEntry
   250  	}{
   251  		{
   252  			entries: []*pb.MeshEntry{
   253  				{
   254  					Entry: &pb.Entry{
   255  						GroupKey:  []byte("d8e8fca2dc0f896fd7cb4cb0031ba249"),
   256  						Receiver:  &pb.Receiver{GroupName: "abc", Integration: "test1", Idx: 1},
   257  						GroupHash: []byte("126a8a51b9d1bbd07fddc65819a542c3"),
   258  						Resolved:  false,
   259  						Timestamp: now,
   260  					},
   261  					ExpiresAt: now,
   262  				}, {
   263  					Entry: &pb.Entry{
   264  						GroupKey:  []byte("d8e8fca2dc0f8abce7cb4cb0031ba249"),
   265  						Receiver:  &pb.Receiver{GroupName: "def", Integration: "test2", Idx: 29},
   266  						GroupHash: []byte("122c2331b9d1bbd07fddc65819a542c3"),
   267  						Resolved:  true,
   268  						Timestamp: now,
   269  					},
   270  					ExpiresAt: now,
   271  				}, {
   272  					Entry: &pb.Entry{
   273  						GroupKey:  []byte("aaaaaca2dc0f896fd7cb4cb0031ba249"),
   274  						Receiver:  &pb.Receiver{GroupName: "ghi", Integration: "test3", Idx: 0},
   275  						GroupHash: []byte("126a8a51b9d1bbd07fddc6e3e3e542c3"),
   276  						Resolved:  false,
   277  						Timestamp: now,
   278  					},
   279  					ExpiresAt: now,
   280  				},
   281  			},
   282  		},
   283  	}
   284  
   285  	for _, c := range cases {
   286  		// Create gossip data from input.
   287  		in := state{}
   288  		for _, e := range c.entries {
   289  			in[stateKey(string(e.Entry.GroupKey), e.Entry.Receiver)] = e
   290  		}
   291  		msg, err := in.MarshalBinary()
   292  		require.NoError(t, err)
   293  
   294  		out, err := decodeState(bytes.NewReader(msg))
   295  		require.NoError(t, err, "decoding message failed")
   296  
   297  		require.Equal(t, in, out, "decoded data doesn't match encoded data")
   298  	}
   299  }
   300  
   301  func TestQuery(t *testing.T) {
   302  	nl, err := New(WithRetention(time.Second))
   303  	if err != nil {
   304  		require.NoError(t, err, "constructing nflog failed")
   305  	}
   306  
   307  	recv := new(pb.Receiver)
   308  
   309  	// no key param
   310  	_, err = nl.Query(QGroupKey("key"))
   311  	require.EqualError(t, err, "no query parameters specified")
   312  
   313  	// no recv param
   314  	_, err = nl.Query(QReceiver(recv))
   315  	require.EqualError(t, err, "no query parameters specified")
   316  
   317  	// no entry
   318  	_, err = nl.Query(QGroupKey("nonexistentkey"), QReceiver(recv))
   319  	require.EqualError(t, err, "not found")
   320  
   321  	// existing entry
   322  	firingAlerts := []uint64{1, 2, 3}
   323  	resolvedAlerts := []uint64{4, 5}
   324  
   325  	err = nl.Log(recv, "key", firingAlerts, resolvedAlerts, 0)
   326  	require.NoError(t, err, "logging notification failed")
   327  
   328  	entries, err := nl.Query(QGroupKey("key"), QReceiver(recv))
   329  	require.NoError(t, err, "querying nflog failed")
   330  	entry := entries[0]
   331  	require.EqualValues(t, firingAlerts, entry.FiringAlerts)
   332  	require.EqualValues(t, resolvedAlerts, entry.ResolvedAlerts)
   333  }
   334  
   335  func TestStateDecodingError(t *testing.T) {
   336  	// Check whether decoding copes with erroneous data.
   337  	s := state{"": &pb.MeshEntry{}}
   338  
   339  	msg, err := s.MarshalBinary()
   340  	require.NoError(t, err)
   341  
   342  	_, err = decodeState(bytes.NewReader(msg))
   343  	require.Equal(t, ErrInvalidState, err)
   344  }
   345  

View as plain text