...

Source file src/github.com/prometheus/alertmanager/silence/silence.go

Documentation: github.com/prometheus/alertmanager/silence

     1  // Copyright 2016 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  // Package silence provides a storage for silences, which can share its
    15  // state over a mesh network and snapshot it.
    16  package silence
    17  
    18  import (
    19  	"bytes"
    20  	"fmt"
    21  	"io"
    22  	"math/rand"
    23  	"os"
    24  	"reflect"
    25  	"regexp"
    26  	"sort"
    27  	"sync"
    28  	"time"
    29  
    30  	"github.com/benbjohnson/clock"
    31  	"github.com/go-kit/log"
    32  	"github.com/go-kit/log/level"
    33  	uuid "github.com/gofrs/uuid"
    34  	"github.com/matttproud/golang_protobuf_extensions/pbutil"
    35  	"github.com/pkg/errors"
    36  	"github.com/prometheus/client_golang/prometheus"
    37  	"github.com/prometheus/common/model"
    38  
    39  	"github.com/prometheus/alertmanager/cluster"
    40  	"github.com/prometheus/alertmanager/pkg/labels"
    41  	pb "github.com/prometheus/alertmanager/silence/silencepb"
    42  	"github.com/prometheus/alertmanager/types"
    43  )
    44  
    45  // ErrNotFound is returned if a silence was not found.
    46  var ErrNotFound = fmt.Errorf("silence not found")
    47  
    48  // ErrInvalidState is returned if the state isn't valid.
    49  var ErrInvalidState = fmt.Errorf("invalid state")
    50  
// matcherCache maps a silence (by pointer identity) to the label matchers
// compiled from its protobuf matchers, so they are compiled at most once per
// cached silence.
type matcherCache map[*pb.Silence]labels.Matchers
    52  
    53  // Get retrieves the matchers for a given silence. If it is a missed cache
    54  // access, it compiles and adds the matchers of the requested silence to the
    55  // cache.
    56  func (c matcherCache) Get(s *pb.Silence) (labels.Matchers, error) {
    57  	if m, ok := c[s]; ok {
    58  		return m, nil
    59  	}
    60  	return c.add(s)
    61  }
    62  
    63  // add compiles a silences' matchers and adds them to the cache.
    64  // It returns the compiled matchers.
    65  func (c matcherCache) add(s *pb.Silence) (labels.Matchers, error) {
    66  	ms := make(labels.Matchers, len(s.Matchers))
    67  
    68  	for i, m := range s.Matchers {
    69  		var mt labels.MatchType
    70  		switch m.Type {
    71  		case pb.Matcher_EQUAL:
    72  			mt = labels.MatchEqual
    73  		case pb.Matcher_NOT_EQUAL:
    74  			mt = labels.MatchNotEqual
    75  		case pb.Matcher_REGEXP:
    76  			mt = labels.MatchRegexp
    77  		case pb.Matcher_NOT_REGEXP:
    78  			mt = labels.MatchNotRegexp
    79  		default:
    80  			return nil, errors.Errorf("unknown matcher type %q", m.Type)
    81  		}
    82  		matcher, err := labels.NewMatcher(mt, m.Name, m.Pattern)
    83  		if err != nil {
    84  			return nil, err
    85  		}
    86  
    87  		ms[i] = matcher
    88  	}
    89  
    90  	c[s] = ms
    91  	return ms, nil
    92  }
    93  
