...

Source file src/github.com/prometheus/alertmanager/provider/mem/mem.go

Documentation: github.com/prometheus/alertmanager/provider/mem

     1  // Copyright 2016 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package mem
    15  
    16  import (
    17  	"context"
    18  	"sync"
    19  	"time"
    20  
    21  	"github.com/go-kit/log"
    22  	"github.com/go-kit/log/level"
    23  	"github.com/prometheus/client_golang/prometheus"
    24  	"github.com/prometheus/common/model"
    25  
    26  	"github.com/prometheus/alertmanager/provider"
    27  	"github.com/prometheus/alertmanager/store"
    28  	"github.com/prometheus/alertmanager/types"
    29  )
    30  
// alertChannelLength is the buffer size of the alert channels handed to
// iterators: GetPending uses it as-is, while Subscribe uses it as a lower
// bound and grows the buffer when more alerts are already stored.
const alertChannelLength = 200
    32  
// Alerts gives access to a set of alerts. All methods are goroutine-safe.
type Alerts struct {
	// cancel cancels the context that the store's Run loop is started with
	// in NewAlerts; Close invokes it.
	cancel context.CancelFunc

	// alerts is the underlying store that holds the alerts. marker is
	// queried for alert status by count and is told to delete the
	// fingerprint of every alert removed by garbage collection.
	alerts *store.Alerts
	marker types.Marker

	// mtx guards listeners and next. listeners maps a subscription ID to
	// the channels of that subscription; next is the ID the following
	// Subscribe call will register under.
	mtx       sync.Mutex
	listeners map[int]listeningAlerts
	next      int

	// callback receives the PreStore/PostStore/PostDelete hooks. NewAlerts
	// substitutes a no-op implementation when given nil.
	callback AlertStoreCallback

	// logger is used to report alerts that could not be stored.
	logger log.Logger
}
    48  
// AlertStoreCallback lets callers hook into the lifecycle of alerts held by
// Alerts. Put invokes PreStore and PostStore around each write to the store,
// and the garbage-collection callback installed by NewAlerts invokes
// PostDelete for every alert it is handed.
type AlertStoreCallback interface {
	// PreStore is called before an alert is stored. If it returns an error,
	// the alert is not stored.
	// The existing flag reports whether an alert with the same fingerprint
	// was already stored (so this is an update) or not.
	// For an existing alert, the alert passed to PreStore is the result of
	// merging the stored alert with the new one when their activity ranges
	// overlap; otherwise it is the new alert unchanged.
	PreStore(alert *types.Alert, existing bool) error

	// PostStore is called after the alert has been put into the store.
	PostStore(alert *types.Alert, existing bool)

	// PostDelete is called after the alert has been removed from the store
	// by alert garbage collection.
	PostDelete(alert *types.Alert)
}
    62  
