...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/testutils_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"fmt"
    21  	"math"
    22  	"testing"
    23  
    24  	"cloud.google.com/go/bigquery"
    25  )
    26  
    27  // validateTableConstraints is used to validate properties of a table by computing stats using the query engine.
    28  func validateTableConstraints(ctx context.Context, t *testing.T, client *bigquery.Client, table *bigquery.Table, description string, opts ...constraintOption) {
    29  	vi := &validationInfo{
    30  		constraints: make(map[string]*constraint),
    31  	}
    32  
    33  	for _, o := range opts {
    34  		o(vi)
    35  	}
    36  
    37  	if len(vi.constraints) == 0 {
    38  		t.Errorf("%q: no constraints were specified", description)
    39  		return
    40  	}
    41  
    42  	sql := new(bytes.Buffer)
    43  	sql.WriteString("SELECT\n")
    44  	var i int
    45  	for _, c := range vi.constraints {
    46  		if i > 0 {
    47  			sql.WriteString(",\n")
    48  		}
    49  		sql.WriteString(c.projection)
    50  		i++
    51  	}
    52  	sql.WriteString(fmt.Sprintf("\nFROM `%s`.%s.%s", table.ProjectID, table.DatasetID, table.TableID))
    53  	q := client.Query(sql.String())
    54  	it, err := q.Read(ctx)
    55  	if err != nil {
    56  		t.Errorf("%q: failed to issue validation query: %v\nSQL: %s", description, err, sql.String())
    57  		return
    58  	}
    59  	var resultrow []bigquery.Value
    60  	err = it.Next(&resultrow)
    61  	if err != nil {
    62  		t.Errorf("%q: failed to get result row: %v", description, err)
    63  		return
    64  	}
    65  
    66  	for colname, con := range vi.constraints {
    67  		off := -1
    68  		for k, v := range it.Schema {
    69  			if v.Name == colname {
    70  				off = k
    71  				break
    72  			}
    73  		}
    74  		if off == -1 {
    75  			t.Errorf("%q: missing constraint %q from results", description, colname)
    76  			continue
    77  		}
    78  		val, ok := resultrow[off].(int64)
    79  		if !ok {
    80  			t.Errorf("%q: constraint %q type mismatch", description, colname)
    81  		}
    82  		if con.allowedError == 0 {
    83  			if val != con.expectedValue {
    84  				t.Errorf("%q: constraint %q mismatch, got %d want %d (%s)", description, colname, val, con.expectedValue, it.SourceJob().ID())
    85  			}
    86  			continue
    87  		}
    88  		res := val - con.expectedValue
    89  		if res < 0 {
    90  			res = -res
    91  		}
    92  		if res > con.allowedError {
    93  			t.Errorf("%q: constraint %q outside error bound %d, got %d want %d", description, colname, con.allowedError, val, con.expectedValue)
    94  		}
    95  	}
    96  }
    97  
    98  // constraint is a specific table constraint.
    99  type constraint struct {
   100  	// sql fragment that projects a result value
   101  	projection string
   102  
   103  	// all validation constraints must eval as int64.
   104  	expectedValue int64
   105  
   106  	// if nonzero, the constraint value must be within allowedError distance of expectedValue.
   107  	allowedError int64
   108  }
   109  
   110  // validationInfo is keyed by the result column name.
   111  type validationInfo struct {
   112  	constraints map[string]*constraint
   113  }
   114  
   115  // constraintOption is for building validation rules.
   116  type constraintOption func(*validationInfo)
   117  
   118  // withExactRowCount asserts the exact total row count of the table.
   119  func withExactRowCount(totalRows int64) constraintOption {
   120  	return func(vi *validationInfo) {
   121  		resultCol := "total_rows"
   122  		vi.constraints[resultCol] = &constraint{
   123  			projection:    fmt.Sprintf("COUNT(1) AS `%s`", resultCol),
   124  			expectedValue: totalRows,
   125  		}
   126  	}
   127  }
   128  
   129  // withNullCount asserts the number of null values in a column.
   130  func withNullCount(colname string, nullCount int64) constraintOption {
   131  	return func(vi *validationInfo) {
   132  		resultCol := fmt.Sprintf("nullcol_count_%s", colname)
   133  		vi.constraints[resultCol] = &constraint{
   134  			projection:    fmt.Sprintf("SUM(IF(`%s` IS NULL,1,0)) AS `%s`", colname, resultCol),
   135  			expectedValue: nullCount,
   136  		}
   137  	}
   138  }
   139  
   140  // withNonNullCount asserts the number of non null values in a column.
   141  func withNonNullCount(colname string, nonNullCount int64) constraintOption {
   142  	return func(vi *validationInfo) {
   143  		resultCol := fmt.Sprintf("nonnullcol_count_%s", colname)
   144  		vi.constraints[resultCol] = &constraint{
   145  			projection:    fmt.Sprintf("SUM(IF(`%s` IS NOT NULL,1,0)) AS `%s`", colname, resultCol),
   146  			expectedValue: nonNullCount,
   147  		}
   148  	}
   149  }
   150  
   151  // withDistinctValues validates the exact cardinality of a column.
   152  func withDistinctValues(colname string, distinctVals int64) constraintOption {
   153  	return func(vi *validationInfo) {
   154  		resultCol := fmt.Sprintf("distinct_count_%s", colname)
   155  		vi.constraints[resultCol] = &constraint{
   156  			projection:    fmt.Sprintf("COUNT(DISTINCT `%s`) AS `%s`", colname, resultCol),
   157  			expectedValue: distinctVals,
   158  		}
   159  	}
   160  }
   161  
   162  // withApproxDistinctValues validates the approximate cardinality of a column with an error bound.
   163  func withApproxDistinctValues(colname string, approxValues int64, errorBound int64) constraintOption {
   164  	return func(vi *validationInfo) {
   165  		resultCol := fmt.Sprintf("distinct_count_%s", colname)
   166  		vi.constraints[resultCol] = &constraint{
   167  			projection:    fmt.Sprintf("APPROX_COUNT_DISTINCT(`%s`) AS `%s`", colname, resultCol),
   168  			expectedValue: approxValues,
   169  			allowedError:  errorBound,
   170  		}
   171  	}
   172  }
   173  
   174  // withIntegerValueCount validates how many values in the column have a given integer value.
   175  func withIntegerValueCount(colname string, wantValue int64, valueCount int64) constraintOption {
   176  	return func(vi *validationInfo) {
   177  		resultCol := fmt.Sprintf("integer_value_count_%s", colname)
   178  		vi.constraints[resultCol] = &constraint{
   179  			projection:    fmt.Sprintf("COUNTIF(`%s` = %d) AS `%s`", colname, wantValue, resultCol),
   180  			expectedValue: valueCount,
   181  		}
   182  	}
   183  }
   184  
   185  // withStringValueCount validates how many values in the column have a given string value.
   186  func withStringValueCount(colname string, wantValue string, valueCount int64) constraintOption {
   187  	return func(vi *validationInfo) {
   188  		resultCol := fmt.Sprintf("string_value_count_%s", colname)
   189  		vi.constraints[resultCol] = &constraint{
   190  			projection:    fmt.Sprintf("COUNTIF(`%s` = \"%s\") AS `%s`", colname, wantValue, resultCol),
   191  			expectedValue: valueCount,
   192  		}
   193  	}
   194  }
   195  
   196  // withBoolValueCount validates how many values in the column have a given boolean value.
   197  func withBoolValueCount(colname string, wantValue bool, valueCount int64) constraintOption {
   198  	return func(vi *validationInfo) {
   199  		resultCol := fmt.Sprintf("bool_value_count_%s", colname)
   200  		vi.constraints[resultCol] = &constraint{
   201  			projection:    fmt.Sprintf("COUNTIF(`%s` = %t) AS `%s`", colname, wantValue, resultCol),
   202  			expectedValue: valueCount,
   203  		}
   204  	}
   205  }
   206  
   207  // withBytesValueCount validates how many values in the column have a given bytes value.
   208  func withBytesValueCount(colname string, wantValue []byte, valueCount int64) constraintOption {
   209  	return func(vi *validationInfo) {
   210  		resultCol := fmt.Sprintf("bytes_value_count_%s", colname)
   211  		vi.constraints[resultCol] = &constraint{
   212  			projection:    fmt.Sprintf("COUNTIF(`%s` = B\"%s\") AS `%s`", colname, wantValue, resultCol),
   213  			expectedValue: valueCount,
   214  		}
   215  	}
   216  }
   217  
   218  // withFloatValueCount validates how many values in the column have a given floating point value, with a
   219  // reasonable error bound due to precision loss.
   220  func withFloatValueCount(colname string, wantValue float64, valueCount int64) constraintOption {
   221  	return func(vi *validationInfo) {
   222  		resultCol := fmt.Sprintf("float_value_count_%s", colname)
   223  		projection := fmt.Sprintf("COUNTIF((ABS(`%s`) - ABS(%f))/ABS(%f) < 0.0001) AS `%s`", colname, wantValue, wantValue, resultCol)
   224  		switch wantValue {
   225  		case math.Inf(0):
   226  			// special case for infinities.
   227  			projection = fmt.Sprintf("COUNTIF(IS_INF(`%s`)) as `%s`", colname, resultCol)
   228  		case math.NaN():
   229  			projection = fmt.Sprintf("COUNTIF(IS_NAN(%s)) as `%s`", colname, resultCol)
   230  		case 0:
   231  			projection = fmt.Sprintf("COUNTIF(SIGN(`%s`) = 0) as `%s`", colname, resultCol)
   232  		}
   233  		vi.constraints[resultCol] = &constraint{
   234  			projection:    projection,
   235  			expectedValue: valueCount,
   236  		}
   237  	}
   238  }
   239  
   240  // withArrayLength validates how many rows in an ARRAY column have a given length.
   241  func withArrayLength(colname string, wantLen int64, wantCount int64) constraintOption {
   242  	return func(vi *validationInfo) {
   243  		resultCol := fmt.Sprintf("arraylength_value_count_%s", colname)
   244  		vi.constraints[resultCol] = &constraint{
   245  			projection:    fmt.Sprintf("COUNTIF(ARRAY_LENGTH(`%s`) = %d) as `%s`", colname, wantLen, resultCol),
   246  			expectedValue: wantCount,
   247  		}
   248  	}
   249  }
   250  
   251  // withDistinctArrayValues validates how many elements of an ARRAY column have a given cardinality.
   252  func withDistinctArrayValues(colname string, distinctVals, wantCount int64) constraintOption {
   253  	return func(vi *validationInfo) {
   254  		resultCol := fmt.Sprintf("distinct_array_count_%s", colname)
   255  		vi.constraints[resultCol] = &constraint{
   256  			projection:    fmt.Sprintf("COUNTIF(ARRAY_LENGTH(ARRAY(SELECT DISTINCT element FROM UNNEST(`%s`) as element)) = %d) AS `%s`", colname, distinctVals, resultCol),
   257  			expectedValue: wantCount,
   258  		}
   259  	}
   260  }
   261  
   262  // withIntegerArraySum validates the total sum of values in an ARRAY<INT64?> column.
   263  func withIntegerArraySum(colname string, arraySum int64, wantCount int64) constraintOption {
   264  	return func(vi *validationInfo) {
   265  		resultCol := fmt.Sprintf("arraysum_int64_value_count_%s", colname)
   266  		vi.constraints[resultCol] = &constraint{
   267  			projection:    fmt.Sprintf("COUNTIF((SELECT SUM(elem) FROM UNNEST(`%s`) as elem) = %d) as `%s`", colname, arraySum, resultCol),
   268  			expectedValue: wantCount,
   269  		}
   270  	}
   271  }
   272  
   273  // withFloatArraySum validates how many rows in an an ARRAY<INT64?> column have a given sum, within an error bound.
   274  func withFloatArraySum(colname string, floatSum float64, wantCount int64) constraintOption {
   275  	return func(vi *validationInfo) {
   276  		resultCol := fmt.Sprintf("arraysum_float_value_count_%s", colname)
   277  		vi.constraints[resultCol] = &constraint{
   278  			projection:    fmt.Sprintf("COUNTIF(((SELECT ABS(SUM(elem)) FROM UNNEST(`%s`) as elem) - ABS(%f)) / ABS(%f) < 0.0001) as `%s`", colname, floatSum, floatSum, resultCol),
   279  			expectedValue: wantCount,
   280  		}
   281  	}
   282  }
   283  

View as plain text