// Silencer binds together a Marker and a Silences to implement the Muter
// interface. Mutes queries silences, stores the outcome on marker, and uses
// logger to report query errors and debug output.
type Silencer struct {
	silences *Silences
	marker   types.Marker
	logger   log.Logger
}
   101  
   102  // NewSilencer returns a new Silencer.
   103  func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer {
   104  	return &Silencer{
   105  		silences: s,
   106  		marker:   m,
   107  		logger:   l,
   108  	}
   109  }
   110  
   111  // Mutes implements the Muter interface.
   112  func (s *Silencer) Mutes(lset model.LabelSet) bool {
   113  	fp := lset.Fingerprint()
   114  	activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)
   115  
   116  	var (
   117  		err        error
   118  		allSils    []*pb.Silence
   119  		newVersion = markerVersion
   120  	)
   121  	if markerVersion == s.silences.Version() {
   122  		totalSilences := len(activeIDs) + len(pendingIDs)
   123  		// No new silences added, just need to check which of the old
   124  		// silences are still relevant and which of the pending ones
   125  		// have become active.
   126  		if totalSilences == 0 {
   127  			// Super fast path: No silences ever applied to this
   128  			// alert, none have been added. We are done.
   129  			return false
   130  		}
   131  		// This is still a quite fast path: No silences have been added,
   132  		// we only need to check which of the applicable silences are
   133  		// currently active. Note that newVersion is left at
   134  		// markerVersion because the Query call might already return a
   135  		// newer version, which is not the version our old list of
   136  		// applicable silences is based on.
   137  		allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)
   138  		allSils, _, err = s.silences.Query(
   139  			QIDs(allIDs...),
   140  			QState(types.SilenceStateActive, types.SilenceStatePending),
   141  		)
   142  	} else {
   143  		// New silences have been added, do a full query.
   144  		allSils, newVersion, err = s.silences.Query(
   145  			QState(types.SilenceStateActive, types.SilenceStatePending),
   146  			QMatches(lset),
   147  		)
   148  	}
   149  	if err != nil {
   150  		level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
   151  	}
   152  	if len(allSils) == 0 {
   153  		// Easy case, neither active nor pending silences anymore.
   154  		s.marker.SetActiveOrSilenced(fp, newVersion, nil, nil)
   155  		return false
   156  	}
   157  	// It is still possible that nothing has changed, but finding out is not
   158  	// much less effort than just recreating the IDs from the query
   159  	// result. So let's do it in any case. Note that we cannot reuse the
   160  	// current ID slices for concurrency reasons.
   161  	activeIDs, pendingIDs = nil, nil
   162  	now := s.silences.nowUTC()
   163  	for _, sil := range allSils {
   164  		switch getState(sil, now) {
   165  		case types.SilenceStatePending:
   166  			pendingIDs = append(pendingIDs, sil.Id)
   167  		case types.SilenceStateActive:
   168  			activeIDs = append(activeIDs, sil.Id)
   169  		default:
   170  			// Do nothing, silence has expired in the meantime.
   171  		}
   172  	}
   173  	level.Debug(s.logger).Log(
   174  		"msg", "determined current silences state",
   175  		"now", now,
   176  		"total", len(allSils),
   177  		"active", len(activeIDs),
   178  		"pending", len(pendingIDs),
   179  	)
   180  	sort.Strings(activeIDs)
   181  	sort.Strings(pendingIDs)
   182  
   183  	s.marker.SetActiveOrSilenced(fp, newVersion, activeIDs, pendingIDs)
   184  
   185  	return len(activeIDs) > 0
   186  }
   187  
// Silences holds a silence state that can be modified, queried, and snapshot.
//
// clock is the time source behind nowUTC and the maintenance ticker. retention
// is added to a silence's end time to compute when it expires (see
// setSilence). mtx is taken by the methods that read or change st, version,
// broadcast and mc. st is the silence state keyed by silence ID, broadcast is
// invoked with the serialized silence whenever one is set or newly merged, and
// mc caches the compiled matchers of silences.
type Silences struct {
	clock clock.Clock

	logger    log.Logger
	metrics   *metrics
	retention time.Duration

	mtx       sync.RWMutex
	st        state
	version   int // Increments whenever silences are added.
	broadcast func([]byte)
	mc        matcherCache
}
   202  
// MaintenanceFunc represents the function to run as part of the periodic
// maintenance for silences. It returns the size of the snapshot taken (in
// bytes) or an error if it failed. Maintenance accepts one as an override for
// its default garbage-collect-then-snapshot routine.
type MaintenanceFunc func() (int64, error)
   206  
