...

Source file src/github.com/prometheus/alertmanager/cli/silence_import.go

Documentation: github.com/prometheus/alertmanager/cli

     1  // Copyright 2018 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package cli
    15  
    16  import (
    17  	"context"
    18  	"encoding/json"
    19  	"fmt"
    20  	"os"
    21  	"sync"
    22  
    23  	"github.com/pkg/errors"
    24  	kingpin "gopkg.in/alecthomas/kingpin.v2"
    25  
    26  	"github.com/prometheus/alertmanager/api/v2/client/silence"
    27  	"github.com/prometheus/alertmanager/api/v2/models"
    28  )
    29  
// silenceImportCmd holds the options of the "silence import" sub-command.
type silenceImportCmd struct {
	// force is bound to --force/-f; when set, bulkImport clears each
	// silence's ID before posting it.
	force bool
	// workers is bound to --worker/-w (default 8): the number of
	// concurrent import workers started by bulkImport.
	workers int
	// file is the optional input-file argument; when empty, bulkImport
	// reads from stdin.
	file string
}
    35  
// silenceImportHelp is the help text registered for the "import" sub-command.
const silenceImportHelp = `Import alertmanager silences from JSON file or stdin

This command can be used to bulk import silences from a JSON file
created by query command. For example:

amtool silence query -o json foo > foo.json

amtool silence import foo.json

JSON data can also come from stdin if no param is specified.
`
    47  
    48  func configureSilenceImportCmd(cc *kingpin.CmdClause) {
    49  	var (
    50  		c         = &silenceImportCmd{}
    51  		importCmd = cc.Command("import", silenceImportHelp)
    52  	)
    53  
    54  	importCmd.Flag("force", "Force adding new silences even if it already exists").Short('f').BoolVar(&c.force)
    55  	importCmd.Flag("worker", "Number of concurrent workers to use for import").Short('w').Default("8").IntVar(&c.workers)
    56  	importCmd.Arg("input-file", "JSON file with silences").ExistingFileVar(&c.file)
    57  	importCmd.Action(execWithTimeout(c.bulkImport))
    58  }
    59  
    60  func addSilenceWorker(ctx context.Context, sclient silence.ClientService, silencec <-chan *models.PostableSilence, errc chan<- error) {
    61  	for s := range silencec {
    62  		sid := s.ID
    63  		params := silence.NewPostSilencesParams().WithContext(ctx).WithSilence(s)
    64  		postOk, err := sclient.PostSilences(params)
    65  		if _, ok := err.(*silence.PostSilencesNotFound); ok {
    66  			// silence doesn't exists yet, retry to create as a new one
    67  			params.Silence.ID = ""
    68  			postOk, err = sclient.PostSilences(params)
    69  		}
    70  
    71  		if err != nil {
    72  			fmt.Fprintf(os.Stderr, "Error adding silence id='%v': %v\n", sid, err)
    73  		} else {
    74  			fmt.Println(postOk.Payload.SilenceID)
    75  		}
    76  		errc <- err
    77  	}
    78  }
    79  
    80  func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseContext) error {
    81  	input := os.Stdin
    82  	var err error
    83  	if c.file != "" {
    84  		input, err = os.Open(c.file)
    85  		if err != nil {
    86  			return err
    87  		}
    88  		defer input.Close()
    89  	}
    90  
    91  	dec := json.NewDecoder(input)
    92  	// read open square bracket
    93  	_, err = dec.Token()
    94  	if err != nil {
    95  		return errors.Wrap(err, "couldn't unmarshal input data, is it JSON?")
    96  	}
    97  
    98  	amclient := NewAlertmanagerClient(alertmanagerURL)
    99  	silencec := make(chan *models.PostableSilence, 100)
   100  	errc := make(chan error, 100)
   101  	var wg sync.WaitGroup
   102  	for w := 0; w < c.workers; w++ {
   103  		wg.Add(1)
   104  		go func() {
   105  			addSilenceWorker(ctx, amclient.Silence, silencec, errc)
   106  			wg.Done()
   107  		}()
   108  	}
   109  
   110  	errCount := 0
   111  	go func() {
   112  		for err := range errc {
   113  			if err != nil {
   114  				errCount++
   115  			}
   116  		}
   117  	}()
   118  
   119  	count := 0
   120  	for dec.More() {
   121  		var s models.PostableSilence
   122  		err := dec.Decode(&s)
   123  		if err != nil {
   124  			return errors.Wrap(err, "couldn't unmarshal input data, is it JSON?")
   125  		}
   126  
   127  		if c.force {
   128  			// reset the silence ID so Alertmanager will always create new silence
   129  			s.ID = ""
   130  		}
   131  
   132  		silencec <- &s
   133  		count++
   134  	}
   135  
   136  	close(silencec)
   137  	wg.Wait()
   138  	close(errc)
   139  
   140  	if errCount > 0 {
   141  		return fmt.Errorf("couldn't import %v out of %v silences", errCount, count)
   142  	}
   143  	return nil
   144  }
   145  

View as plain text