1
2
3
4
5
6
7
8
9
10
11
12
13
14 package api
15
16 import (
17 "errors"
18 "fmt"
19 "net/http"
20 "runtime"
21 "time"
22
23 "github.com/go-kit/log"
24 "github.com/prometheus/client_golang/prometheus"
25 "github.com/prometheus/common/model"
26 "github.com/prometheus/common/route"
27
28 apiv1 "github.com/prometheus/alertmanager/api/v1"
29 apiv2 "github.com/prometheus/alertmanager/api/v2"
30 "github.com/prometheus/alertmanager/cluster"
31 "github.com/prometheus/alertmanager/config"
32 "github.com/prometheus/alertmanager/dispatch"
33 "github.com/prometheus/alertmanager/provider"
34 "github.com/prometheus/alertmanager/silence"
35 "github.com/prometheus/alertmanager/types"
36 )
37
38
39 type API struct {
40 v1 *apiv1.API
41 v2 *apiv2.API
42 requestsInFlight prometheus.Gauge
43 concurrencyLimitExceeded prometheus.Counter
44 timeout time.Duration
45 inFlightSem chan struct{}
46 }
47
48
49
50 type Options struct {
51
52 Alerts provider.Alerts
53
54 Silences *silence.Silences
55
56
57 StatusFunc func(model.Fingerprint) types.AlertStatus
58
59 Peer cluster.ClusterPeer
60
61
62 Timeout time.Duration
63
64
65
66
67 Concurrency int
68
69 Logger log.Logger
70
71
72 Registry prometheus.Registerer
73
74
75
76 GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string)
77 }
78
79 func (o Options) validate() error {
80 if o.Alerts == nil {
81 return errors.New("mandatory field Alerts not set")
82 }
83 if o.Silences == nil {
84 return errors.New("mandatory field Silences not set")
85 }
86 if o.StatusFunc == nil {
87 return errors.New("mandatory field StatusFunc not set")
88 }
89 if o.GroupFunc == nil {
90 return errors.New("mandatory field GroupFunc not set")
91 }
92 return nil
93 }
94
95
96
97 func New(opts Options) (*API, error) {
98 if err := opts.validate(); err != nil {
99 return nil, fmt.Errorf("invalid API options: %s", err)
100 }
101 l := opts.Logger
102 if l == nil {
103 l = log.NewNopLogger()
104 }
105 concurrency := opts.Concurrency
106 if concurrency < 1 {
107 concurrency = runtime.GOMAXPROCS(0)
108 if concurrency < 8 {
109 concurrency = 8
110 }
111 }
112
113 v1 := apiv1.New(
114 opts.Alerts,
115 opts.Silences,
116 opts.StatusFunc,
117 opts.Peer,
118 log.With(l, "version", "v1"),
119 opts.Registry,
120 )
121
122 v2, err := apiv2.NewAPI(
123 opts.Alerts,
124 opts.GroupFunc,
125 opts.StatusFunc,
126 opts.Silences,
127 opts.Peer,
128 log.With(l, "version", "v2"),
129 opts.Registry,
130 )
131 if err != nil {
132 return nil, err
133 }
134
135
136
137 requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
138 Name: "alertmanager_http_requests_in_flight",
139 Help: "Current number of HTTP requests being processed.",
140 ConstLabels: prometheus.Labels{"method": "get"},
141 })
142 concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
143 Name: "alertmanager_http_concurrency_limit_exceeded_total",
144 Help: "Total number of times an HTTP request failed because the concurrency limit was reached.",
145 ConstLabels: prometheus.Labels{"method": "get"},
146 })
147 if opts.Registry != nil {
148 if err := opts.Registry.Register(requestsInFlight); err != nil {
149 return nil, err
150 }
151 if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
152 return nil, err
153 }
154 }
155
156 return &API{
157 v1: v1,
158 v2: v2,
159 requestsInFlight: requestsInFlight,
160 concurrencyLimitExceeded: concurrencyLimitExceeded,
161 timeout: opts.Timeout,
162 inFlightSem: make(chan struct{}, concurrency),
163 }, nil
164 }
165
166
167
168
169
170
171
172
173
174 func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
175 api.v1.Register(r.WithPrefix("/api/v1"))
176
177 mux := http.NewServeMux()
178 mux.Handle("/", api.limitHandler(r))
179
180 apiPrefix := ""
181 if routePrefix != "/" {
182 apiPrefix = routePrefix
183 }
184
185
186
187
188 mux.Handle(
189 apiPrefix+"/api/v2/",
190 api.limitHandler(http.StripPrefix(apiPrefix, api.v2.Handler)),
191 )
192
193 return mux
194 }
195
196
197
198 func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
199 api.v1.Update(cfg)
200 api.v2.Update(cfg, setAlertStatus)
201 }
202
203 func (api *API) limitHandler(h http.Handler) http.Handler {
204 concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
205 if req.Method == http.MethodGet {
206 select {
207 case api.inFlightSem <- struct{}{}:
208 api.requestsInFlight.Inc()
209 defer func() {
210 <-api.inFlightSem
211 api.requestsInFlight.Dec()
212 }()
213 default:
214 api.concurrencyLimitExceeded.Inc()
215 http.Error(rsp, fmt.Sprintf(
216 "Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
217 ), http.StatusServiceUnavailable)
218 return
219 }
220 }
221 h.ServeHTTP(rsp, req)
222 })
223 if api.timeout <= 0 {
224 return concLimiter
225 }
226 return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
227 "Exceeded configured timeout of %v.\n", api.timeout,
228 ))
229 }
230
View as plain text