// metrics bundles the Prometheus collectors maintained for a Silences
// instance. The three silences* gauges are computed on demand via CountState
// and are only created when newMetrics is given a non-nil *Silences.
type metrics struct {
	gcDuration              prometheus.Summary
	snapshotDuration        prometheus.Summary
	snapshotSize            prometheus.Gauge
	queriesTotal            prometheus.Counter
	queryErrorsTotal        prometheus.Counter
	queryDuration           prometheus.Histogram
	silencesActive          prometheus.GaugeFunc
	silencesPending         prometheus.GaugeFunc
	silencesExpired         prometheus.GaugeFunc
	propagatedMessagesTotal prometheus.Counter
}
   219  
   220  func newSilenceMetricByState(s *Silences, st types.SilenceState) prometheus.GaugeFunc {
   221  	return prometheus.NewGaugeFunc(
   222  		prometheus.GaugeOpts{
   223  			Name:        "alertmanager_silences",
   224  			Help:        "How many silences by state.",
   225  			ConstLabels: prometheus.Labels{"state": string(st)},
   226  		},
   227  		func() float64 {
   228  			count, err := s.CountState(st)
   229  			if err != nil {
   230  				level.Error(s.logger).Log("msg", "Counting silences failed", "err", err)
   231  			}
   232  			return float64(count)
   233  		},
   234  	)
   235  }
   236  
   237  func newMetrics(r prometheus.Registerer, s *Silences) *metrics {
   238  	m := &metrics{}
   239  
   240  	m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
   241  		Name:       "alertmanager_silences_gc_duration_seconds",
   242  		Help:       "Duration of the last silence garbage collection cycle.",
   243  		Objectives: map[float64]float64{},
   244  	})
   245  	m.snapshotDuration = prometheus.NewSummary(prometheus.SummaryOpts{
   246  		Name:       "alertmanager_silences_snapshot_duration_seconds",
   247  		Help:       "Duration of the last silence snapshot.",
   248  		Objectives: map[float64]float64{},
   249  	})
   250  	m.snapshotSize = prometheus.NewGauge(prometheus.GaugeOpts{
   251  		Name: "alertmanager_silences_snapshot_size_bytes",
   252  		Help: "Size of the last silence snapshot in bytes.",
   253  	})
   254  	m.queriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
   255  		Name: "alertmanager_silences_queries_total",
   256  		Help: "How many silence queries were received.",
   257  	})
   258  	m.queryErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
   259  		Name: "alertmanager_silences_query_errors_total",
   260  		Help: "How many silence received queries did not succeed.",
   261  	})
   262  	m.queryDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
   263  		Name: "alertmanager_silences_query_duration_seconds",
   264  		Help: "Duration of silence query evaluation.",
   265  	})
   266  	m.propagatedMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
   267  		Name: "alertmanager_silences_gossip_messages_propagated_total",
   268  		Help: "Number of received gossip messages that have been further gossiped.",
   269  	})
   270  	if s != nil {
   271  		m.silencesActive = newSilenceMetricByState(s, types.SilenceStateActive)
   272  		m.silencesPending = newSilenceMetricByState(s, types.SilenceStatePending)
   273  		m.silencesExpired = newSilenceMetricByState(s, types.SilenceStateExpired)
   274  	}
   275  
   276  	if r != nil {
   277  		r.MustRegister(
   278  			m.gcDuration,
   279  			m.snapshotDuration,
   280  			m.snapshotSize,
   281  			m.queriesTotal,
   282  			m.queryErrorsTotal,
   283  			m.queryDuration,
   284  			m.silencesActive,
   285  			m.silencesPending,
   286  			m.silencesExpired,
   287  			m.propagatedMessagesTotal,
   288  		)
   289  	}
   290  	return m
   291  }
   292  
