1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "context"
19
20 "go.opencensus.io/stats"
21 "go.opencensus.io/stats/view"
22 "go.opencensus.io/tag"
23 )
24
25 var (
26
27 keyStream = tag.MustNewKey("streamID")
28
29
30
31 keyDataOrigin = tag.MustNewKey("dataOrigin")
32
33
34 keyError = tag.MustNewKey("error")
35 )
36
37
38
39 var DefaultOpenCensusViews []*view.View
40
41 const statsPrefix = "cloud.google.com/go/bigquery/storage/managedwriter/"
42
43 var (
44
45
46 AppendClientOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of times AppendRowsClient was opened", stats.UnitDimensionless)
47
48
49
50 AppendClientOpenRetryCount = stats.Int64(statsPrefix+"stream_open_retry_count", "Number of times AppendRowsClient open was retried", stats.UnitDimensionless)
51
52
53
54 AppendRequests = stats.Int64(statsPrefix+"append_requests", "Number of append requests sent", stats.UnitDimensionless)
55
56
57
58 AppendRequestBytes = stats.Int64(statsPrefix+"append_request_bytes", "Number of bytes sent as append requests", stats.UnitBytes)
59
60
61
62 AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
63
64
65
66 AppendRequestReconnects = stats.Int64(statsPrefix+"append_reconnections", "Number of append rows reconnections", stats.UnitDimensionless)
67
68
69
70 AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
71
72
73
74 AppendResponses = stats.Int64(statsPrefix+"append_responses", "Number of append responses sent", stats.UnitDimensionless)
75
76
77
78 AppendResponseErrors = stats.Int64(statsPrefix+"append_response_errors", "Number of append responses with errors attached", stats.UnitDimensionless)
79
80
81
82
83 AppendRetryCount = stats.Int64(statsPrefix+"append_retry_count", "Number of appends that were retried", stats.UnitDimensionless)
84
85
86
87 FlushRequests = stats.Int64(statsPrefix+"flush_requests", "Number of FlushRows requests sent", stats.UnitDimensionless)
88 )
89
90 var (
91
92
93
94 AppendClientOpenView *view.View
95
96
97
98 AppendClientOpenRetryView *view.View
99
100
101
102 AppendRequestsView *view.View
103
104
105
106 AppendRequestBytesView *view.View
107
108
109
110 AppendRequestErrorsView *view.View
111
112
113
114 AppendRequestReconnectsView *view.View
115
116
117
118 AppendRequestRowsView *view.View
119
120
121
122 AppendResponsesView *view.View
123
124
125
126 AppendResponseErrorsView *view.View
127
128
129
130 AppendRetryView *view.View
131
132
133
134 FlushRequestsView *view.View
135 )
136
137 func init() {
138 AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyError)
139 AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount))
140
141 AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
142 AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
143 AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
144 AppendRequestReconnectsView = createSumView(stats.Measure(AppendRequestReconnects), keyStream, keyDataOrigin, keyError)
145 AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
146
147 AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
148 AppendResponseErrorsView = createSumView(stats.Measure(AppendResponseErrors), keyStream, keyDataOrigin, keyError)
149 AppendRetryView = createSumView(stats.Measure(AppendRetryCount), keyStream, keyDataOrigin)
150 FlushRequestsView = createSumView(stats.Measure(FlushRequests), keyStream, keyDataOrigin)
151
152 DefaultOpenCensusViews = []*view.View{
153 AppendClientOpenView,
154 AppendClientOpenRetryView,
155
156 AppendRequestsView,
157 AppendRequestBytesView,
158 AppendRequestErrorsView,
159 AppendRequestReconnectsView,
160 AppendRequestRowsView,
161
162 AppendResponsesView,
163 AppendResponseErrorsView,
164 AppendRetryView,
165
166 FlushRequestsView,
167 }
168 }
169
170 func createView(m stats.Measure, agg *view.Aggregation, keys ...tag.Key) *view.View {
171 return &view.View{
172 Name: m.Name(),
173 Description: m.Description(),
174 TagKeys: keys,
175 Measure: m,
176 Aggregation: agg,
177 }
178 }
179
180 func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
181 return createView(m, view.Sum(), keys...)
182 }
183
184
185
186 func setupWriterStatContext(ms *ManagedStream) context.Context {
187 if ms == nil {
188 panic("no ManagedStream provided")
189 }
190 kCtx := ms.ctx
191 if ms.streamSettings == nil {
192 return kCtx
193 }
194 if ms.streamSettings.streamID != "" {
195 ctx, err := tag.New(kCtx, tag.Upsert(keyStream, ms.streamSettings.streamID))
196 if err != nil {
197 return kCtx
198 }
199 kCtx = ctx
200 }
201 if ms.streamSettings.dataOrigin != "" {
202 ctx, err := tag.New(kCtx, tag.Upsert(keyDataOrigin, ms.streamSettings.dataOrigin))
203 if err != nil {
204 return kCtx
205 }
206 kCtx = ctx
207 }
208 return kCtx
209 }
210
211
212
213 func recordWriterStat(ms *ManagedStream, m *stats.Int64Measure, n int64) {
214 stats.Record(ms.ctx, m.M(n))
215 }
216
217 func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
218 stats.Record(ctx, m.M(n))
219 }
220
View as plain text