1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "log"
20 "sync"
21
22 "go.opencensus.io/stats"
23 "go.opencensus.io/stats/view"
24 "go.opencensus.io/tag"
25 )
26
27
28 var (
29 keyTopic = tag.MustNewKey("topic")
30 keySubscription = tag.MustNewKey("subscription")
31 )
32
33
34 var (
35 keyStatus = tag.MustNewKey("status")
36 keyError = tag.MustNewKey("error")
37 )
38
39 const statsPrefix = "cloud.google.com/go/pubsub/"
40
41
42 var (
43
44
45 PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless)
46
47
48
49
50 PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds)
51
52
53
54 PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless)
55
56
57
58 AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless)
59
60
61
62 NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless)
63
64
65
66 ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless)
67
68
69
70 ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless)
71
72
73
74 StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless)
75
76
77
78 StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless)
79
80
81
82 StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless)
83
84
85
86 StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless)
87
88
89
90 OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless)
91
92
93
94 OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless)
95
96
97
98 PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless)
99
100
101
102 PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless)
103 )
104
105 var (
106
107
108 PublishedMessagesView *view.View
109
110
111
112 PublishLatencyView *view.View
113
114
115
116 PullCountView *view.View
117
118
119
120 AckCountView *view.View
121
122
123
124 NackCountView *view.View
125
126
127
128 ModAckCountView *view.View
129
130
131
132 ModAckTimeoutCountView *view.View
133
134
135
136 StreamOpenCountView *view.View
137
138
139
140 StreamRetryCountView *view.View
141
142
143
144 StreamRequestCountView *view.View
145
146
147
148 StreamResponseCountView *view.View
149
150
151
152 OutstandingMessagesView *view.View
153
154
155
156 OutstandingBytesView *view.View
157
158
159
160 PublisherOutstandingMessagesView *view.View
161
162
163
164 PublisherOutstandingBytesView *view.View
165 )
166
167 func init() {
168 PublishedMessagesView = createCountView(stats.Measure(PublishedMessages), keyTopic, keyStatus, keyError)
169 PublishLatencyView = createDistView(PublishLatency, keyTopic, keyStatus, keyError)
170 PublisherOutstandingMessagesView = createLastValueView(PublisherOutstandingMessages, keyTopic)
171 PublisherOutstandingBytesView = createLastValueView(PublisherOutstandingBytes, keyTopic)
172 PullCountView = createCountView(PullCount, keySubscription)
173 AckCountView = createCountView(AckCount, keySubscription)
174 NackCountView = createCountView(NackCount, keySubscription)
175 ModAckCountView = createCountView(ModAckCount, keySubscription)
176 ModAckTimeoutCountView = createCountView(ModAckTimeoutCount, keySubscription)
177 StreamOpenCountView = createCountView(StreamOpenCount, keySubscription)
178 StreamRetryCountView = createCountView(StreamRetryCount, keySubscription)
179 StreamRequestCountView = createCountView(StreamRequestCount, keySubscription)
180 StreamResponseCountView = createCountView(StreamResponseCount, keySubscription)
181 OutstandingMessagesView = createLastValueView(OutstandingMessages, keySubscription)
182 OutstandingBytesView = createLastValueView(OutstandingBytes, keySubscription)
183
184 DefaultPublishViews = []*view.View{
185 PublishedMessagesView,
186 PublishLatencyView,
187 PublisherOutstandingMessagesView,
188 PublisherOutstandingBytesView,
189 }
190
191 DefaultSubscribeViews = []*view.View{
192 PullCountView,
193 AckCountView,
194 NackCountView,
195 ModAckCountView,
196 ModAckTimeoutCountView,
197 StreamOpenCountView,
198 StreamRetryCountView,
199 StreamRequestCountView,
200 StreamResponseCountView,
201 OutstandingMessagesView,
202 OutstandingBytesView,
203 }
204 }
205
206
207
208 var (
209 DefaultPublishViews []*view.View
210 DefaultSubscribeViews []*view.View
211 )
212
213 func createCountView(m stats.Measure, keys ...tag.Key) *view.View {
214 return &view.View{
215 Name: m.Name(),
216 Description: m.Description(),
217 TagKeys: keys,
218 Measure: m,
219 Aggregation: view.Sum(),
220 }
221 }
222
223 func createDistView(m stats.Measure, keys ...tag.Key) *view.View {
224 return &view.View{
225 Name: m.Name(),
226 Description: m.Description(),
227 TagKeys: keys,
228 Measure: m,
229 Aggregation: view.Distribution(0, 25, 50, 75, 100, 200, 400, 600, 800, 1000, 2000, 4000, 6000),
230 }
231 }
232
233 func createLastValueView(m stats.Measure, keys ...tag.Key) *view.View {
234 return &view.View{
235 Name: m.Name(),
236 Description: m.Description(),
237 TagKeys: keys,
238 Measure: m,
239 Aggregation: view.LastValue(),
240 }
241 }
242
243 var logOnce sync.Once
244
245
246 func withSubscriptionKey(ctx context.Context, subName string) context.Context {
247 ctx, err := tag.New(ctx, tag.Upsert(keySubscription, subName))
248 if err != nil {
249 logOnce.Do(func() {
250 log.Printf("pubsub: error creating tag map for 'subscribe' key: %v", err)
251 })
252 }
253 return ctx
254 }
255
256 func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
257 stats.Record(ctx, m.M(n))
258 }
259
View as plain text