...
1 package notifications
2
3 import (
4 "expvar"
5 "fmt"
6 "net/http"
7 "sync"
8 )
9
10
11
12
13 type EndpointMetrics struct {
14 Pending int
15 Events int
16 Successes int
17 Failures int
18 Errors int
19 Statuses map[string]int
20 }
21
22
23
24 type safeMetrics struct {
25 EndpointMetrics
26 sync.Mutex
27 }
28
29
30 func newSafeMetrics() *safeMetrics {
31 var sm safeMetrics
32 sm.Statuses = make(map[string]int)
33 return &sm
34 }
35
36
37
38 func (sm *safeMetrics) httpStatusListener() httpStatusListener {
39 return &endpointMetricsHTTPStatusListener{
40 safeMetrics: sm,
41 }
42 }
43
44
45 func (sm *safeMetrics) eventQueueListener() eventQueueListener {
46 return &endpointMetricsEventQueueListener{
47 safeMetrics: sm,
48 }
49 }
50
51
52
53 type endpointMetricsHTTPStatusListener struct {
54 *safeMetrics
55 }
56
57 var _ httpStatusListener = &endpointMetricsHTTPStatusListener{}
58
59 func (emsl *endpointMetricsHTTPStatusListener) success(status int, events ...Event) {
60 emsl.safeMetrics.Lock()
61 defer emsl.safeMetrics.Unlock()
62 emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
63 emsl.Successes += len(events)
64 }
65
66 func (emsl *endpointMetricsHTTPStatusListener) failure(status int, events ...Event) {
67 emsl.safeMetrics.Lock()
68 defer emsl.safeMetrics.Unlock()
69 emsl.Statuses[fmt.Sprintf("%d %s", status, http.StatusText(status))] += len(events)
70 emsl.Failures += len(events)
71 }
72
73 func (emsl *endpointMetricsHTTPStatusListener) err(err error, events ...Event) {
74 emsl.safeMetrics.Lock()
75 defer emsl.safeMetrics.Unlock()
76 emsl.Errors += len(events)
77 }
78
79
80
81 type endpointMetricsEventQueueListener struct {
82 *safeMetrics
83 }
84
85 func (eqc *endpointMetricsEventQueueListener) ingress(events ...Event) {
86 eqc.Lock()
87 defer eqc.Unlock()
88 eqc.Events += len(events)
89 eqc.Pending += len(events)
90 }
91
92 func (eqc *endpointMetricsEventQueueListener) egress(events ...Event) {
93 eqc.Lock()
94 defer eqc.Unlock()
95 eqc.Pending -= len(events)
96 }
97
98
99 var endpoints struct {
100 registered []*Endpoint
101 mu sync.Mutex
102 }
103
104
105 func register(e *Endpoint) {
106 endpoints.mu.Lock()
107 defer endpoints.mu.Unlock()
108
109 endpoints.registered = append(endpoints.registered, e)
110 }
111
112 func init() {
113
114
115
116
117 registry := expvar.Get("registry")
118
119 if registry == nil {
120 registry = expvar.NewMap("registry")
121 }
122
123 var notifications expvar.Map
124 notifications.Init()
125 notifications.Set("endpoints", expvar.Func(func() interface{} {
126 endpoints.mu.Lock()
127 defer endpoints.mu.Unlock()
128
129 var names []interface{}
130 for _, v := range endpoints.registered {
131 var epjson struct {
132 Name string `json:"name"`
133 URL string `json:"url"`
134 EndpointConfig
135
136 Metrics EndpointMetrics
137 }
138
139 epjson.Name = v.Name()
140 epjson.URL = v.URL()
141 epjson.EndpointConfig = v.EndpointConfig
142
143 v.ReadMetrics(&epjson.Metrics)
144
145 names = append(names, epjson)
146 }
147
148 return names
149 }))
150
151 registry.(*expvar.Map).Set("notifications", ¬ifications)
152 }
153
View as plain text