// Options exposes configuration options for creating a new Silences object.
// Its zero value is a safe default.
type Options struct {
	// A snapshot file or reader from which the initial state is loaded.
	// At most one of them may be set; setting both is rejected by validate.
	// A snapshot file that does not exist is not treated as an error by New.
	SnapshotFile   string
	SnapshotReader io.Reader

	// Retention time for newly created Silences. Silences may be
	// garbage collected after the given duration after they ended.
	Retention time.Duration

	// Logger is used by background processing; if it is nil, New falls back
	// to a no-op logger. Metrics is the registerer the silence metrics are
	// registered with; if it is nil, the metrics are not registered.
	Logger  log.Logger
	Metrics prometheus.Registerer
}
   309  
   310  func (o *Options) validate() error {
   311  	if o.SnapshotFile != "" && o.SnapshotReader != nil {
   312  		return fmt.Errorf("only one of SnapshotFile and SnapshotReader must be set")
   313  	}
   314  	return nil
   315  }
   316  
   317  // New returns a new Silences object with the given configuration.
   318  func New(o Options) (*Silences, error) {
   319  	if err := o.validate(); err != nil {
   320  		return nil, err
   321  	}
   322  	if o.SnapshotFile != "" {
   323  		if r, err := os.Open(o.SnapshotFile); err != nil {
   324  			if !os.IsNotExist(err) {
   325  				return nil, err
   326  			}
   327  		} else {
   328  			o.SnapshotReader = r
   329  			defer r.Close()
   330  		}
   331  	}
   332  	s := &Silences{
   333  		clock:     clock.New(),
   334  		mc:        matcherCache{},
   335  		logger:    log.NewNopLogger(),
   336  		retention: o.Retention,
   337  		broadcast: func([]byte) {},
   338  		st:        state{},
   339  	}
   340  	s.metrics = newMetrics(o.Metrics, s)
   341  
   342  	if o.Logger != nil {
   343  		s.logger = o.Logger
   344  	}
   345  	if o.SnapshotReader != nil {
   346  		if err := s.loadSnapshot(o.SnapshotReader); err != nil {
   347  			return s, err
   348  		}
   349  	}
   350  	return s, nil
   351  }
   352  
   353  func (s *Silences) nowUTC() time.Time {
   354  	return s.clock.Now().UTC()
   355  }
   356  
   357  // Maintenance garbage collects the silence state at the given interval. If the snapshot
   358  // file is set, a snapshot is written to it afterwards.
   359  // Terminates on receiving from stopc.
   360  // If not nil, the last argument is an override for what to do as part of the maintenance - for advanced usage.
   361  func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
   362  	t := s.clock.Ticker(interval)
   363  	defer t.Stop()
   364  
   365  	var doMaintenance MaintenanceFunc
   366  	doMaintenance = func() (int64, error) {
   367  		var size int64
   368  
   369  		if _, err := s.GC(); err != nil {
   370  			return size, err
   371  		}
   372  		if snapf == "" {
   373  			return size, nil
   374  		}
   375  		f, err := openReplace(snapf)
   376  		if err != nil {
   377  			return size, err
   378  		}
   379  		if size, err = s.Snapshot(f); err != nil {
   380  			return size, err
   381  		}
   382  		return size, f.Close()
   383  	}
   384  
   385  	if override != nil {
   386  		doMaintenance = override
   387  	}
   388  
   389  	runMaintenance := func(do MaintenanceFunc) error {
   390  		start := s.nowUTC()
   391  		level.Debug(s.logger).Log("msg", "Running maintenance")
   392  		size, err := do()
   393  		level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.clock.Since(start), "size", size)
   394  		s.metrics.snapshotSize.Set(float64(size))
   395  		return err
   396  	}
   397  
   398  Loop:
   399  	for {
   400  		select {
   401  		case <-stopc:
   402  			break Loop
   403  		case <-t.C:
   404  			if err := runMaintenance(doMaintenance); err != nil {
   405  				level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err)
   406  			}
   407  		}
   408  	}
   409  	// No need for final maintenance if we don't want to snapshot.
   410  	if snapf == "" {
   411  		return
   412  	}
   413  	if err := runMaintenance(doMaintenance); err != nil {
   414  		level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
   415  	}
   416  }
   417  
   418  // GC runs a garbage collection that removes silences that have ended longer
   419  // than the configured retention time ago.
   420  func (s *Silences) GC() (int, error) {
   421  	start := time.Now()
   422  	defer func() { s.metrics.gcDuration.Observe(time.Since(start).Seconds()) }()
   423  
   424  	now := s.nowUTC()
   425  	var n int
   426  
   427  	s.mtx.Lock()
   428  	defer s.mtx.Unlock()
   429  
   430  	for id, sil := range s.st {
   431  		if sil.ExpiresAt.IsZero() {
   432  			return n, errors.New("unexpected zero expiration timestamp")
   433  		}
   434  		if !sil.ExpiresAt.After(now) {
   435  			delete(s.st, id)
   436  			delete(s.mc, sil.Silence)
   437  			n++
   438  		}
   439  	}
   440  
   441  	return n, nil
   442  }
   443  
   444  // ValidateMatcher runs validation on the matcher name, type, and pattern.
   445  var ValidateMatcher = func(m *pb.Matcher) error {
   446  	if !model.LabelName(m.Name).IsValid() {
   447  		return fmt.Errorf("invalid label name %q", m.Name)
   448  	}
   449  	switch m.Type {
   450  	case pb.Matcher_EQUAL, pb.Matcher_NOT_EQUAL:
   451  		if !model.LabelValue(m.Pattern).IsValid() {
   452  			return fmt.Errorf("invalid label value %q", m.Pattern)
   453  		}
   454  	case pb.Matcher_REGEXP, pb.Matcher_NOT_REGEXP:
   455  		if _, err := regexp.Compile(m.Pattern); err != nil {
   456  			return fmt.Errorf("invalid regular expression %q: %s", m.Pattern, err)
   457  		}
   458  	default:
   459  		return fmt.Errorf("unknown matcher type %q", m.Type)
   460  	}
   461  	return nil
   462  }
   463  
   464  func matchesEmpty(m *pb.Matcher) bool {
   465  	switch m.Type {
   466  	case pb.Matcher_EQUAL:
   467  		return m.Pattern == ""
   468  	case pb.Matcher_REGEXP:
   469  		matched, _ := regexp.MatchString(m.Pattern, "")
   470  		return matched
   471  	default:
   472  		return false
   473  	}
   474  }
   475  
   476  func validateSilence(s *pb.Silence) error {
   477  	if s.Id == "" {
   478  		return errors.New("ID missing")
   479  	}
   480  	if len(s.Matchers) == 0 {
   481  		return errors.New("at least one matcher required")
   482  	}
   483  	allMatchEmpty := true
   484  	for i, m := range s.Matchers {
   485  		if err := ValidateMatcher(m); err != nil {
   486  			return fmt.Errorf("invalid label matcher %d: %s", i, err)
   487  		}
   488  		allMatchEmpty = allMatchEmpty && matchesEmpty(m)
   489  	}
   490  	if allMatchEmpty {
   491  		return errors.New("at least one matcher must not match the empty string")
   492  	}
   493  	if s.StartsAt.IsZero() {
   494  		return errors.New("invalid zero start timestamp")
   495  	}
   496  	if s.EndsAt.IsZero() {
   497  		return errors.New("invalid zero end timestamp")
   498  	}
   499  	if s.EndsAt.Before(s.StartsAt) {
   500  		return errors.New("end time must not be before start time")
   501  	}
   502  	if s.UpdatedAt.IsZero() {
   503  		return errors.New("invalid zero update timestamp")
   504  	}
   505  	return nil
   506  }
   507  
   508  // cloneSilence returns a shallow copy of a silence.
   509  func cloneSilence(sil *pb.Silence) *pb.Silence {
   510  	s := *sil
   511  	return &s
   512  }
   513  
   514  func (s *Silences) getSilence(id string) (*pb.Silence, bool) {
   515  	msil, ok := s.st[id]
   516  	if !ok {
   517  		return nil, false
   518  	}
   519  	return msil.Silence, true
   520  }
   521  
   522  func (s *Silences) setSilence(sil *pb.Silence, now time.Time) error {
   523  	sil.UpdatedAt = now
   524  
   525  	if err := validateSilence(sil); err != nil {
   526  		return errors.Wrap(err, "silence invalid")
   527  	}
   528  
   529  	msil := &pb.MeshSilence{
   530  		Silence:   sil,
   531  		ExpiresAt: sil.EndsAt.Add(s.retention),
   532  	}
   533  	b, err := marshalMeshSilence(msil)
   534  	if err != nil {
   535  		return err
   536  	}
   537  
   538  	if s.st.merge(msil, now) {
   539  		s.version++
   540  	}
   541  	s.broadcast(b)
   542  
   543  	return nil
   544  }
   545  
   546  // Set the specified silence. If a silence with the ID already exists and the modification
   547  // modifies history, the old silence gets expired and a new one is created.
   548  func (s *Silences) Set(sil *pb.Silence) (string, error) {
   549  	s.mtx.Lock()
   550  	defer s.mtx.Unlock()
   551  
   552  	now := s.nowUTC()
   553  	prev, ok := s.getSilence(sil.Id)
   554  
   555  	if sil.Id != "" && !ok {
   556  		return "", ErrNotFound
   557  	}
   558  	if ok {
   559  		if canUpdate(prev, sil, now) {
   560  			return sil.Id, s.setSilence(sil, now)
   561  		}
   562  		if getState(prev, s.nowUTC()) != types.SilenceStateExpired {
   563  			// We cannot update the silence, expire the old one.
   564  			if err := s.expire(prev.Id); err != nil {
   565  				return "", errors.Wrap(err, "expire previous silence")
   566  			}
   567  		}
   568  	}
   569  	// If we got here it's either a new silence or a replacing one.
   570  	uid, err := uuid.NewV4()
   571  	if err != nil {
   572  		return "", errors.Wrap(err, "generate uuid")
   573  	}
   574  	sil.Id = uid.String()
   575  
   576  	if sil.StartsAt.Before(now) {
   577  		sil.StartsAt = now
   578  	}
   579  
   580  	return sil.Id, s.setSilence(sil, now)
   581  }
   582  
   583  // canUpdate returns true if silence a can be updated to b without
   584  // affecting the historic view of silencing.
   585  func canUpdate(a, b *pb.Silence, now time.Time) bool {
   586  	if !reflect.DeepEqual(a.Matchers, b.Matchers) {
   587  		return false
   588  	}
   589  	// Allowed timestamp modifications depend on the current time.
   590  	switch st := getState(a, now); st {
   591  	case types.SilenceStateActive:
   592  		if b.StartsAt.Unix() != a.StartsAt.Unix() {
   593  			return false
   594  		}
   595  		if b.EndsAt.Before(now) {
   596  			return false
   597  		}
   598  	case types.SilenceStatePending:
   599  		if b.StartsAt.Before(now) {
   600  			return false
   601  		}
   602  	case types.SilenceStateExpired:
   603  		return false
   604  	default:
   605  		panic("unknown silence state")
   606  	}
   607  	return true
   608  }
   609  
   610  // Expire the silence with the given ID immediately.
   611  func (s *Silences) Expire(id string) error {
   612  	s.mtx.Lock()
   613  	defer s.mtx.Unlock()
   614  	return s.expire(id)
   615  }
   616  
   617  // Expire the silence with the given ID immediately.
   618  // It is idempotent, nil is returned if the silence already expired before it is GC'd.
   619  // If the silence is not found an error is returned.
   620  func (s *Silences) expire(id string) error {
   621  	sil, ok := s.getSilence(id)
   622  	if !ok {
   623  		return ErrNotFound
   624  	}
   625  	sil = cloneSilence(sil)
   626  	now := s.nowUTC()
   627  
   628  	switch getState(sil, now) {
   629  	case types.SilenceStateExpired:
   630  		return nil
   631  	case types.SilenceStateActive:
   632  		sil.EndsAt = now
   633  	case types.SilenceStatePending:
   634  		// Set both to now to make Silence move to "expired" state
   635  		sil.StartsAt = now
   636  		sil.EndsAt = now
   637  	}
   638  
   639  	return s.setSilence(sil, now)
   640  }
   641  
