Source file src/github.com/prometheus/alertmanager/cluster/cluster.go

Documentation: github.com/prometheus/alertmanager/cluster

// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"fmt"
	"math/rand"
	"net"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/hashicorp/memberlist"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

// ClusterPeer represents a single Peer in a gossip cluster.
type ClusterPeer interface {
	// Name returns the unique identifier of this peer in the cluster.
	Name() string
	// Status returns a status string representing the peer state.
	Status() string
	// Peers returns the peer nodes in the cluster.
	Peers() []ClusterMember
}
// ClusterMember is the interface that represents a peer node in the cluster.
type ClusterMember interface {
	// Name returns the name of the node
	Name() string
	// Address returns the IP address of the node
	Address() string
}

// ClusterChannel supports state broadcasting across peers.
type ClusterChannel interface {
	Broadcast([]byte)
}

// Peer is a single peer in a gossip cluster.
type Peer struct {
	mlist    *memberlist.Memberlist
	delegate *delegate

	resolvedPeers []string

	mtx    sync.RWMutex
	states map[string]State
	stopc  chan struct{}
	readyc chan struct{}

	peerLock    sync.RWMutex
	peers       map[string]peer
	failedPeers []peer

	knownPeers    []string
	advertiseAddr string

	failedReconnectionsCounter prometheus.Counter
	reconnectionsCounter       prometheus.Counter
	failedRefreshCounter       prometheus.Counter
	refreshCounter             prometheus.Counter
	peerLeaveCounter           prometheus.Counter
	peerUpdateCounter          prometheus.Counter
	peerJoinCounter            prometheus.Counter

	logger log.Logger
}

// peer is an internal type used for bookkeeping. It holds the state of peers
// in the cluster.
type peer struct {
	status    PeerStatus
	leaveTime time.Time

	*memberlist.Node
}

// PeerStatus is the state that a peer is in.
type PeerStatus int

const (
	StatusNone PeerStatus = iota
	StatusAlive
	StatusFailed
)

func (s PeerStatus) String() string {
	switch s {
	case StatusNone:
		return "none"
	case StatusAlive:
		return "alive"
	case StatusFailed:
		return "failed"
	default:
		panic(fmt.Sprintf("unknown PeerStatus: %d", s))
	}
}

const (
	DefaultPushPullInterval  = 60 * time.Second
	DefaultGossipInterval    = 200 * time.Millisecond
	DefaultTCPTimeout        = 10 * time.Second
	DefaultProbeTimeout      = 500 * time.Millisecond
	DefaultProbeInterval     = 1 * time.Second
	DefaultReconnectInterval = 10 * time.Second
	DefaultReconnectTimeout  = 6 * time.Hour
	DefaultRefreshInterval   = 15 * time.Second
	MaxGossipPacketSize      = 1400
)

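// Create builds a Peer from the given configuration: it resolves the known
// peers, deduces the advertise address, and creates the memberlist instance.
// It does not join the cluster; call Join on the returned Peer for that.
//
// A minimal usage sketch; logger is a go-kit log.Logger, and the addresses
// and peer name "am-1:9094" are illustrative, not defaults of this package:
//
//	p, err := cluster.Create(logger, prometheus.DefaultRegisterer,
//		"0.0.0.0:9094", "", []string{"am-1:9094"}, true,
//		cluster.DefaultPushPullInterval, cluster.DefaultGossipInterval,
//		cluster.DefaultTCPTimeout, cluster.DefaultProbeTimeout,
//		cluster.DefaultProbeInterval, nil, false)
//	if err != nil {
//		// handle error
//	}
//	err = p.Join(cluster.DefaultReconnectInterval, cluster.DefaultReconnectTimeout)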
func Create(
	l log.Logger,
	reg prometheus.Registerer,
	bindAddr string,
	advertiseAddr string,
	knownPeers []string,
	waitIfEmpty bool,
	pushPullInterval time.Duration,
	gossipInterval time.Duration,
	tcpTimeout time.Duration,
	probeTimeout time.Duration,
	probeInterval time.Duration,
	tlsTransportConfig *TLSTransportConfig,
	allowInsecureAdvertise bool,
) (*Peer, error) {
	bindHost, bindPortStr, err := net.SplitHostPort(bindAddr)
	if err != nil {
		return nil, errors.Wrap(err, "invalid listen address")
	}
	bindPort, err := strconv.Atoi(bindPortStr)
	if err != nil {
		return nil, errors.Wrapf(err, "address %s: invalid port", bindAddr)
	}

	var advertiseHost string
	var advertisePort int
	if advertiseAddr != "" {
		var advertisePortStr string
		advertiseHost, advertisePortStr, err = net.SplitHostPort(advertiseAddr)
		if err != nil {
			return nil, errors.Wrap(err, "invalid advertise address")
		}
		advertisePort, err = strconv.Atoi(advertisePortStr)
		if err != nil {
			return nil, errors.Wrapf(err, "address %s: invalid port", advertiseAddr)
		}
	}

	resolvedPeers, err := resolvePeers(context.Background(), knownPeers, advertiseAddr, &net.Resolver{}, waitIfEmpty)
	if err != nil {
		return nil, errors.Wrap(err, "resolve peers")
	}
	level.Debug(l).Log("msg", "resolved peers to following addresses", "peers", strings.Join(resolvedPeers, ","))

	// Initial validation of user-specified advertise address.
	addr, err := calculateAdvertiseAddress(bindHost, advertiseHost, allowInsecureAdvertise)
	if err != nil {
		level.Warn(l).Log("err", "couldn't deduce an advertise address: "+err.Error())
	} else if hasNonlocal(resolvedPeers) && isUnroutable(addr.String()) {
		level.Warn(l).Log("err", "this node advertises itself on an unroutable address", "addr", addr.String())
		level.Warn(l).Log("err", "this node will be unreachable in the cluster")
		level.Warn(l).Log("err", "provide --cluster.advertise-address as a routable IP address or hostname")
	} else if isAny(bindAddr) && advertiseHost == "" {
		// memberlist doesn't advertise properly when the bind address is empty or unspecified.
		level.Info(l).Log("msg", "setting advertise address explicitly", "addr", addr.String(), "port", bindPort)
		advertiseHost = addr.String()
		advertisePort = bindPort
	}

	// TODO(fabxc): generate human-readable but random names?
	name, err := ulid.New(ulid.Now(), rand.New(rand.NewSource(time.Now().UnixNano())))
	if err != nil {
		return nil, err
	}

	p := &Peer{
		states:        map[string]State{},
		stopc:         make(chan struct{}),
		readyc:        make(chan struct{}),
		logger:        l,
		peers:         map[string]peer{},
		resolvedPeers: resolvedPeers,
		knownPeers:    knownPeers,
	}

	p.register(reg, name.String())

	retransmit := len(knownPeers) / 2
	if retransmit < 3 {
		retransmit = 3
	}
	p.delegate = newDelegate(l, reg, p, retransmit)

	cfg := memberlist.DefaultLANConfig()
	cfg.Name = name.String()
	cfg.BindAddr = bindHost
	cfg.BindPort = bindPort
	cfg.Delegate = p.delegate
	cfg.Ping = p.delegate
	cfg.Alive = p.delegate
	cfg.Events = p.delegate
	cfg.GossipInterval = gossipInterval
	cfg.PushPullInterval = pushPullInterval
	cfg.TCPTimeout = tcpTimeout
	cfg.ProbeTimeout = probeTimeout
	cfg.ProbeInterval = probeInterval
	cfg.LogOutput = &logWriter{l: l}
	cfg.GossipNodes = retransmit
	cfg.UDPBufferSize = MaxGossipPacketSize

	if advertiseHost != "" {
		cfg.AdvertiseAddr = advertiseHost
		cfg.AdvertisePort = advertisePort
		p.setInitialFailed(resolvedPeers, fmt.Sprintf("%s:%d", advertiseHost, advertisePort))
	} else {
		p.setInitialFailed(resolvedPeers, bindAddr)
	}

	if tlsTransportConfig != nil {
		level.Info(l).Log("msg", "using TLS for gossip")
		cfg.Transport, err = NewTLSTransport(context.Background(), l, reg, cfg.BindAddr, cfg.BindPort, tlsTransportConfig)
		if err != nil {
			return nil, errors.Wrap(err, "tls transport")
		}
	}

	ml, err := memberlist.Create(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "create memberlist")
	}
	p.mlist = ml
	return p, nil
}

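// Join makes the peer join the cluster via the resolved peers and starts the
// background tasks: periodic reconnection to failed peers (every
// reconnectInterval; disabled when zero), cleanup of peers that have been
// failed for longer than reconnectTimeout (disabled when zero), and periodic
// refresh of the initially known peers. It returns the error of the initial
// join attempt, which the reconnect task may later recover from.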
func (p *Peer) Join(
	reconnectInterval time.Duration,
	reconnectTimeout time.Duration,
) error {
	n, err := p.mlist.Join(p.resolvedPeers)
	if err != nil {
		level.Warn(p.logger).Log("msg", "failed to join cluster", "err", err)
		if reconnectInterval != 0 {
			level.Info(p.logger).Log("msg", fmt.Sprintf("will retry joining cluster every %v", reconnectInterval.String()))
		}
	} else {
		level.Debug(p.logger).Log("msg", "joined cluster", "peers", n)
	}

	if reconnectInterval != 0 {
		go p.runPeriodicTask(
			reconnectInterval,
			p.reconnect,
		)
	}
	if reconnectTimeout != 0 {
		go p.runPeriodicTask(
			5*time.Minute,
			func() { p.removeFailedPeers(reconnectTimeout) },
		)
	}
	go p.runPeriodicTask(
		DefaultRefreshInterval,
		p.refresh,
	)

	return err
}

// All peers are initially added to the failed list. They will be removed from
// this list in peerJoin when making their initial connection.
func (p *Peer) setInitialFailed(peers []string, myAddr string) {
	if len(peers) == 0 {
		return
	}

	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	now := time.Now()
	for _, peerAddr := range peers {
		if peerAddr == myAddr {
			// Don't add ourselves to the initially failing list;
			// we don't connect to ourselves.
			continue
		}
		host, port, err := net.SplitHostPort(peerAddr)
		if err != nil {
			continue
		}
		ip := net.ParseIP(host)
		if ip == nil {
			// Don't add textual addresses since memberlist only advertises
			// dotted decimal or IPv6 addresses.
			continue
		}
		portUint, err := strconv.ParseUint(port, 10, 16)
		if err != nil {
			continue
		}

		pr := peer{
			status:    StatusFailed,
			leaveTime: now,
			Node: &memberlist.Node{
				Addr: ip,
				Port: uint16(portUint),
			},
		}
		p.failedPeers = append(p.failedPeers, pr)
		p.peers[peerAddr] = pr
	}
}

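// logWriter adapts a go-kit logger to the io.Writer interface that
// memberlist expects for its log output.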
type logWriter struct {
	l log.Logger
}

func (l *logWriter) Write(b []byte) (int, error) {
	return len(b), level.Debug(l.l).Log("memberlist", string(b))
}

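// register creates the peer's Prometheus metrics and registers them with reg.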
func (p *Peer) register(reg prometheus.Registerer, name string) {
	peerInfo := prometheus.NewGauge(
		prometheus.GaugeOpts{
			Name:        "alertmanager_cluster_peer_info",
			Help:        "A metric with a constant '1' value labeled by peer name.",
			ConstLabels: prometheus.Labels{"peer": name},
		},
	)
	peerInfo.Set(1)
	clusterFailedPeers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_failed_peers",
		Help: "Number indicating the current number of failed peers in the cluster.",
	}, func() float64 {
		p.peerLock.RLock()
		defer p.peerLock.RUnlock()

		return float64(len(p.failedPeers))
	})
	p.failedReconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_reconnections_failed_total",
		Help: "A counter of the number of failed cluster peer reconnection attempts.",
	})

	p.reconnectionsCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_reconnections_total",
		Help: "A counter of the number of cluster peer reconnections.",
	})

	p.failedRefreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_refresh_join_failed_total",
		Help: "A counter of the number of failed cluster peer join attempts via refresh.",
	})
	p.refreshCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_refresh_join_total",
		Help: "A counter of the number of cluster peers joined via refresh.",
	})

	p.peerLeaveCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_left_total",
		Help: "A counter of the number of peers that have left.",
	})
	p.peerUpdateCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_update_total",
		Help: "A counter of the number of peers that have updated metadata.",
	})
	p.peerJoinCounter = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_peers_joined_total",
		Help: "A counter of the number of peers that have joined.",
	})

	reg.MustRegister(peerInfo, clusterFailedPeers, p.failedReconnectionsCounter, p.reconnectionsCounter,
		p.peerLeaveCounter, p.peerUpdateCounter, p.peerJoinCounter, p.refreshCounter, p.failedRefreshCounter)
}

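// runPeriodicTask executes f every d until the peer is stopped.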
func (p *Peer) runPeriodicTask(d time.Duration, f func()) {
	tick := time.NewTicker(d)
	defer tick.Stop()

	for {
		select {
		case <-p.stopc:
			return
		case <-tick.C:
			f()
		}
	}
}

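// removeFailedPeers drops peers from the failed list (and from the peer map)
// once they have been gone for longer than timeout.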
func (p *Peer) removeFailedPeers(timeout time.Duration) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	now := time.Now()

	keep := make([]peer, 0, len(p.failedPeers))
	for _, pr := range p.failedPeers {
		if pr.leaveTime.Add(timeout).After(now) {
			keep = append(keep, pr)
		} else {
			level.Debug(p.logger).Log("msg", "failed peer has timed out", "peer", pr.Node, "addr", pr.Address())
			delete(p.peers, pr.Name)
		}
	}

	p.failedPeers = keep
}

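// reconnect attempts to rejoin each currently failed peer.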
func (p *Peer) reconnect() {
	p.peerLock.RLock()
	failedPeers := p.failedPeers
	p.peerLock.RUnlock()

	logger := log.With(p.logger, "msg", "reconnect")
	for _, pr := range failedPeers {
		// No need to do bookkeeping on failedPeers here. If a
		// reconnect is successful, they will be announced in
		// peerJoin().
		if _, err := p.mlist.Join([]string{pr.Address()}); err != nil {
			p.failedReconnectionsCounter.Inc()
			level.Debug(logger).Log("result", "failure", "peer", pr.Node, "addr", pr.Address(), "err", err)
		} else {
			p.reconnectionsCounter.Inc()
			level.Debug(logger).Log("result", "success", "peer", pr.Node, "addr", pr.Address())
		}
	}
}

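// refresh re-resolves the initially known peers and joins any resolved
// address that is not already a cluster member.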
func (p *Peer) refresh() {
	logger := log.With(p.logger, "msg", "refresh")

	resolvedPeers, err := resolvePeers(context.Background(), p.knownPeers, p.advertiseAddr, &net.Resolver{}, false)
	if err != nil {
		level.Debug(logger).Log("peers", p.knownPeers, "err", err)
		return
	}

	members := p.mlist.Members()
	for _, peer := range resolvedPeers {
		var isPeerFound bool
		for _, member := range members {
			if member.Address() == peer {
				isPeerFound = true
				break
			}
		}

		if !isPeerFound {
			if _, err := p.mlist.Join([]string{peer}); err != nil {
				p.failedRefreshCounter.Inc()
				level.Warn(logger).Log("result", "failure", "addr", peer, "err", err)
			} else {
				p.refreshCounter.Inc()
				level.Debug(logger).Log("result", "success", "addr", peer)
			}
		}
	}
}

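// peerJoin is invoked by the delegate when a node joins the cluster. It
// marks the peer alive and, if it was previously failed, removes it from
// the failed list.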
func (p *Peer) peerJoin(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	var oldStatus PeerStatus
	pr, ok := p.peers[n.Address()]
	if !ok {
		oldStatus = StatusNone
		pr = peer{
			status: StatusAlive,
			Node:   n,
		}
	} else {
		oldStatus = pr.status
		pr.Node = n
		pr.status = StatusAlive
		pr.leaveTime = time.Time{}
	}

	p.peers[n.Address()] = pr
	p.peerJoinCounter.Inc()

	if oldStatus == StatusFailed {
		level.Debug(p.logger).Log("msg", "peer rejoined", "peer", pr.Node)
		p.failedPeers = removeOldPeer(p.failedPeers, pr.Address())
	}
}

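// peerLeave is invoked by the delegate when a node leaves or is considered
// dead. The peer is marked failed so that reconnect can retry it later.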
func (p *Peer) peerLeave(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	pr, ok := p.peers[n.Address()]
	if !ok {
		// Why are we receiving a leave notification from a node that
		// never joined?
		return
	}

	pr.status = StatusFailed
	pr.leaveTime = time.Now()
	p.failedPeers = append(p.failedPeers, pr)
	p.peers[n.Address()] = pr

	p.peerLeaveCounter.Inc()
	level.Debug(p.logger).Log("msg", "peer left", "peer", pr.Node)
}

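// peerUpdate is invoked by the delegate when a node's metadata changes.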
func (p *Peer) peerUpdate(n *memberlist.Node) {
	p.peerLock.Lock()
	defer p.peerLock.Unlock()

	pr, ok := p.peers[n.Address()]
	if !ok {
		// Why are we receiving an update from a node that never
		// joined?
		return
	}

	pr.Node = n
	p.peers[n.Address()] = pr

	p.peerUpdateCounter.Inc()
	level.Debug(p.logger).Log("msg", "peer updated", "peer", pr.Node)
}

// AddState adds a new state that will be gossiped. It returns a channel to which
// broadcast messages for the state can be sent.
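//
// A hedged usage sketch, given a previously created *Peer p; mySilenceState
// and encodedUpdate are hypothetical, and any type implementing the State
// interface below works:
//
//	ch := p.AddState("sil", mySilenceState, prometheus.DefaultRegisterer)
//	ch.Broadcast(encodedUpdate) // encodedUpdate is a serialized state delta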
func (p *Peer) AddState(key string, s State, reg prometheus.Registerer) ClusterChannel {
	p.mtx.Lock()
	p.states[key] = s
	p.mtx.Unlock()

	send := func(b []byte) {
		p.delegate.bcast.QueueBroadcast(simpleBroadcast(b))
	}
	peers := func() []*memberlist.Node {
		nodes := p.mlist.Members()
		for i, n := range nodes {
			if n.String() == p.Self().Name {
				nodes = append(nodes[:i], nodes[i+1:]...)
				break
			}
		}
		return nodes
	}
	sendOversize := func(n *memberlist.Node, b []byte) error {
		return p.mlist.SendReliable(n, b)
	}
	return NewChannel(key, send, peers, sendOversize, p.logger, p.stopc, reg)
}

// Leave the cluster, waiting up to timeout.
func (p *Peer) Leave(timeout time.Duration) error {
	close(p.stopc)
	level.Debug(p.logger).Log("msg", "leaving cluster")
	return p.mlist.Leave(timeout)
}

// Name returns the unique ID of this peer in the cluster.
func (p *Peer) Name() string {
	return p.mlist.LocalNode().Name
}

// ClusterSize returns the current number of alive members in the cluster.
func (p *Peer) ClusterSize() int {
	return p.mlist.NumMembers()
}

// Ready returns true once the gossip has settled (i.e. Settle has completed).
func (p *Peer) Ready() bool {
	select {
	case <-p.readyc:
		return true
	default:
	}
	return false
}

// WaitReady blocks until Settle() has finished or the context is done.
func (p *Peer) WaitReady(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.readyc:
		return nil
	}
}

// Status returns a status string representing the peer state.
func (p *Peer) Status() string {
	if p.Ready() {
		return "ready"
	}

	return "settling"
}

// Info returns a JSON-serializable dump of cluster state.
// Useful for debug.
func (p *Peer) Info() map[string]interface{} {
	p.mtx.RLock()
	defer p.mtx.RUnlock()

	return map[string]interface{}{
		"self":    p.mlist.LocalNode(),
		"members": p.mlist.Members(),
	}
}

// Self returns the node information about the peer itself.
func (p *Peer) Self() *memberlist.Node {
	return p.mlist.LocalNode()
}

// Member represents a member in the cluster.
type Member struct {
	node *memberlist.Node
}

// Name implements cluster.ClusterMember
func (m Member) Name() string { return m.node.Name }

// Address implements cluster.ClusterMember
func (m Member) Address() string { return m.node.Address() }

// Peers returns the peers in the cluster.
func (p *Peer) Peers() []ClusterMember {
	peers := make([]ClusterMember, 0, len(p.mlist.Members()))
	for _, member := range p.mlist.Members() {
		peers = append(peers, Member{
			node: member,
		})
	}
	return peers
}

// Position returns the index of this peer in the list of cluster members
// sorted by name.
func (p *Peer) Position() int {
	all := p.mlist.Members()
	sort.Slice(all, func(i, j int) bool {
		return all[i].Name < all[j].Name
	})

	k := 0
	for _, n := range all {
		if n.Name == p.Self().Name {
			break
		}
		k++
	}
	return k
}

// Settle waits until the mesh is ready (and sets the appropriate internal state when it is).
// The idea is that we don't want to start "working" before we get a chance to know most of the alerts and/or silences.
// Inspired from https://github.com/apache/cassandra/blob/7a40abb6a5108688fb1b10c375bb751cbb782ea4/src/java/org/apache/cassandra/gms/Gossiper.java
// This is clearly not perfect or strictly correct, but it should prevent the Alertmanager from sending notifications before it is reasonably ready.
// This is especially important for deployments that do not have persistent storage.
func (p *Peer) Settle(ctx context.Context, interval time.Duration) {
	const NumOkayRequired = 3
	level.Info(p.logger).Log("msg", "Waiting for gossip to settle...", "interval", interval)
	start := time.Now()
	nPeers := 0
	nOkay := 0
	totalPolls := 0
	for {
		select {
		case <-ctx.Done():
			elapsed := time.Since(start)
			level.Info(p.logger).Log("msg", "gossip not settled but continuing anyway", "polls", totalPolls, "elapsed", elapsed)
			close(p.readyc)
			return
		case <-time.After(interval):
		}
		elapsed := time.Since(start)
		n := len(p.Peers())
		if nOkay >= NumOkayRequired {
			level.Info(p.logger).Log("msg", "gossip settled; proceeding", "elapsed", elapsed)
			break
		}
		if n == nPeers {
			nOkay++
			level.Debug(p.logger).Log("msg", "gossip looks settled", "elapsed", elapsed)
		} else {
			nOkay = 0
			level.Info(p.logger).Log("msg", "gossip not settled", "polls", totalPolls, "before", nPeers, "now", n, "elapsed", elapsed)
		}
		nPeers = n
		totalPolls++
	}
	close(p.readyc)
}

// State is a piece of state that can be serialized and merged with other
// serialized state.
type State interface {
	// MarshalBinary serializes the underlying state.
	MarshalBinary() ([]byte, error)

	// Merge merges serialized state into the underlying state.
	Merge(b []byte) error
}

// We use a simple broadcast implementation in which items are never invalidated by others.
type simpleBroadcast []byte

func (b simpleBroadcast) Message() []byte                       { return []byte(b) }
func (b simpleBroadcast) Invalidates(memberlist.Broadcast) bool { return false }
func (b simpleBroadcast) Finished()                             {}

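// resolvePeers resolves the hostname part of each peer address to one or
// more IPs, skipping our own address. When waitIfEmpty is set, an empty DNS
// result is retried until at least one address resolves or ctx is done.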
func resolvePeers(ctx context.Context, peers []string, myAddress string, res *net.Resolver, waitIfEmpty bool) ([]string, error) {
	var resolvedPeers []string

	for _, peer := range peers {
		host, port, err := net.SplitHostPort(peer)
		if err != nil {
			return nil, errors.Wrapf(err, "split host/port for peer %s", peer)
		}

		retryCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		ips, err := res.LookupIPAddr(ctx, host)
		if err != nil {
			// Assume direct address.
			resolvedPeers = append(resolvedPeers, peer)
			continue
		}

		if len(ips) == 0 {
			var lookupErrSpotted bool

			err := retry(2*time.Second, retryCtx.Done(), func() error {
				if lookupErrSpotted {
					// After a lookup error, cancel the retry context on the next
					// attempt so the LookupIPAddr error is preserved.
					cancel()
				}

				ips, err = res.LookupIPAddr(retryCtx, host)
				if err != nil {
					lookupErrSpotted = true
					return errors.Wrapf(err, "IP Addr lookup for peer %s", peer)
				}

				ips = removeMyAddr(ips, port, myAddress)
				if len(ips) == 0 {
					if !waitIfEmpty {
						return nil
					}
					return errors.New("empty IPAddr result. Retrying")
				}

				return nil
			})
			if err != nil {
				return nil, err
			}
		}

		for _, ip := range ips {
			resolvedPeers = append(resolvedPeers, net.JoinHostPort(ip.String(), port))
		}
	}

	return resolvedPeers, nil
}

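// removeMyAddr filters out every IP that, joined with targetPort, equals
// myAddr, so a peer does not try to connect to itself.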
func removeMyAddr(ips []net.IPAddr, targetPort, myAddr string) []net.IPAddr {
	var result []net.IPAddr

	for _, ip := range ips {
		if net.JoinHostPort(ip.String(), targetPort) == myAddr {
			continue
		}
		result = append(result, ip)
	}

	return result
}

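// hasNonlocal reports whether any of the cluster peers has an address that
// is not loopback or "localhost".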
func hasNonlocal(clusterPeers []string) bool {
	for _, peer := range clusterPeers {
		if host, _, err := net.SplitHostPort(peer); err == nil {
			peer = host
		}
		if ip := net.ParseIP(peer); ip != nil && !ip.IsLoopback() {
			return true
		} else if ip == nil && strings.ToLower(peer) != "localhost" {
			return true
		}
	}
	return false
}

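// isUnroutable reports whether addr is an unspecified (e.g. 0.0.0.0) or
// loopback address that other nodes cannot route to.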
func isUnroutable(addr string) bool {
	if host, _, err := net.SplitHostPort(addr); err == nil {
		addr = host
	}
	if ip := net.ParseIP(addr); ip != nil && (ip.IsUnspecified() || ip.IsLoopback()) {
		return true // typically 0.0.0.0 or localhost
	} else if ip == nil && strings.ToLower(addr) == "localhost" {
		return true
	}
	return false
}

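// isAny reports whether addr is empty or the unspecified "any" address.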
func isAny(addr string) bool {
	if host, _, err := net.SplitHostPort(addr); err == nil {
		addr = host
	}
	return addr == "" || net.ParseIP(addr).IsUnspecified()
}

// retry executes f every interval until f returns nil or stopc is closed.
func retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	var err error
	for {
		if err = f(); err == nil {
			return nil
		}
		select {
		case <-stopc:
			return err
		case <-tick.C:
		}
	}
}

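// removeOldPeer returns old with any peer whose address equals addr removed.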
func removeOldPeer(old []peer, addr string) []peer {
	new := make([]peer, 0, len(old))
	for _, p := range old {
		if p.Address() != addr {
			new = append(new, p)
		}
	}

	return new
}
