1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package silence
17
18 import (
19 "bytes"
20 "fmt"
21 "io"
22 "math/rand"
23 "os"
24 "reflect"
25 "regexp"
26 "sort"
27 "sync"
28 "time"
29
30 "github.com/benbjohnson/clock"
31 "github.com/go-kit/log"
32 "github.com/go-kit/log/level"
33 uuid "github.com/gofrs/uuid"
34 "github.com/matttproud/golang_protobuf_extensions/pbutil"
35 "github.com/pkg/errors"
36 "github.com/prometheus/client_golang/prometheus"
37 "github.com/prometheus/common/model"
38
39 "github.com/prometheus/alertmanager/cluster"
40 "github.com/prometheus/alertmanager/pkg/labels"
41 pb "github.com/prometheus/alertmanager/silence/silencepb"
42 "github.com/prometheus/alertmanager/types"
43 )
44
45
// Sentinel errors returned by this package.
var (
	// ErrNotFound is returned when a silence with the requested ID is not
	// present in the state, or when a query yields no result.
	ErrNotFound = fmt.Errorf("silence not found")

	// ErrInvalidState is returned when decoded state contains an entry
	// without a silence.
	ErrInvalidState = fmt.Errorf("invalid state")
)
50
// matcherCache maps a silence, keyed by pointer identity, to the label
// matchers compiled from its protobuf matcher definitions. It is a plain map,
// so access has to be serialized by the caller; in this file it is only
// touched while Silences.mtx is held.
type matcherCache map[*pb.Silence]labels.Matchers
52
53
54
55
56 func (c matcherCache) Get(s *pb.Silence) (labels.Matchers, error) {
57 if m, ok := c[s]; ok {
58 return m, nil
59 }
60 return c.add(s)
61 }
62
63
64
65 func (c matcherCache) add(s *pb.Silence) (labels.Matchers, error) {
66 ms := make(labels.Matchers, len(s.Matchers))
67
68 for i, m := range s.Matchers {
69 var mt labels.MatchType
70 switch m.Type {
71 case pb.Matcher_EQUAL:
72 mt = labels.MatchEqual
73 case pb.Matcher_NOT_EQUAL:
74 mt = labels.MatchNotEqual
75 case pb.Matcher_REGEXP:
76 mt = labels.MatchRegexp
77 case pb.Matcher_NOT_REGEXP:
78 mt = labels.MatchNotRegexp
79 default:
80 return nil, errors.Errorf("unknown matcher type %q", m.Type)
81 }
82 matcher, err := labels.NewMatcher(mt, m.Name, m.Pattern)
83 if err != nil {
84 return nil, err
85 }
86
87 ms[i] = matcher
88 }
89
90 c[s] = ms
91 return ms, nil
92 }
93
94
95
// Silencer binds together the silence store, a marker that records per-alert
// silencing results, and a logger. Its Mutes method decides whether a label
// set is currently silenced.
type Silencer struct {
	// silences is the store that is queried for matching silences.
	silences *Silences
	// marker caches, per fingerprint, the IDs of active and pending
	// silences together with the store version they were computed at.
	marker types.Marker
	// logger receives error and debug output from Mutes.
	logger log.Logger
}
101
102
103 func NewSilencer(s *Silences, m types.Marker, l log.Logger) *Silencer {
104 return &Silencer{
105 silences: s,
106 marker: m,
107 logger: l,
108 }
109 }
110
111
// Mutes reports whether the given label set is matched by at least one
// currently active silence. As a side effect it updates the marker with the
// sorted IDs of the active and pending silences found for the label set's
// fingerprint, together with the store version they were computed at.
func (s *Silencer) Mutes(lset model.LabelSet) bool {
	fp := lset.Fingerprint()
	// The fourth return value of Silenced is not needed here.
	activeIDs, pendingIDs, markerVersion, _ := s.marker.Silenced(fp)

	var (
		err        error
		allSils    []*pb.Silence
		newVersion = markerVersion
	)
	if markerVersion == s.silences.Version() {
		totalSilences := len(activeIDs) + len(pendingIDs)
		// The marker is up to date with the store and records no silences
		// for this fingerprint, so nothing can have started muting it.
		if totalSilences == 0 {
			return false
		}
		// The store has not changed since the marker was written, but the
		// recorded silences may have changed state with the passage of
		// time (pending -> active -> expired). Re-query only those IDs and
		// let the state filter drop the ones that have expired.
		allIDs := append(append(make([]string, 0, totalSilences), activeIDs...), pendingIDs...)
		allSils, _, err = s.silences.Query(
			QIDs(allIDs...),
			QState(types.SilenceStateActive, types.SilenceStatePending),
		)
	} else {
		// The store version differs from the one recorded in the marker:
		// run a full query for active or pending silences matching the
		// label set and remember the version the result corresponds to.
		allSils, newVersion, err = s.silences.Query(
			QState(types.SilenceStateActive, types.SilenceStatePending),
			QMatches(lset),
		)
	}
	if err != nil {
		// The error is only logged; processing continues with whatever
		// the query returned.
		level.Error(s.logger).Log("msg", "Querying silences failed, alerts might not get silenced correctly", "err", err)
	}
	if len(allSils) == 0 {
		// No relevant silences: clear the marker for this fingerprint.
		s.marker.SetActiveOrSilenced(fp, newVersion, nil, nil)
		return false
	}
	// Rebuild both ID lists from scratch, classifying each silence by its
	// state at the store's current time.
	activeIDs, pendingIDs = nil, nil
	now := s.silences.nowUTC()
	for _, sil := range allSils {
		switch getState(sil, now) {
		case types.SilenceStatePending:
			pendingIDs = append(pendingIDs, sil.Id)
		case types.SilenceStateActive:
			activeIDs = append(activeIDs, sil.Id)
		default:
			// Any other state (e.g. expired since the query) is ignored.
		}
	}
	level.Debug(s.logger).Log(
		"msg", "determined current silences state",
		"now", now,
		"total", len(allSils),
		"active", len(activeIDs),
		"pending", len(pendingIDs),
	)
	// Store the IDs in sorted order.
	sort.Strings(activeIDs)
	sort.Strings(pendingIDs)

	s.marker.SetActiveOrSilenced(fp, newVersion, activeIDs, pendingIDs)

	// Only active silences mute; pending ones are merely recorded.
	return len(activeIDs) > 0
}
187
188
// Silences holds the silence state together with everything needed to query,
// mutate, persist and gossip it.
type Silences struct {
	// clock is the time source used by nowUTC and the maintenance ticker.
	clock clock.Clock

	logger  log.Logger
	metrics *metrics
	// retention is added to a silence's end time to compute when its
	// state entry expires (see setSilence).
	retention time.Duration

	// mtx is held by the methods in this file while they access st,
	// version, broadcast and mc.
	mtx sync.RWMutex
	// st maps silence IDs to their stored entries.
	st state
	// version is incremented whenever a merge changes st or a snapshot is
	// loaded.
	version int
	// broadcast is called with the encoded entry whenever a silence is set
	// or a merged entry is propagated further.
	broadcast func([]byte)
	// mc caches compiled matchers per silence.
	mc matcherCache
}
202
203
204
// MaintenanceFunc is the signature of a function that can replace the default
// maintenance step in Silences.Maintenance. The returned int64 is recorded as
// the snapshot size metric; the error is logged by the caller.
type MaintenanceFunc func() (int64, error)
206
// metrics bundles the Prometheus collectors maintained for a Silences
// instance. The three state gauges are only created when newMetrics is given
// a non-nil Silences.
type metrics struct {
	gcDuration              prometheus.Summary
	snapshotDuration        prometheus.Summary
	snapshotSize            prometheus.Gauge
	queriesTotal            prometheus.Counter
	queryErrorsTotal        prometheus.Counter
	queryDuration           prometheus.Histogram
	silencesActive          prometheus.GaugeFunc
	silencesPending         prometheus.GaugeFunc
	silencesExpired         prometheus.GaugeFunc
	propagatedMessagesTotal prometheus.Counter
}
219
220 func newSilenceMetricByState(s *Silences, st types.SilenceState) prometheus.GaugeFunc {
221 return prometheus.NewGaugeFunc(
222 prometheus.GaugeOpts{
223 Name: "alertmanager_silences",
224 Help: "How many silences by state.",
225 ConstLabels: prometheus.Labels{"state": string(st)},
226 },
227 func() float64 {
228 count, err := s.CountState(st)
229 if err != nil {
230 level.Error(s.logger).Log("msg", "Counting silences failed", "err", err)
231 }
232 return float64(count)
233 },
234 )
235 }
236
237 func newMetrics(r prometheus.Registerer, s *Silences) *metrics {
238 m := &metrics{}
239
240 m.gcDuration = prometheus.NewSummary(prometheus.SummaryOpts{
241 Name: "alertmanager_silences_gc_duration_seconds",
242 Help: "Duration of the last silence garbage collection cycle.",
243 Objectives: map[float64]float64{},
244 })
245 m.snapshotDuration = prometheus.NewSummary(prometheus.SummaryOpts{
246 Name: "alertmanager_silences_snapshot_duration_seconds",
247 Help: "Duration of the last silence snapshot.",
248 Objectives: map[float64]float64{},
249 })
250 m.snapshotSize = prometheus.NewGauge(prometheus.GaugeOpts{
251 Name: "alertmanager_silences_snapshot_size_bytes",
252 Help: "Size of the last silence snapshot in bytes.",
253 })
254 m.queriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
255 Name: "alertmanager_silences_queries_total",
256 Help: "How many silence queries were received.",
257 })
258 m.queryErrorsTotal = prometheus.NewCounter(prometheus.CounterOpts{
259 Name: "alertmanager_silences_query_errors_total",
260 Help: "How many silence received queries did not succeed.",
261 })
262 m.queryDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
263 Name: "alertmanager_silences_query_duration_seconds",
264 Help: "Duration of silence query evaluation.",
265 })
266 m.propagatedMessagesTotal = prometheus.NewCounter(prometheus.CounterOpts{
267 Name: "alertmanager_silences_gossip_messages_propagated_total",
268 Help: "Number of received gossip messages that have been further gossiped.",
269 })
270 if s != nil {
271 m.silencesActive = newSilenceMetricByState(s, types.SilenceStateActive)
272 m.silencesPending = newSilenceMetricByState(s, types.SilenceStatePending)
273 m.silencesExpired = newSilenceMetricByState(s, types.SilenceStateExpired)
274 }
275
276 if r != nil {
277 r.MustRegister(
278 m.gcDuration,
279 m.snapshotDuration,
280 m.snapshotSize,
281 m.queriesTotal,
282 m.queryErrorsTotal,
283 m.queryDuration,
284 m.silencesActive,
285 m.silencesPending,
286 m.silencesExpired,
287 m.propagatedMessagesTotal,
288 )
289 }
290 return m
291 }
292
293
294
// Options configures a Silences instance created by New.
type Options struct {
	// SnapshotFile is the path of a snapshot to load at startup. A file
	// that does not exist is not an error. It must not be set together
	// with SnapshotReader.
	SnapshotFile string
	// SnapshotReader is a reader to load the initial state from. It must
	// not be set together with SnapshotFile.
	SnapshotReader io.Reader

	// Retention is added to a silence's end time to determine when its
	// state entry expires and becomes eligible for garbage collection.
	Retention time.Duration

	// Logger receives log output; a no-op logger is used when nil.
	Logger log.Logger
	// Metrics is the registerer the collectors are registered with;
	// nothing is registered when nil.
	Metrics prometheus.Registerer
}
309
310 func (o *Options) validate() error {
311 if o.SnapshotFile != "" && o.SnapshotReader != nil {
312 return fmt.Errorf("only one of SnapshotFile and SnapshotReader must be set")
313 }
314 return nil
315 }
316
317
318 func New(o Options) (*Silences, error) {
319 if err := o.validate(); err != nil {
320 return nil, err
321 }
322 if o.SnapshotFile != "" {
323 if r, err := os.Open(o.SnapshotFile); err != nil {
324 if !os.IsNotExist(err) {
325 return nil, err
326 }
327 } else {
328 o.SnapshotReader = r
329 defer r.Close()
330 }
331 }
332 s := &Silences{
333 clock: clock.New(),
334 mc: matcherCache{},
335 logger: log.NewNopLogger(),
336 retention: o.Retention,
337 broadcast: func([]byte) {},
338 st: state{},
339 }
340 s.metrics = newMetrics(o.Metrics, s)
341
342 if o.Logger != nil {
343 s.logger = o.Logger
344 }
345 if o.SnapshotReader != nil {
346 if err := s.loadSnapshot(o.SnapshotReader); err != nil {
347 return s, err
348 }
349 }
350 return s, nil
351 }
352
353 func (s *Silences) nowUTC() time.Time {
354 return s.clock.Now().UTC()
355 }
356
357
358
359
360
361 func (s *Silences) Maintenance(interval time.Duration, snapf string, stopc <-chan struct{}, override MaintenanceFunc) {
362 t := s.clock.Ticker(interval)
363 defer t.Stop()
364
365 var doMaintenance MaintenanceFunc
366 doMaintenance = func() (int64, error) {
367 var size int64
368
369 if _, err := s.GC(); err != nil {
370 return size, err
371 }
372 if snapf == "" {
373 return size, nil
374 }
375 f, err := openReplace(snapf)
376 if err != nil {
377 return size, err
378 }
379 if size, err = s.Snapshot(f); err != nil {
380 return size, err
381 }
382 return size, f.Close()
383 }
384
385 if override != nil {
386 doMaintenance = override
387 }
388
389 runMaintenance := func(do MaintenanceFunc) error {
390 start := s.nowUTC()
391 level.Debug(s.logger).Log("msg", "Running maintenance")
392 size, err := do()
393 level.Debug(s.logger).Log("msg", "Maintenance done", "duration", s.clock.Since(start), "size", size)
394 s.metrics.snapshotSize.Set(float64(size))
395 return err
396 }
397
398 Loop:
399 for {
400 select {
401 case <-stopc:
402 break Loop
403 case <-t.C:
404 if err := runMaintenance(doMaintenance); err != nil {
405 level.Info(s.logger).Log("msg", "Running maintenance failed", "err", err)
406 }
407 }
408 }
409
410 if snapf == "" {
411 return
412 }
413 if err := runMaintenance(doMaintenance); err != nil {
414 level.Info(s.logger).Log("msg", "Creating shutdown snapshot failed", "err", err)
415 }
416 }
417
418
419
420 func (s *Silences) GC() (int, error) {
421 start := time.Now()
422 defer func() { s.metrics.gcDuration.Observe(time.Since(start).Seconds()) }()
423
424 now := s.nowUTC()
425 var n int
426
427 s.mtx.Lock()
428 defer s.mtx.Unlock()
429
430 for id, sil := range s.st {
431 if sil.ExpiresAt.IsZero() {
432 return n, errors.New("unexpected zero expiration timestamp")
433 }
434 if !sil.ExpiresAt.After(now) {
435 delete(s.st, id)
436 delete(s.mc, sil.Silence)
437 n++
438 }
439 }
440
441 return n, nil
442 }
443
444
445 var ValidateMatcher = func(m *pb.Matcher) error {
446 if !model.LabelName(m.Name).IsValid() {
447 return fmt.Errorf("invalid label name %q", m.Name)
448 }
449 switch m.Type {
450 case pb.Matcher_EQUAL, pb.Matcher_NOT_EQUAL:
451 if !model.LabelValue(m.Pattern).IsValid() {
452 return fmt.Errorf("invalid label value %q", m.Pattern)
453 }
454 case pb.Matcher_REGEXP, pb.Matcher_NOT_REGEXP:
455 if _, err := regexp.Compile(m.Pattern); err != nil {
456 return fmt.Errorf("invalid regular expression %q: %s", m.Pattern, err)
457 }
458 default:
459 return fmt.Errorf("unknown matcher type %q", m.Type)
460 }
461 return nil
462 }
463
464 func matchesEmpty(m *pb.Matcher) bool {
465 switch m.Type {
466 case pb.Matcher_EQUAL:
467 return m.Pattern == ""
468 case pb.Matcher_REGEXP:
469 matched, _ := regexp.MatchString(m.Pattern, "")
470 return matched
471 default:
472 return false
473 }
474 }
475
476 func validateSilence(s *pb.Silence) error {
477 if s.Id == "" {
478 return errors.New("ID missing")
479 }
480 if len(s.Matchers) == 0 {
481 return errors.New("at least one matcher required")
482 }
483 allMatchEmpty := true
484 for i, m := range s.Matchers {
485 if err := ValidateMatcher(m); err != nil {
486 return fmt.Errorf("invalid label matcher %d: %s", i, err)
487 }
488 allMatchEmpty = allMatchEmpty && matchesEmpty(m)
489 }
490 if allMatchEmpty {
491 return errors.New("at least one matcher must not match the empty string")
492 }
493 if s.StartsAt.IsZero() {
494 return errors.New("invalid zero start timestamp")
495 }
496 if s.EndsAt.IsZero() {
497 return errors.New("invalid zero end timestamp")
498 }
499 if s.EndsAt.Before(s.StartsAt) {
500 return errors.New("end time must not be before start time")
501 }
502 if s.UpdatedAt.IsZero() {
503 return errors.New("invalid zero update timestamp")
504 }
505 return nil
506 }
507
508
509 func cloneSilence(sil *pb.Silence) *pb.Silence {
510 s := *sil
511 return &s
512 }
513
514 func (s *Silences) getSilence(id string) (*pb.Silence, bool) {
515 msil, ok := s.st[id]
516 if !ok {
517 return nil, false
518 }
519 return msil.Silence, true
520 }
521
522 func (s *Silences) setSilence(sil *pb.Silence, now time.Time) error {
523 sil.UpdatedAt = now
524
525 if err := validateSilence(sil); err != nil {
526 return errors.Wrap(err, "silence invalid")
527 }
528
529 msil := &pb.MeshSilence{
530 Silence: sil,
531 ExpiresAt: sil.EndsAt.Add(s.retention),
532 }
533 b, err := marshalMeshSilence(msil)
534 if err != nil {
535 return err
536 }
537
538 if s.st.merge(msil, now) {
539 s.version++
540 }
541 s.broadcast(b)
542
543 return nil
544 }
545
546
547
548 func (s *Silences) Set(sil *pb.Silence) (string, error) {
549 s.mtx.Lock()
550 defer s.mtx.Unlock()
551
552 now := s.nowUTC()
553 prev, ok := s.getSilence(sil.Id)
554
555 if sil.Id != "" && !ok {
556 return "", ErrNotFound
557 }
558 if ok {
559 if canUpdate(prev, sil, now) {
560 return sil.Id, s.setSilence(sil, now)
561 }
562 if getState(prev, s.nowUTC()) != types.SilenceStateExpired {
563
564 if err := s.expire(prev.Id); err != nil {
565 return "", errors.Wrap(err, "expire previous silence")
566 }
567 }
568 }
569
570 uid, err := uuid.NewV4()
571 if err != nil {
572 return "", errors.Wrap(err, "generate uuid")
573 }
574 sil.Id = uid.String()
575
576 if sil.StartsAt.Before(now) {
577 sil.StartsAt = now
578 }
579
580 return sil.Id, s.setSilence(sil, now)
581 }
582
583
584
585 func canUpdate(a, b *pb.Silence, now time.Time) bool {
586 if !reflect.DeepEqual(a.Matchers, b.Matchers) {
587 return false
588 }
589
590 switch st := getState(a, now); st {
591 case types.SilenceStateActive:
592 if b.StartsAt.Unix() != a.StartsAt.Unix() {
593 return false
594 }
595 if b.EndsAt.Before(now) {
596 return false
597 }
598 case types.SilenceStatePending:
599 if b.StartsAt.Before(now) {
600 return false
601 }
602 case types.SilenceStateExpired:
603 return false
604 default:
605 panic("unknown silence state")
606 }
607 return true
608 }
609
610
611 func (s *Silences) Expire(id string) error {
612 s.mtx.Lock()
613 defer s.mtx.Unlock()
614 return s.expire(id)
615 }
616
617
618
619
620 func (s *Silences) expire(id string) error {
621 sil, ok := s.getSilence(id)
622 if !ok {
623 return ErrNotFound
624 }
625 sil = cloneSilence(sil)
626 now := s.nowUTC()
627
628 switch getState(sil, now) {
629 case types.SilenceStateExpired:
630 return nil
631 case types.SilenceStateActive:
632 sil.EndsAt = now
633 case types.SilenceStatePending:
634
635 sil.StartsAt = now
636 sil.EndsAt = now
637 }
638
639 return s.setSilence(sil, now)
640 }
641
642
// QueryParam configures a query. It may return an error, which aborts the
// query before it is evaluated.
type QueryParam func(*query) error