// QueryParam expresses parameters along which silences are queried. Each
// QueryParam is applied to the query under construction and may add ID
// constraints or filters to it; a returned error aborts the query.
type QueryParam func(*query) error
   644  
// query collects the constraints of a single silence query: ids restricts the
// base set to the listed silence IDs (a nil slice selects all silences), and
// every filter in filters must accept a silence for it to be returned.
type query struct {
	ids     []string
	filters []silenceFilter
}
   649  
// silenceFilter is a function that returns true if a silence should be kept
// in a result set for a given time. Silences.query drops a silence as soon as
// one filter returns false, and aborts the whole query if a filter returns an
// error.
type silenceFilter func(*pb.Silence, *Silences, time.Time) (bool, error)
   653  
   654  // QIDs configures a query to select the given silence IDs.
   655  func QIDs(ids ...string) QueryParam {
   656  	return func(q *query) error {
   657  		q.ids = append(q.ids, ids...)
   658  		return nil
   659  	}
   660  }
   661  
   662  // QMatches returns silences that match the given label set.
   663  func QMatches(set model.LabelSet) QueryParam {
   664  	return func(q *query) error {
   665  		f := func(sil *pb.Silence, s *Silences, _ time.Time) (bool, error) {
   666  			m, err := s.mc.Get(sil)
   667  			if err != nil {
   668  				return true, err
   669  			}
   670  			return m.Matches(set), nil
   671  		}
   672  		q.filters = append(q.filters, f)
   673  		return nil
   674  	}
   675  }
   676  
   677  // getState returns a silence's SilenceState at the given timestamp.
   678  func getState(sil *pb.Silence, ts time.Time) types.SilenceState {
   679  	if ts.Before(sil.StartsAt) {
   680  		return types.SilenceStatePending
   681  	}
   682  	if ts.After(sil.EndsAt) {
   683  		return types.SilenceStateExpired
   684  	}
   685  	return types.SilenceStateActive
   686  }
   687  
   688  // QState filters queried silences by the given states.
   689  func QState(states ...types.SilenceState) QueryParam {
   690  	return func(q *query) error {
   691  		f := func(sil *pb.Silence, _ *Silences, now time.Time) (bool, error) {
   692  			s := getState(sil, now)
   693  
   694  			for _, ps := range states {
   695  				if s == ps {
   696  					return true, nil
   697  				}
   698  			}
   699  			return false, nil
   700  		}
   701  		q.filters = append(q.filters, f)
   702  		return nil
   703  	}
   704  }
   705  
   706  // QueryOne queries with the given parameters and returns the first result.
   707  // Returns ErrNotFound if the query result is empty.
   708  func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) {
   709  	res, _, err := s.Query(params...)
   710  	if err != nil {
   711  		return nil, err
   712  	}
   713  	if len(res) == 0 {
   714  		return nil, ErrNotFound
   715  	}
   716  	return res[0], nil
   717  }
   718  
   719  // Query for silences based on the given query parameters. It returns the
   720  // resulting silences and the state version the result is based on.
   721  func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) {
   722  	s.metrics.queriesTotal.Inc()
   723  	defer prometheus.NewTimer(s.metrics.queryDuration).ObserveDuration()
   724  
   725  	q := &query{}
   726  	for _, p := range params {
   727  		if err := p(q); err != nil {
   728  			s.metrics.queryErrorsTotal.Inc()
   729  			return nil, s.Version(), err
   730  		}
   731  	}
   732  	sils, version, err := s.query(q, s.nowUTC())
   733  	if err != nil {
   734  		s.metrics.queryErrorsTotal.Inc()
   735  	}
   736  	return sils, version, err
   737  }
   738  
   739  // Version of the silence state.
   740  func (s *Silences) Version() int {
   741  	s.mtx.RLock()
   742  	defer s.mtx.RUnlock()
   743  	return s.version
   744  }
   745  
   746  // CountState counts silences by state.
   747  func (s *Silences) CountState(states ...types.SilenceState) (int, error) {
   748  	// This could probably be optimized.
   749  	sils, _, err := s.Query(QState(states...))
   750  	if err != nil {
   751  		return -1, err
   752  	}
   753  	return len(sils), nil
   754  }
   755  
   756  func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) {
   757  	// If we have no ID constraint, all silences are our base set.  This and
   758  	// the use of post-filter functions is the trivial solution for now.
   759  	var res []*pb.Silence
   760  
   761  	s.mtx.Lock()
   762  	defer s.mtx.Unlock()
   763  
   764  	if q.ids != nil {
   765  		for _, id := range q.ids {
   766  			if s, ok := s.st[id]; ok {
   767  				res = append(res, s.Silence)
   768  			}
   769  		}
   770  	} else {
   771  		for _, sil := range s.st {
   772  			res = append(res, sil.Silence)
   773  		}
   774  	}
   775  
   776  	var resf []*pb.Silence
   777  	for _, sil := range res {
   778  		remove := false
   779  		for _, f := range q.filters {
   780  			ok, err := f(sil, s, now)
   781  			if err != nil {
   782  				return nil, s.version, err
   783  			}
   784  			if !ok {
   785  				remove = true
   786  				break
   787  			}
   788  		}
   789  		if !remove {
   790  			resf = append(resf, cloneSilence(sil))
   791  		}
   792  	}
   793  
   794  	return resf, s.version, nil
   795  }
   796  
   797  // loadSnapshot loads a snapshot generated by Snapshot() into the state.
   798  // Any previous state is wiped.
   799  func (s *Silences) loadSnapshot(r io.Reader) error {
   800  	st, err := decodeState(r)
   801  	if err != nil {
   802  		return err
   803  	}
   804  	for _, e := range st {
   805  		// Comments list was moved to a single comment. Upgrade on loading the snapshot.
   806  		if len(e.Silence.Comments) > 0 {
   807  			e.Silence.Comment = e.Silence.Comments[0].Comment
   808  			e.Silence.CreatedBy = e.Silence.Comments[0].Author
   809  			e.Silence.Comments = nil
   810  		}
   811  		st[e.Silence.Id] = e
   812  	}
   813  	s.mtx.Lock()
   814  	s.st = st
   815  	s.version++
   816  	s.mtx.Unlock()
   817  
   818  	return nil
   819  }
   820  
   821  // Snapshot writes the full internal state into the writer and returns the number of bytes
   822  // written.
   823  func (s *Silences) Snapshot(w io.Writer) (int64, error) {
   824  	start := time.Now()
   825  	defer func() { s.metrics.snapshotDuration.Observe(time.Since(start).Seconds()) }()
   826  
   827  	s.mtx.RLock()
   828  	defer s.mtx.RUnlock()
   829  
   830  	b, err := s.st.MarshalBinary()
   831  	if err != nil {
   832  		return 0, err
   833  	}
   834  
   835  	return io.Copy(w, bytes.NewReader(b))
   836  }
   837  
   838  // MarshalBinary serializes all silences.
   839  func (s *Silences) MarshalBinary() ([]byte, error) {
   840  	s.mtx.Lock()
   841  	defer s.mtx.Unlock()
   842  
   843  	return s.st.MarshalBinary()
   844  }
   845  
   846  // Merge merges silence state received from the cluster with the local state.
   847  func (s *Silences) Merge(b []byte) error {
   848  	st, err := decodeState(bytes.NewReader(b))
   849  	if err != nil {
   850  		return err
   851  	}
   852  	s.mtx.Lock()
   853  	defer s.mtx.Unlock()
   854  
   855  	now := s.nowUTC()
   856  
   857  	for _, e := range st {
   858  		if merged := s.st.merge(e, now); merged {
   859  			s.version++
   860  			if !cluster.OversizedMessage(b) {
   861  				// If this is the first we've seen the message and it's
   862  				// not oversized, gossip it to other nodes. We don't
   863  				// propagate oversized messages because they're sent to
   864  				// all nodes already.
   865  				s.broadcast(b)
   866  				s.metrics.propagatedMessagesTotal.Inc()
   867  				level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e)
   868  			}
   869  		}
   870  	}
   871  	return nil
   872  }
   873  
   874  // SetBroadcast sets the provided function as the one creating data to be
   875  // broadcast.
   876  func (s *Silences) SetBroadcast(f func([]byte)) {
   877  	s.mtx.Lock()
   878  	s.broadcast = f
   879  	s.mtx.Unlock()
   880  }
   881  
