...
1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cli
15
16 import (
17 "context"
18 "encoding/json"
19 "fmt"
20 "os"
21 "sync"
22
23 "github.com/pkg/errors"
24 kingpin "gopkg.in/alecthomas/kingpin.v2"
25
26 "github.com/prometheus/alertmanager/api/v2/client/silence"
27 "github.com/prometheus/alertmanager/api/v2/models"
28 )
29
30 type silenceImportCmd struct {
31 force bool
32 workers int
33 file string
34 }
35
36 const silenceImportHelp = `Import alertmanager silences from JSON file or stdin
37
38 This command can be used to bulk import silences from a JSON file
39 created by query command. For example:
40
41 amtool silence query -o json foo > foo.json
42
43 amtool silence import foo.json
44
45 JSON data can also come from stdin if no param is specified.
46 `
47
48 func configureSilenceImportCmd(cc *kingpin.CmdClause) {
49 var (
50 c = &silenceImportCmd{}
51 importCmd = cc.Command("import", silenceImportHelp)
52 )
53
54 importCmd.Flag("force", "Force adding new silences even if it already exists").Short('f').BoolVar(&c.force)
55 importCmd.Flag("worker", "Number of concurrent workers to use for import").Short('w').Default("8").IntVar(&c.workers)
56 importCmd.Arg("input-file", "JSON file with silences").ExistingFileVar(&c.file)
57 importCmd.Action(execWithTimeout(c.bulkImport))
58 }
59
60 func addSilenceWorker(ctx context.Context, sclient silence.ClientService, silencec <-chan *models.PostableSilence, errc chan<- error) {
61 for s := range silencec {
62 sid := s.ID
63 params := silence.NewPostSilencesParams().WithContext(ctx).WithSilence(s)
64 postOk, err := sclient.PostSilences(params)
65 if _, ok := err.(*silence.PostSilencesNotFound); ok {
66
67 params.Silence.ID = ""
68 postOk, err = sclient.PostSilences(params)
69 }
70
71 if err != nil {
72 fmt.Fprintf(os.Stderr, "Error adding silence id='%v': %v\n", sid, err)
73 } else {
74 fmt.Println(postOk.Payload.SilenceID)
75 }
76 errc <- err
77 }
78 }
79
80 func (c *silenceImportCmd) bulkImport(ctx context.Context, _ *kingpin.ParseContext) error {
81 input := os.Stdin
82 var err error
83 if c.file != "" {
84 input, err = os.Open(c.file)
85 if err != nil {
86 return err
87 }
88 defer input.Close()
89 }
90
91 dec := json.NewDecoder(input)
92
93 _, err = dec.Token()
94 if err != nil {
95 return errors.Wrap(err, "couldn't unmarshal input data, is it JSON?")
96 }
97
98 amclient := NewAlertmanagerClient(alertmanagerURL)
99 silencec := make(chan *models.PostableSilence, 100)
100 errc := make(chan error, 100)
101 var wg sync.WaitGroup
102 for w := 0; w < c.workers; w++ {
103 wg.Add(1)
104 go func() {
105 addSilenceWorker(ctx, amclient.Silence, silencec, errc)
106 wg.Done()
107 }()
108 }
109
110 errCount := 0
111 go func() {
112 for err := range errc {
113 if err != nil {
114 errCount++
115 }
116 }
117 }()
118
119 count := 0
120 for dec.More() {
121 var s models.PostableSilence
122 err := dec.Decode(&s)
123 if err != nil {
124 return errors.Wrap(err, "couldn't unmarshal input data, is it JSON?")
125 }
126
127 if c.force {
128
129 s.ID = ""
130 }
131
132 silencec <- &s
133 count++
134 }
135
136 close(silencec)
137 wg.Wait()
138 close(errc)
139
140 if errCount > 0 {
141 return fmt.Errorf("couldn't import %v out of %v silences", errCount, count)
142 }
143 return nil
144 }
145
View as plain text