...

Source file src/github.com/prometheus/alertmanager/cluster/cluster_test.go

Documentation: github.com/prometheus/alertmanager/cluster

     1  // Copyright 2018 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package cluster
    15  
    16  import (
    17  	"context"
    18  	"testing"
    19  	"time"
    20  
    21  	"github.com/go-kit/log"
    22  	"github.com/hashicorp/go-sockaddr"
    23  	"github.com/stretchr/testify/require"
    24  
    25  	"github.com/prometheus/client_golang/prometheus"
    26  )
    27  
    28  func TestClusterJoinAndReconnect(t *testing.T) {
    29  	ip, _ := sockaddr.GetPrivateIP()
    30  	if ip == "" {
    31  		t.Skipf("skipping tests because no private IP address can be found")
    32  		return
    33  	}
    34  	t.Run("TestJoinLeave", testJoinLeave)
    35  	t.Run("TestReconnect", testReconnect)
    36  	t.Run("TestRemoveFailedPeers", testRemoveFailedPeers)
    37  	t.Run("TestInitiallyFailingPeers", testInitiallyFailingPeers)
    38  	t.Run("TestTLSConnection", testTLSConnection)
    39  }
    40  
    41  func testJoinLeave(t *testing.T) {
    42  	logger := log.NewNopLogger()
    43  	p, err := Create(
    44  		logger,
    45  		prometheus.NewRegistry(),
    46  		"127.0.0.1:0",
    47  		"",
    48  		[]string{},
    49  		true,
    50  		DefaultPushPullInterval,
    51  		DefaultGossipInterval,
    52  		DefaultTCPTimeout,
    53  		DefaultProbeTimeout,
    54  		DefaultProbeInterval,
    55  		nil,
    56  		false,
    57  	)
    58  	require.NoError(t, err)
    59  	require.NotNil(t, p)
    60  	err = p.Join(
    61  		DefaultReconnectInterval,
    62  		DefaultReconnectTimeout,
    63  	)
    64  	require.NoError(t, err)
    65  	require.False(t, p.Ready())
    66  	{
    67  		ctx, cancel := context.WithCancel(context.Background())
    68  		cancel()
    69  		require.Equal(t, context.Canceled, p.WaitReady(ctx))
    70  	}
    71  	require.Equal(t, p.Status(), "settling")
    72  	go p.Settle(context.Background(), 0*time.Second)
    73  	require.NoError(t, p.WaitReady(context.Background()))
    74  	require.Equal(t, p.Status(), "ready")
    75  
    76  	// Create the peer who joins the first.
    77  	p2, err := Create(
    78  		logger,
    79  		prometheus.NewRegistry(),
    80  		"127.0.0.1:0",
    81  		"",
    82  		[]string{p.Self().Address()},
    83  		true,
    84  		DefaultPushPullInterval,
    85  		DefaultGossipInterval,
    86  		DefaultTCPTimeout,
    87  		DefaultProbeTimeout,
    88  		DefaultProbeInterval,
    89  		nil,
    90  		false,
    91  	)
    92  	require.NoError(t, err)
    93  	require.NotNil(t, p2)
    94  	err = p2.Join(
    95  		DefaultReconnectInterval,
    96  		DefaultReconnectTimeout,
    97  	)
    98  	require.NoError(t, err)
    99  	go p2.Settle(context.Background(), 0*time.Second)
   100  	require.NoError(t, p2.WaitReady(context.Background()))
   101  
   102  	require.Equal(t, 2, p.ClusterSize())
   103  	p2.Leave(0 * time.Second)
   104  	require.Equal(t, 1, p.ClusterSize())
   105  	require.Equal(t, 1, len(p.failedPeers))
   106  	require.Equal(t, p2.Self().Address(), p.peers[p2.Self().Address()].Node.Address())
   107  	require.Equal(t, p2.Name(), p.failedPeers[0].Name)
   108  }
   109  
   110  func testReconnect(t *testing.T) {
   111  	logger := log.NewNopLogger()
   112  	p, err := Create(
   113  		logger,
   114  		prometheus.NewRegistry(),
   115  		"127.0.0.1:0",
   116  		"",
   117  		[]string{},
   118  		true,
   119  		DefaultPushPullInterval,
   120  		DefaultGossipInterval,
   121  		DefaultTCPTimeout,
   122  		DefaultProbeTimeout,
   123  		DefaultProbeInterval,
   124  		nil,
   125  		false,
   126  	)
   127  	require.NoError(t, err)
   128  	require.NotNil(t, p)
   129  	err = p.Join(
   130  		DefaultReconnectInterval,
   131  		DefaultReconnectTimeout,
   132  	)
   133  	require.NoError(t, err)
   134  	go p.Settle(context.Background(), 0*time.Second)
   135  	require.NoError(t, p.WaitReady(context.Background()))
   136  
   137  	p2, err := Create(
   138  		logger,
   139  		prometheus.NewRegistry(),
   140  		"127.0.0.1:0",
   141  		"",
   142  		[]string{},
   143  		true,
   144  		DefaultPushPullInterval,
   145  		DefaultGossipInterval,
   146  		DefaultTCPTimeout,
   147  		DefaultProbeTimeout,
   148  		DefaultProbeInterval,
   149  		nil,
   150  		false,
   151  	)
   152  	require.NoError(t, err)
   153  	require.NotNil(t, p2)
   154  	err = p2.Join(
   155  		DefaultReconnectInterval,
   156  		DefaultReconnectTimeout,
   157  	)
   158  	require.NoError(t, err)
   159  	go p2.Settle(context.Background(), 0*time.Second)
   160  	require.NoError(t, p2.WaitReady(context.Background()))
   161  
   162  	p.peerJoin(p2.Self())
   163  	p.peerLeave(p2.Self())
   164  
   165  	require.Equal(t, 1, p.ClusterSize())
   166  	require.Equal(t, 1, len(p.failedPeers))
   167  
   168  	p.reconnect()
   169  
   170  	require.Equal(t, 2, p.ClusterSize())
   171  	require.Equal(t, 0, len(p.failedPeers))
   172  	require.Equal(t, StatusAlive, p.peers[p2.Self().Address()].status)
   173  }
   174  
   175  func testRemoveFailedPeers(t *testing.T) {
   176  	logger := log.NewNopLogger()
   177  	p, err := Create(
   178  		logger,
   179  		prometheus.NewRegistry(),
   180  		"127.0.0.1:0",
   181  		"",
   182  		[]string{},
   183  		true,
   184  		DefaultPushPullInterval,
   185  		DefaultGossipInterval,
   186  		DefaultTCPTimeout,
   187  		DefaultProbeTimeout,
   188  		DefaultProbeInterval,
   189  		nil,
   190  		false,
   191  	)
   192  	require.NoError(t, err)
   193  	require.NotNil(t, p)
   194  	err = p.Join(
   195  		DefaultReconnectInterval,
   196  		DefaultReconnectTimeout,
   197  	)
   198  	require.NoError(t, err)
   199  	n := p.Self()
   200  
   201  	now := time.Now()
   202  	p1 := peer{
   203  		status:    StatusFailed,
   204  		leaveTime: now,
   205  		Node:      n,
   206  	}
   207  	p2 := peer{
   208  		status:    StatusFailed,
   209  		leaveTime: now.Add(-time.Hour),
   210  		Node:      n,
   211  	}
   212  	p3 := peer{
   213  		status:    StatusFailed,
   214  		leaveTime: now.Add(30 * -time.Minute),
   215  		Node:      n,
   216  	}
   217  	p.failedPeers = []peer{p1, p2, p3}
   218  
   219  	p.removeFailedPeers(30 * time.Minute)
   220  	require.Equal(t, 1, len(p.failedPeers))
   221  	require.Equal(t, p1, p.failedPeers[0])
   222  }
   223  
   224  func testInitiallyFailingPeers(t *testing.T) {
   225  	logger := log.NewNopLogger()
   226  	myAddr := "1.2.3.4:5000"
   227  	peerAddrs := []string{myAddr, "2.3.4.5:5000", "3.4.5.6:5000", "foo.example.com:5000"}
   228  	p, err := Create(
   229  		logger,
   230  		prometheus.NewRegistry(),
   231  		"127.0.0.1:0",
   232  		"",
   233  		[]string{},
   234  		true,
   235  		DefaultPushPullInterval,
   236  		DefaultGossipInterval,
   237  		DefaultTCPTimeout,
   238  		DefaultProbeTimeout,
   239  		DefaultProbeInterval,
   240  		nil,
   241  		false,
   242  	)
   243  	require.NoError(t, err)
   244  	require.NotNil(t, p)
   245  	err = p.Join(
   246  		DefaultReconnectInterval,
   247  		DefaultReconnectTimeout,
   248  	)
   249  	require.NoError(t, err)
   250  
   251  	p.setInitialFailed(peerAddrs, myAddr)
   252  
   253  	// We shouldn't have added "our" bind addr and the FQDN address to the
   254  	// failed peers list.
   255  	require.Equal(t, len(peerAddrs)-2, len(p.failedPeers))
   256  	for _, addr := range peerAddrs {
   257  		if addr == myAddr || addr == "foo.example.com:5000" {
   258  			continue
   259  		}
   260  
   261  		pr, ok := p.peers[addr]
   262  		require.True(t, ok)
   263  		require.Equal(t, StatusFailed.String(), pr.status.String())
   264  		require.Equal(t, addr, pr.Address())
   265  		expectedLen := len(p.failedPeers) - 1
   266  		p.peerJoin(pr.Node)
   267  		require.Equal(t, expectedLen, len(p.failedPeers))
   268  	}
   269  }
   270  
   271  func testTLSConnection(t *testing.T) {
   272  	logger := log.NewNopLogger()
   273  	tlsTransportConfig1, err := GetTLSTransportConfig("./testdata/tls_config_node1.yml")
   274  	require.NoError(t, err)
   275  	p1, err := Create(
   276  		logger,
   277  		prometheus.NewRegistry(),
   278  		"127.0.0.1:0",
   279  		"",
   280  		[]string{},
   281  		true,
   282  		DefaultPushPullInterval,
   283  		DefaultGossipInterval,
   284  		DefaultTCPTimeout,
   285  		DefaultProbeTimeout,
   286  		DefaultProbeInterval,
   287  		tlsTransportConfig1,
   288  		false,
   289  	)
   290  	require.NoError(t, err)
   291  	require.NotNil(t, p1)
   292  	err = p1.Join(
   293  		DefaultReconnectInterval,
   294  		DefaultReconnectTimeout,
   295  	)
   296  	require.NoError(t, err)
   297  	require.False(t, p1.Ready())
   298  	require.Equal(t, p1.Status(), "settling")
   299  	go p1.Settle(context.Background(), 0*time.Second)
   300  	p1.WaitReady(context.Background())
   301  	require.Equal(t, p1.Status(), "ready")
   302  
   303  	// Create the peer who joins the first.
   304  	tlsTransportConfig2, err := GetTLSTransportConfig("./testdata/tls_config_node2.yml")
   305  	require.NoError(t, err)
   306  	p2, err := Create(
   307  		logger,
   308  		prometheus.NewRegistry(),
   309  		"127.0.0.1:0",
   310  		"",
   311  		[]string{p1.Self().Address()},
   312  		true,
   313  		DefaultPushPullInterval,
   314  		DefaultGossipInterval,
   315  		DefaultTCPTimeout,
   316  		DefaultProbeTimeout,
   317  		DefaultProbeInterval,
   318  		tlsTransportConfig2,
   319  		false,
   320  	)
   321  	require.NoError(t, err)
   322  	require.NotNil(t, p2)
   323  	err = p2.Join(
   324  		DefaultReconnectInterval,
   325  		DefaultReconnectTimeout,
   326  	)
   327  	require.NoError(t, err)
   328  	go p2.Settle(context.Background(), 0*time.Second)
   329  
   330  	require.Equal(t, 2, p1.ClusterSize())
   331  	p2.Leave(0 * time.Second)
   332  	require.Equal(t, 1, p1.ClusterSize())
   333  	require.Equal(t, 1, len(p1.failedPeers))
   334  	require.Equal(t, p2.Self().Address(), p1.peers[p2.Self().Address()].Node.Address())
   335  	require.Equal(t, p2.Name(), p1.failedPeers[0].Name)
   336  }
   337  

View as plain text