// state is the set of known silences, keyed by silence ID. Each entry carries
// the silence together with the time at which it expires from the state.
type state map[string]*pb.MeshSilence
   883  
   884  func (s state) merge(e *pb.MeshSilence, now time.Time) bool {
   885  	id := e.Silence.Id
   886  	if e.ExpiresAt.Before(now) {
   887  		return false
   888  	}
   889  	// Comments list was moved to a single comment. Apply upgrade
   890  	// on silences received from peers.
   891  	if len(e.Silence.Comments) > 0 {
   892  		e.Silence.Comment = e.Silence.Comments[0].Comment
   893  		e.Silence.CreatedBy = e.Silence.Comments[0].Author
   894  		e.Silence.Comments = nil
   895  	}
   896  
   897  	prev, ok := s[id]
   898  	if !ok || prev.Silence.UpdatedAt.Before(e.Silence.UpdatedAt) {
   899  		s[id] = e
   900  		return true
   901  	}
   902  	return false
   903  }
   904  
   905  func (s state) MarshalBinary() ([]byte, error) {
   906  	var buf bytes.Buffer
   907  
   908  	for _, e := range s {
   909  		if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
   910  			return nil, err
   911  		}
   912  	}
   913  	return buf.Bytes(), nil
   914  }
   915  
   916  func decodeState(r io.Reader) (state, error) {
   917  	st := state{}
   918  	for {
   919  		var s pb.MeshSilence
   920  		_, err := pbutil.ReadDelimited(r, &s)
   921  		if err == nil {
   922  			if s.Silence == nil {
   923  				return nil, ErrInvalidState
   924  			}
   925  			st[s.Silence.Id] = &s
   926  			continue
   927  		}
   928  		if err == io.EOF {
   929  			break
   930  		}
   931  		return nil, err
   932  	}
   933  	return st, nil
   934  }
   935  
   936  func marshalMeshSilence(e *pb.MeshSilence) ([]byte, error) {
   937  	var buf bytes.Buffer
   938  	if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
   939  		return nil, err
   940  	}
   941  	return buf.Bytes(), nil
   942  }
   943  
   944  // replaceFile wraps a file that is moved to another filename on closing.
   945  type replaceFile struct {
   946  	*os.File
   947  	filename string
   948  }
   949  
   950  func (f *replaceFile) Close() error {
   951  	if err := f.File.Sync(); err != nil {
   952  		return err
   953  	}
   954  	if err := f.File.Close(); err != nil {
   955  		return err
   956  	}
   957  	return os.Rename(f.File.Name(), f.filename)
   958  }
   959  
   960  // openReplace opens a new temporary file that is moved to filename on closing.
   961  func openReplace(filename string) (*replaceFile, error) {
   962  	tmpFilename := fmt.Sprintf("%s.%x", filename, uint64(rand.Int63()))
   963  
   964  	f, err := os.Create(tmpFilename)
   965  	if err != nil {
   966  		return nil, err
   967  	}
   968  
   969  	rf := &replaceFile{
   970  		File:     f,
   971  		filename: filename,
   972  	}
   973  	return rf, nil
   974  }
   975  

View as plain text