// listeningAlerts is the provider-side state of one subscription created by
// Subscribe.
//
// alerts is the channel Put sends stored alerts on; the garbage-collection
// callback closes it after it observes done being signalled. done is checked
// by Put and by the garbage-collection callback to detect that the
// subscription is finished (presumably it is closed by the iterator returned
// from Subscribe; confirm against provider.NewAlertIterator).
type listeningAlerts struct {
	alerts chan *types.Alert
	done   chan struct{}
}
    67  
    68  func (a *Alerts) registerMetrics(r prometheus.Registerer) {
    69  	newMemAlertByStatus := func(s types.AlertState) prometheus.GaugeFunc {
    70  		return prometheus.NewGaugeFunc(
    71  			prometheus.GaugeOpts{
    72  				Name:        "alertmanager_alerts",
    73  				Help:        "How many alerts by state.",
    74  				ConstLabels: prometheus.Labels{"state": string(s)},
    75  			},
    76  			func() float64 {
    77  				return float64(a.count(s))
    78  			},
    79  		)
    80  	}
    81  
    82  	r.MustRegister(newMemAlertByStatus(types.AlertStateActive))
    83  	r.MustRegister(newMemAlertByStatus(types.AlertStateSuppressed))
    84  	r.MustRegister(newMemAlertByStatus(types.AlertStateUnprocessed))
    85  }
    86  
    87  // NewAlerts returns a new alert provider.
    88  func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger, r prometheus.Registerer) (*Alerts, error) {
    89  	if alertCallback == nil {
    90  		alertCallback = noopCallback{}
    91  	}
    92  
    93  	ctx, cancel := context.WithCancel(ctx)
    94  	a := &Alerts{
    95  		marker:    m,
    96  		alerts:    store.NewAlerts(),
    97  		cancel:    cancel,
    98  		listeners: map[int]listeningAlerts{},
    99  		next:      0,
   100  		logger:    log.With(l, "component", "provider"),
   101  		callback:  alertCallback,
   102  	}
   103  	a.alerts.SetGCCallback(func(alerts []*types.Alert) {
   104  		for _, alert := range alerts {
   105  			// As we don't persist alerts, we no longer consider them after
   106  			// they are resolved. Alerts waiting for resolved notifications are
   107  			// held in memory in aggregation groups redundantly.
   108  			m.Delete(alert.Fingerprint())
   109  			a.callback.PostDelete(alert)
   110  		}
   111  
   112  		a.mtx.Lock()
   113  		for i, l := range a.listeners {
   114  			select {
   115  			case <-l.done:
   116  				delete(a.listeners, i)
   117  				close(l.alerts)
   118  			default:
   119  				// listener is not closed yet, hence proceed.
   120  			}
   121  		}
   122  		a.mtx.Unlock()
   123  	})
   124  
   125  	if r != nil {
   126  		a.registerMetrics(r)
   127  	}
   128  
   129  	go a.alerts.Run(ctx, intervalGC)
   130  
   131  	return a, nil
   132  }
   133  
   134  // Close the alert provider.
   135  func (a *Alerts) Close() {
   136  	if a.cancel != nil {
   137  		a.cancel()
   138  	}
   139  }
   140  
   141  func max(a, b int) int {
   142  	if a > b {
   143  		return a
   144  	}
   145  	return b
   146  }
   147  
   148  // Subscribe returns an iterator over active alerts that have not been
   149  // resolved and successfully notified about.
   150  // They are not guaranteed to be in chronological order.
   151  func (a *Alerts) Subscribe() provider.AlertIterator {
   152  	a.mtx.Lock()
   153  	defer a.mtx.Unlock()
   154  
   155  	var (
   156  		done   = make(chan struct{})
   157  		alerts = a.alerts.List()
   158  		ch     = make(chan *types.Alert, max(len(alerts), alertChannelLength))
   159  	)
   160  
   161  	for _, a := range alerts {
   162  		ch <- a
   163  	}
   164  
   165  	a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
   166  	a.next++
   167  
   168  	return provider.NewAlertIterator(ch, done, nil)
   169  }
   170  
   171  // GetPending returns an iterator over all the alerts that have
   172  // pending notifications.
   173  func (a *Alerts) GetPending() provider.AlertIterator {
   174  	var (
   175  		ch   = make(chan *types.Alert, alertChannelLength)
   176  		done = make(chan struct{})
   177  	)
   178  
   179  	go func() {
   180  		defer close(ch)
   181  
   182  		for _, a := range a.alerts.List() {
   183  			select {
   184  			case ch <- a:
   185  			case <-done:
   186  				return
   187  			}
   188  		}
   189  	}()
   190  
   191  	return provider.NewAlertIterator(ch, done, nil)
   192  }
   193  
   194  // Get returns the alert for a given fingerprint.
   195  func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
   196  	return a.alerts.Get(fp)
   197  }
   198  
   199  // Put adds the given alert to the set.
   200  func (a *Alerts) Put(alerts ...*types.Alert) error {
   201  	for _, alert := range alerts {
   202  		fp := alert.Fingerprint()
   203  
   204  		existing := false
   205  
   206  		// Check that there's an alert existing within the store before
   207  		// trying to merge.
   208  		if old, err := a.alerts.Get(fp); err == nil {
   209  			existing = true
   210  
   211  			// Merge alerts if there is an overlap in activity range.
   212  			if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
   213  				(alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
   214  				alert = old.Merge(alert)
   215  			}
   216  		}
   217  
   218  		if err := a.callback.PreStore(alert, existing); err != nil {
   219  			level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
   220  			continue
   221  		}
   222  
   223  		if err := a.alerts.Set(alert); err != nil {
   224  			level.Error(a.logger).Log("msg", "error on set alert", "err", err)
   225  			continue
   226  		}
   227  
   228  		a.callback.PostStore(alert, existing)
   229  
   230  		a.mtx.Lock()
   231  		for _, l := range a.listeners {
   232  			select {
   233  			case l.alerts <- alert:
   234  			case <-l.done:
   235  			}
   236  		}
   237  		a.mtx.Unlock()
   238  	}
   239  
   240  	return nil
   241  }
   242  
   243  // count returns the number of non-resolved alerts we currently have stored filtered by the provided state.
   244  func (a *Alerts) count(state types.AlertState) int {
   245  	var count int
   246  	for _, alert := range a.alerts.List() {
   247  		if alert.Resolved() {
   248  			continue
   249  		}
   250  
   251  		status := a.marker.Status(alert.Fingerprint())
   252  		if status.State != state {
   253  			continue
   254  		}
   255  
   256  		count++
   257  	}
   258  
   259  	return count
   260  }
   261  
   262  type noopCallback struct{}
   263  
   264  func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
   265  func (n noopCallback) PostStore(_ *types.Alert, _ bool)      {}
   266  func (n noopCallback) PostDelete(_ *types.Alert)             {}
   267  

View as plain text