// query collects the constraints of a single silence query.
type query struct {
	// ids restricts the base set to these silence IDs; when nil, all
	// stored silences are considered.
	ids []string
	// filters are applied in order to every candidate; a silence is kept
	// only if every filter accepts it.
	filters []silenceFilter
}

// silenceFilter decides whether a silence stays in a query result. It is given
// the silence, the Silences instance being queried and the query time. A
// non-nil error aborts the whole query.
type silenceFilter func(*pb.Silence, *Silences, time.Time) (bool, error)
653
654
655 func QIDs(ids ...string) QueryParam {
656 return func(q *query) error {
657 q.ids = append(q.ids, ids...)
658 return nil
659 }
660 }
661
662
663 func QMatches(set model.LabelSet) QueryParam {
664 return func(q *query) error {
665 f := func(sil *pb.Silence, s *Silences, _ time.Time) (bool, error) {
666 m, err := s.mc.Get(sil)
667 if err != nil {
668 return true, err
669 }
670 return m.Matches(set), nil
671 }
672 q.filters = append(q.filters, f)
673 return nil
674 }
675 }
676
677
678 func getState(sil *pb.Silence, ts time.Time) types.SilenceState {
679 if ts.Before(sil.StartsAt) {
680 return types.SilenceStatePending
681 }
682 if ts.After(sil.EndsAt) {
683 return types.SilenceStateExpired
684 }
685 return types.SilenceStateActive
686 }
687
688
689 func QState(states ...types.SilenceState) QueryParam {
690 return func(q *query) error {
691 f := func(sil *pb.Silence, _ *Silences, now time.Time) (bool, error) {
692 s := getState(sil, now)
693
694 for _, ps := range states {
695 if s == ps {
696 return true, nil
697 }
698 }
699 return false, nil
700 }
701 q.filters = append(q.filters, f)
702 return nil
703 }
704 }
705
706
707
708 func (s *Silences) QueryOne(params ...QueryParam) (*pb.Silence, error) {
709 res, _, err := s.Query(params...)
710 if err != nil {
711 return nil, err
712 }
713 if len(res) == 0 {
714 return nil, ErrNotFound
715 }
716 return res[0], nil
717 }
718
719
720
721 func (s *Silences) Query(params ...QueryParam) ([]*pb.Silence, int, error) {
722 s.metrics.queriesTotal.Inc()
723 defer prometheus.NewTimer(s.metrics.queryDuration).ObserveDuration()
724
725 q := &query{}
726 for _, p := range params {
727 if err := p(q); err != nil {
728 s.metrics.queryErrorsTotal.Inc()
729 return nil, s.Version(), err
730 }
731 }
732 sils, version, err := s.query(q, s.nowUTC())
733 if err != nil {
734 s.metrics.queryErrorsTotal.Inc()
735 }
736 return sils, version, err
737 }
738
739
740 func (s *Silences) Version() int {
741 s.mtx.RLock()
742 defer s.mtx.RUnlock()
743 return s.version
744 }
745
746
747 func (s *Silences) CountState(states ...types.SilenceState) (int, error) {
748
749 sils, _, err := s.Query(QState(states...))
750 if err != nil {
751 return -1, err
752 }
753 return len(sils), nil
754 }
755
756 func (s *Silences) query(q *query, now time.Time) ([]*pb.Silence, int, error) {
757
758
759 var res []*pb.Silence
760
761 s.mtx.Lock()
762 defer s.mtx.Unlock()
763
764 if q.ids != nil {
765 for _, id := range q.ids {
766 if s, ok := s.st[id]; ok {
767 res = append(res, s.Silence)
768 }
769 }
770 } else {
771 for _, sil := range s.st {
772 res = append(res, sil.Silence)
773 }
774 }
775
776 var resf []*pb.Silence
777 for _, sil := range res {
778 remove := false
779 for _, f := range q.filters {
780 ok, err := f(sil, s, now)
781 if err != nil {
782 return nil, s.version, err
783 }
784 if !ok {
785 remove = true
786 break
787 }
788 }
789 if !remove {
790 resf = append(resf, cloneSilence(sil))
791 }
792 }
793
794 return resf, s.version, nil
795 }
796
797
798
799 func (s *Silences) loadSnapshot(r io.Reader) error {
800 st, err := decodeState(r)
801 if err != nil {
802 return err
803 }
804 for _, e := range st {
805
806 if len(e.Silence.Comments) > 0 {
807 e.Silence.Comment = e.Silence.Comments[0].Comment
808 e.Silence.CreatedBy = e.Silence.Comments[0].Author
809 e.Silence.Comments = nil
810 }
811 st[e.Silence.Id] = e
812 }
813 s.mtx.Lock()
814 s.st = st
815 s.version++
816 s.mtx.Unlock()
817
818 return nil
819 }
820
821
822
823 func (s *Silences) Snapshot(w io.Writer) (int64, error) {
824 start := time.Now()
825 defer func() { s.metrics.snapshotDuration.Observe(time.Since(start).Seconds()) }()
826
827 s.mtx.RLock()
828 defer s.mtx.RUnlock()
829
830 b, err := s.st.MarshalBinary()
831 if err != nil {
832 return 0, err
833 }
834
835 return io.Copy(w, bytes.NewReader(b))
836 }
837
838
839 func (s *Silences) MarshalBinary() ([]byte, error) {
840 s.mtx.Lock()
841 defer s.mtx.Unlock()
842
843 return s.st.MarshalBinary()
844 }
845
846
847 func (s *Silences) Merge(b []byte) error {
848 st, err := decodeState(bytes.NewReader(b))
849 if err != nil {
850 return err
851 }
852 s.mtx.Lock()
853 defer s.mtx.Unlock()
854
855 now := s.nowUTC()
856
857 for _, e := range st {
858 if merged := s.st.merge(e, now); merged {
859 s.version++
860 if !cluster.OversizedMessage(b) {
861
862
863
864
865 s.broadcast(b)
866 s.metrics.propagatedMessagesTotal.Inc()
867 level.Debug(s.logger).Log("msg", "Gossiping new silence", "silence", e)
868 }
869 }
870 }
871 return nil
872 }
873
874
875
876 func (s *Silences) SetBroadcast(f func([]byte)) {
877 s.mtx.Lock()
878 s.broadcast = f
879 s.mtx.Unlock()
880 }
881
// state maps silence IDs to their stored entries (the silence plus its
// expiration time).
type state map[string]*pb.MeshSilence
883
884 func (s state) merge(e *pb.MeshSilence, now time.Time) bool {
885 id := e.Silence.Id
886 if e.ExpiresAt.Before(now) {
887 return false
888 }
889
890
891 if len(e.Silence.Comments) > 0 {
892 e.Silence.Comment = e.Silence.Comments[0].Comment
893 e.Silence.CreatedBy = e.Silence.Comments[0].Author
894 e.Silence.Comments = nil
895 }
896
897 prev, ok := s[id]
898 if !ok || prev.Silence.UpdatedAt.Before(e.Silence.UpdatedAt) {
899 s[id] = e
900 return true
901 }
902 return false
903 }
904
905 func (s state) MarshalBinary() ([]byte, error) {
906 var buf bytes.Buffer
907
908 for _, e := range s {
909 if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
910 return nil, err
911 }
912 }
913 return buf.Bytes(), nil
914 }
915
916 func decodeState(r io.Reader) (state, error) {
917 st := state{}
918 for {
919 var s pb.MeshSilence
920 _, err := pbutil.ReadDelimited(r, &s)
921 if err == nil {
922 if s.Silence == nil {
923 return nil, ErrInvalidState
924 }
925 st[s.Silence.Id] = &s
926 continue
927 }
928 if err == io.EOF {
929 break
930 }
931 return nil, err
932 }
933 return st, nil
934 }
935
936 func marshalMeshSilence(e *pb.MeshSilence) ([]byte, error) {
937 var buf bytes.Buffer
938 if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
939 return nil, err
940 }
941 return buf.Bytes(), nil
942 }
943
944
// replaceFile wraps a temporary file that is renamed to filename when it is
// closed, so the target path is only replaced by a fully written file.
type replaceFile struct {
	*os.File
	// filename is the final path the temporary file is renamed to on Close.
	filename string
}

// Close syncs and closes the temporary file and then renames it to the target
// filename. If syncing fails, the file is still closed so the descriptor is
// not leaked, and the sync error is returned without renaming.
func (f *replaceFile) Close() error {
	if err := f.File.Sync(); err != nil {
		// The sync error is the one worth reporting; a secondary close
		// error is intentionally dropped.
		_ = f.File.Close()
		return err
	}
	if err := f.File.Close(); err != nil {
		return err
	}
	return os.Rename(f.File.Name(), f.filename)
}

// openReplace creates a temporary file next to filename, named filename plus a
// random hexadecimal suffix, and returns it wrapped in a replaceFile. Closing
// the result moves the temporary file to filename.
func openReplace(filename string) (*replaceFile, error) {
	tmpFilename := fmt.Sprintf("%s.%x", filename, uint64(rand.Int63()))

	f, err := os.Create(tmpFilename)
	if err != nil {
		return nil, err
	}

	rf := &replaceFile{
		File:     f,
		filename: filename,
	}
	return rf, nil
}
975
View as plain text