...

Source file src/github.com/prometheus/alertmanager/cluster/delegate.go

Documentation: github.com/prometheus/alertmanager/cluster

     1  // Copyright 2018 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package cluster
    15  
    16  import (
    17  	"time"
    18  
    19  	"github.com/go-kit/log"
    20  	"github.com/go-kit/log/level"
    21  	"github.com/gogo/protobuf/proto"
    22  	"github.com/hashicorp/memberlist"
    23  	"github.com/prometheus/client_golang/prometheus"
    24  
    25  	"github.com/prometheus/alertmanager/cluster/clusterpb"
    26  )
    27  
    28  const (
    29  	// Maximum number of messages to be held in the queue.
    30  	maxQueueSize = 4096
    31  	fullState    = "full_state"
    32  	update       = "update"
    33  )
    34  
    35  // delegate implements memberlist.Delegate and memberlist.EventDelegate
    36  // and broadcasts its peer's state in the cluster.
    37  type delegate struct {
    38  	*Peer
    39  
    40  	logger log.Logger
    41  	bcast  *memberlist.TransmitLimitedQueue
    42  
    43  	messagesReceived     *prometheus.CounterVec
    44  	messagesReceivedSize *prometheus.CounterVec
    45  	messagesSent         *prometheus.CounterVec
    46  	messagesSentSize     *prometheus.CounterVec
    47  	messagesPruned       prometheus.Counter
    48  	nodeAlive            *prometheus.CounterVec
    49  	nodePingDuration     *prometheus.HistogramVec
    50  }
    51  
    52  func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
    53  	bcast := &memberlist.TransmitLimitedQueue{
    54  		NumNodes:       p.ClusterSize,
    55  		RetransmitMult: retransmit,
    56  	}
    57  	messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
    58  		Name: "alertmanager_cluster_messages_received_total",
    59  		Help: "Total number of cluster messages received.",
    60  	}, []string{"msg_type"})
    61  	messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
    62  		Name: "alertmanager_cluster_messages_received_size_total",
    63  		Help: "Total size of cluster messages received.",
    64  	}, []string{"msg_type"})
    65  	messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
    66  		Name: "alertmanager_cluster_messages_sent_total",
    67  		Help: "Total number of cluster messages sent.",
    68  	}, []string{"msg_type"})
    69  	messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
    70  		Name: "alertmanager_cluster_messages_sent_size_total",
    71  		Help: "Total size of cluster messages sent.",
    72  	}, []string{"msg_type"})
    73  	messagesPruned := prometheus.NewCounter(prometheus.CounterOpts{
    74  		Name: "alertmanager_cluster_messages_pruned_total",
    75  		Help: "Total number of cluster messages pruned.",
    76  	})
    77  	gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
    78  		Name: "alertmanager_cluster_members",
    79  		Help: "Number indicating current number of members in cluster.",
    80  	}, func() float64 {
    81  		return float64(p.ClusterSize())
    82  	})
    83  	peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
    84  		Name: "alertmanager_peer_position",
    85  		Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
    86  	}, func() float64 {
    87  		return float64(p.Position())
    88  	})
    89  	healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
    90  		Name: "alertmanager_cluster_health_score",
    91  		Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
    92  	}, func() float64 {
    93  		return float64(p.mlist.GetHealthScore())
    94  	})
    95  	messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
    96  		Name: "alertmanager_cluster_messages_queued",
    97  		Help: "Number of cluster messages which are queued.",
    98  	}, func() float64 {
    99  		return float64(bcast.NumQueued())
   100  	})
   101  	nodeAlive := prometheus.NewCounterVec(prometheus.CounterOpts{
   102  		Name: "alertmanager_cluster_alive_messages_total",
   103  		Help: "Total number of received alive messages.",
   104  	}, []string{"peer"},
   105  	)
   106  	nodePingDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
   107  		Name:    "alertmanager_cluster_pings_seconds",
   108  		Help:    "Histogram of latencies for ping messages.",
   109  		Buckets: []float64{.005, .01, .025, .05, .1, .25, .5},
   110  	}, []string{"peer"},
   111  	)
   112  
   113  	messagesReceived.WithLabelValues(fullState)
   114  	messagesReceivedSize.WithLabelValues(fullState)
   115  	messagesReceived.WithLabelValues(update)
   116  	messagesReceivedSize.WithLabelValues(update)
   117  	messagesSent.WithLabelValues(fullState)
   118  	messagesSentSize.WithLabelValues(fullState)
   119  	messagesSent.WithLabelValues(update)
   120  	messagesSentSize.WithLabelValues(update)
   121  
   122  	reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
   123  		gossipClusterMembers, peerPosition, healthScore, messagesQueued, messagesPruned,
   124  		nodeAlive, nodePingDuration,
   125  	)
   126  
   127  	d := &delegate{
   128  		logger:               l,
   129  		Peer:                 p,
   130  		bcast:                bcast,
   131  		messagesReceived:     messagesReceived,
   132  		messagesReceivedSize: messagesReceivedSize,
   133  		messagesSent:         messagesSent,
   134  		messagesSentSize:     messagesSentSize,
   135  		messagesPruned:       messagesPruned,
   136  		nodeAlive:            nodeAlive,
   137  		nodePingDuration:     nodePingDuration,
   138  	}
   139  
   140  	go d.handleQueueDepth()
   141  
   142  	return d
   143  }
   144  
   145  // NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
   146  func (d *delegate) NodeMeta(limit int) []byte {
   147  	return []byte{}
   148  }
   149  
   150  // NotifyMsg is the callback invoked when a user-level gossip message is received.
   151  func (d *delegate) NotifyMsg(b []byte) {
   152  	d.messagesReceived.WithLabelValues(update).Inc()
   153  	d.messagesReceivedSize.WithLabelValues(update).Add(float64(len(b)))
   154  
   155  	var p clusterpb.Part
   156  	if err := proto.Unmarshal(b, &p); err != nil {
   157  		level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
   158  		return
   159  	}
   160  
   161  	d.mtx.RLock()
   162  	s, ok := d.states[p.Key]
   163  	d.mtx.RUnlock()
   164  
   165  	if !ok {
   166  		return
   167  	}
   168  	if err := s.Merge(p.Data); err != nil {
   169  		level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
   170  		return
   171  	}
   172  }
   173  
   174  // GetBroadcasts is called when user data messages can be broadcasted.
   175  func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
   176  	msgs := d.bcast.GetBroadcasts(overhead, limit)
   177  	d.messagesSent.WithLabelValues(update).Add(float64(len(msgs)))
   178  	for _, m := range msgs {
   179  		d.messagesSentSize.WithLabelValues(update).Add(float64(len(m)))
   180  	}
   181  	return msgs
   182  }
   183  
   184  // LocalState is called when gossip fetches local state.
   185  func (d *delegate) LocalState(_ bool) []byte {
   186  	d.mtx.RLock()
   187  	defer d.mtx.RUnlock()
   188  	all := &clusterpb.FullState{
   189  		Parts: make([]clusterpb.Part, 0, len(d.states)),
   190  	}
   191  
   192  	for key, s := range d.states {
   193  		b, err := s.MarshalBinary()
   194  		if err != nil {
   195  			level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
   196  			return nil
   197  		}
   198  		all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
   199  	}
   200  	b, err := proto.Marshal(all)
   201  	if err != nil {
   202  		level.Warn(d.logger).Log("msg", "encode local state", "err", err)
   203  		return nil
   204  	}
   205  	d.messagesSent.WithLabelValues(fullState).Inc()
   206  	d.messagesSentSize.WithLabelValues(fullState).Add(float64(len(b)))
   207  	return b
   208  }
   209  
   210  func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
   211  	d.messagesReceived.WithLabelValues(fullState).Inc()
   212  	d.messagesReceivedSize.WithLabelValues(fullState).Add(float64(len(buf)))
   213  
   214  	var fs clusterpb.FullState
   215  	if err := proto.Unmarshal(buf, &fs); err != nil {
   216  		level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
   217  		return
   218  	}
   219  	d.mtx.RLock()
   220  	defer d.mtx.RUnlock()
   221  	for _, p := range fs.Parts {
   222  		s, ok := d.states[p.Key]
   223  		if !ok {
   224  			level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key)
   225  			continue
   226  		}
   227  		if err := s.Merge(p.Data); err != nil {
   228  			level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
   229  			return
   230  		}
   231  	}
   232  }
   233  
   234  // NotifyJoin is called if a peer joins the cluster.
   235  func (d *delegate) NotifyJoin(n *memberlist.Node) {
   236  	level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
   237  	d.Peer.peerJoin(n)
   238  }
   239  
   240  // NotifyLeave is called if a peer leaves the cluster.
   241  func (d *delegate) NotifyLeave(n *memberlist.Node) {
   242  	level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
   243  	d.Peer.peerLeave(n)
   244  }
   245  
   246  // NotifyUpdate is called if a cluster peer gets updated.
   247  func (d *delegate) NotifyUpdate(n *memberlist.Node) {
   248  	level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
   249  	d.Peer.peerUpdate(n)
   250  }
   251  
   252  // NotifyAlive implements the memberlist.AliveDelegate interface.
   253  func (d *delegate) NotifyAlive(peer *memberlist.Node) error {
   254  	d.nodeAlive.WithLabelValues(peer.Name).Inc()
   255  	return nil
   256  }
   257  
   258  // AckPayload implements the memberlist.PingDelegate interface.
   259  func (d *delegate) AckPayload() []byte {
   260  	return []byte{}
   261  }
   262  
   263  // NotifyPingComplete implements the memberlist.PingDelegate interface.
   264  func (d *delegate) NotifyPingComplete(peer *memberlist.Node, rtt time.Duration, payload []byte) {
   265  	d.nodePingDuration.WithLabelValues(peer.Name).Observe(rtt.Seconds())
   266  }
   267  
   268  // handleQueueDepth ensures that the queue doesn't grow unbounded by pruning
   269  // older messages at regular interval.
   270  func (d *delegate) handleQueueDepth() {
   271  	for {
   272  		select {
   273  		case <-d.stopc:
   274  			return
   275  		case <-time.After(15 * time.Minute):
   276  			n := d.bcast.NumQueued()
   277  			if n > maxQueueSize {
   278  				level.Warn(d.logger).Log("msg", "dropping messages because too many are queued", "current", n, "limit", maxQueueSize)
   279  				d.bcast.Prune(maxQueueSize)
   280  				d.messagesPruned.Add(float64(n - maxQueueSize))
   281  			}
   282  		}
   283  	}
   284  }
   285  

View as plain text