1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "bytes"
19 "context"
20 "fmt"
21 "math"
22 "testing"
23
24 "cloud.google.com/go/bigquery"
25 )
26
27
28 func validateTableConstraints(ctx context.Context, t *testing.T, client *bigquery.Client, table *bigquery.Table, description string, opts ...constraintOption) {
29 vi := &validationInfo{
30 constraints: make(map[string]*constraint),
31 }
32
33 for _, o := range opts {
34 o(vi)
35 }
36
37 if len(vi.constraints) == 0 {
38 t.Errorf("%q: no constraints were specified", description)
39 return
40 }
41
42 sql := new(bytes.Buffer)
43 sql.WriteString("SELECT\n")
44 var i int
45 for _, c := range vi.constraints {
46 if i > 0 {
47 sql.WriteString(",\n")
48 }
49 sql.WriteString(c.projection)
50 i++
51 }
52 sql.WriteString(fmt.Sprintf("\nFROM `%s`.%s.%s", table.ProjectID, table.DatasetID, table.TableID))
53 q := client.Query(sql.String())
54 it, err := q.Read(ctx)
55 if err != nil {
56 t.Errorf("%q: failed to issue validation query: %v\nSQL: %s", description, err, sql.String())
57 return
58 }
59 var resultrow []bigquery.Value
60 err = it.Next(&resultrow)
61 if err != nil {
62 t.Errorf("%q: failed to get result row: %v", description, err)
63 return
64 }
65
66 for colname, con := range vi.constraints {
67 off := -1
68 for k, v := range it.Schema {
69 if v.Name == colname {
70 off = k
71 break
72 }
73 }
74 if off == -1 {
75 t.Errorf("%q: missing constraint %q from results", description, colname)
76 continue
77 }
78 val, ok := resultrow[off].(int64)
79 if !ok {
80 t.Errorf("%q: constraint %q type mismatch", description, colname)
81 }
82 if con.allowedError == 0 {
83 if val != con.expectedValue {
84 t.Errorf("%q: constraint %q mismatch, got %d want %d (%s)", description, colname, val, con.expectedValue, it.SourceJob().ID())
85 }
86 continue
87 }
88 res := val - con.expectedValue
89 if res < 0 {
90 res = -res
91 }
92 if res > con.allowedError {
93 t.Errorf("%q: constraint %q outside error bound %d, got %d want %d", description, colname, con.allowedError, val, con.expectedValue)
94 }
95 }
96 }
97
98
type (
	// constraint describes one aggregate check evaluated by the validation
	// query.
	constraint struct {
		// projection is the SQL expression (including its alias) that is
		// written into the SELECT list.
		projection string

		// expectedValue is the value the projection should produce.
		// allowedError is the permitted absolute difference from
		// expectedValue; zero demands an exact match.
		expectedValue, allowedError int64
	}

	// validationInfo collects constraints, keyed by result column name.
	validationInfo struct {
		constraints map[string]*constraint
	}

	// constraintOption registers a constraint on a validationInfo.
	constraintOption func(*validationInfo)
)
117
118
119 func withExactRowCount(totalRows int64) constraintOption {
120 return func(vi *validationInfo) {
121 resultCol := "total_rows"
122 vi.constraints[resultCol] = &constraint{
123 projection: fmt.Sprintf("COUNT(1) AS `%s`", resultCol),
124 expectedValue: totalRows,
125 }
126 }
127 }
128
129
130 func withNullCount(colname string, nullCount int64) constraintOption {
131 return func(vi *validationInfo) {
132 resultCol := fmt.Sprintf("nullcol_count_%s", colname)
133 vi.constraints[resultCol] = &constraint{
134 projection: fmt.Sprintf("SUM(IF(`%s` IS NULL,1,0)) AS `%s`", colname, resultCol),
135 expectedValue: nullCount,
136 }
137 }
138 }
139
140
141 func withNonNullCount(colname string, nonNullCount int64) constraintOption {
142 return func(vi *validationInfo) {
143 resultCol := fmt.Sprintf("nonnullcol_count_%s", colname)
144 vi.constraints[resultCol] = &constraint{
145 projection: fmt.Sprintf("SUM(IF(`%s` IS NOT NULL,1,0)) AS `%s`", colname, resultCol),
146 expectedValue: nonNullCount,
147 }
148 }
149 }
150
151
152 func withDistinctValues(colname string, distinctVals int64) constraintOption {
153 return func(vi *validationInfo) {
154 resultCol := fmt.Sprintf("distinct_count_%s", colname)
155 vi.constraints[resultCol] = &constraint{
156 projection: fmt.Sprintf("COUNT(DISTINCT `%s`) AS `%s`", colname, resultCol),
157 expectedValue: distinctVals,
158 }
159 }
160 }
161
162
163 func withApproxDistinctValues(colname string, approxValues int64, errorBound int64) constraintOption {
164 return func(vi *validationInfo) {
165 resultCol := fmt.Sprintf("distinct_count_%s", colname)
166 vi.constraints[resultCol] = &constraint{
167 projection: fmt.Sprintf("APPROX_COUNT_DISTINCT(`%s`) AS `%s`", colname, resultCol),
168 expectedValue: approxValues,
169 allowedError: errorBound,
170 }
171 }
172 }
173
174
175 func withIntegerValueCount(colname string, wantValue int64, valueCount int64) constraintOption {
176 return func(vi *validationInfo) {
177 resultCol := fmt.Sprintf("integer_value_count_%s", colname)
178 vi.constraints[resultCol] = &constraint{
179 projection: fmt.Sprintf("COUNTIF(`%s` = %d) AS `%s`", colname, wantValue, resultCol),
180 expectedValue: valueCount,
181 }
182 }
183 }
184
185
186 func withStringValueCount(colname string, wantValue string, valueCount int64) constraintOption {
187 return func(vi *validationInfo) {
188 resultCol := fmt.Sprintf("string_value_count_%s", colname)
189 vi.constraints[resultCol] = &constraint{
190 projection: fmt.Sprintf("COUNTIF(`%s` = \"%s\") AS `%s`", colname, wantValue, resultCol),
191 expectedValue: valueCount,
192 }
193 }
194 }
195
196
197 func withBoolValueCount(colname string, wantValue bool, valueCount int64) constraintOption {
198 return func(vi *validationInfo) {
199 resultCol := fmt.Sprintf("bool_value_count_%s", colname)
200 vi.constraints[resultCol] = &constraint{
201 projection: fmt.Sprintf("COUNTIF(`%s` = %t) AS `%s`", colname, wantValue, resultCol),
202 expectedValue: valueCount,
203 }
204 }
205 }
206
207
208 func withBytesValueCount(colname string, wantValue []byte, valueCount int64) constraintOption {
209 return func(vi *validationInfo) {
210 resultCol := fmt.Sprintf("bytes_value_count_%s", colname)
211 vi.constraints[resultCol] = &constraint{
212 projection: fmt.Sprintf("COUNTIF(`%s` = B\"%s\") AS `%s`", colname, wantValue, resultCol),
213 expectedValue: valueCount,
214 }
215 }
216 }
217
218
219
220 func withFloatValueCount(colname string, wantValue float64, valueCount int64) constraintOption {
221 return func(vi *validationInfo) {
222 resultCol := fmt.Sprintf("float_value_count_%s", colname)
223 projection := fmt.Sprintf("COUNTIF((ABS(`%s`) - ABS(%f))/ABS(%f) < 0.0001) AS `%s`", colname, wantValue, wantValue, resultCol)
224 switch wantValue {
225 case math.Inf(0):
226
227 projection = fmt.Sprintf("COUNTIF(IS_INF(`%s`)) as `%s`", colname, resultCol)
228 case math.NaN():
229 projection = fmt.Sprintf("COUNTIF(IS_NAN(%s)) as `%s`", colname, resultCol)
230 case 0:
231 projection = fmt.Sprintf("COUNTIF(SIGN(`%s`) = 0) as `%s`", colname, resultCol)
232 }
233 vi.constraints[resultCol] = &constraint{
234 projection: projection,
235 expectedValue: valueCount,
236 }
237 }
238 }
239
240
241 func withArrayLength(colname string, wantLen int64, wantCount int64) constraintOption {
242 return func(vi *validationInfo) {
243 resultCol := fmt.Sprintf("arraylength_value_count_%s", colname)
244 vi.constraints[resultCol] = &constraint{
245 projection: fmt.Sprintf("COUNTIF(ARRAY_LENGTH(`%s`) = %d) as `%s`", colname, wantLen, resultCol),
246 expectedValue: wantCount,
247 }
248 }
249 }
250
251
252 func withDistinctArrayValues(colname string, distinctVals, wantCount int64) constraintOption {
253 return func(vi *validationInfo) {
254 resultCol := fmt.Sprintf("distinct_array_count_%s", colname)
255 vi.constraints[resultCol] = &constraint{
256 projection: fmt.Sprintf("COUNTIF(ARRAY_LENGTH(ARRAY(SELECT DISTINCT element FROM UNNEST(`%s`) as element)) = %d) AS `%s`", colname, distinctVals, resultCol),
257 expectedValue: wantCount,
258 }
259 }
260 }
261
262
263 func withIntegerArraySum(colname string, arraySum int64, wantCount int64) constraintOption {
264 return func(vi *validationInfo) {
265 resultCol := fmt.Sprintf("arraysum_int64_value_count_%s", colname)
266 vi.constraints[resultCol] = &constraint{
267 projection: fmt.Sprintf("COUNTIF((SELECT SUM(elem) FROM UNNEST(`%s`) as elem) = %d) as `%s`", colname, arraySum, resultCol),
268 expectedValue: wantCount,
269 }
270 }
271 }
272
273
274 func withFloatArraySum(colname string, floatSum float64, wantCount int64) constraintOption {
275 return func(vi *validationInfo) {
276 resultCol := fmt.Sprintf("arraysum_float_value_count_%s", colname)
277 vi.constraints[resultCol] = &constraint{
278 projection: fmt.Sprintf("COUNTIF(((SELECT ABS(SUM(elem)) FROM UNNEST(`%s`) as elem) - ABS(%f)) / ABS(%f) < 0.0001) as `%s`", colname, floatSum, floatSum, resultCol),
279 expectedValue: wantCount,
280 }
281 }
282 }
283
View as plain text