...

Source file src/edge-infra.dev/pkg/edge/api/clients/bigquery_client.go

Documentation: edge-infra.dev/pkg/edge/api/clients

     1  package clients
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  
     7  	gcperror "edge-infra.dev/pkg/edge/api/apierror/gcp"
     8  	"edge-infra.dev/pkg/edge/api/graph/model"
     9  
    10  	"cloud.google.com/go/bigquery"
    11  	"github.com/doug-martin/goqu/v9"
    12  	"google.golang.org/api/iterator"
    13  )
    14  
    15  type bqClient struct {
    16  	Client        *bigquery.Client
    17  	ProjectID     string
    18  	BigQueryTable string
    19  }
    20  
    21  func New(ctx context.Context, projectID, bigqueryTable string) (BQClient, error) {
    22  	client, err := bigquery.NewClient(ctx, projectID)
    23  	if err != nil {
    24  		return nil, err
    25  	}
    26  
    27  	return &bqClient{
    28  		Client:        client,
    29  		ProjectID:     projectID,
    30  		BigQueryTable: bigqueryTable,
    31  	}, nil
    32  }
    33  
    34  func (b *bqClient) Read(ctx context.Context, query string) ([]string, error) {
    35  	q := b.Client.Query(query)
    36  	// Location must match that of the dataset(s) referenced in the query.
    37  	q.Location = "US"
    38  	// Run the query and print results when the query job is completed.
    39  	it, err := q.Read(ctx)
    40  	if err != nil {
    41  		return nil, err
    42  	}
    43  	return toJSONArrayString(it)
    44  }
    45  
    46  func toJSONArrayString(it *bigquery.RowIterator) ([]string, error) {
    47  	data := make([]string, 0)
    48  	for {
    49  		var row []bigquery.Value
    50  		err := it.Next(&row)
    51  		if err == iterator.Done {
    52  			break
    53  		}
    54  		if err != nil {
    55  			return nil, err
    56  		}
    57  		if len(row) == 1 {
    58  			data = append(data, row[0].(string))
    59  		} else {
    60  			str, err := json.Marshal(row)
    61  			if err != nil {
    62  				return nil, err
    63  			}
    64  			data = append(data, string(str))
    65  		}
    66  	}
    67  	return data, nil
    68  }
    69  
    70  func (b *bqClient) GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error) {
    71  	query, err := b.buildQuery(projectID, cluster, input)
    72  	if err != nil {
    73  		return nil, gcperror.Wrap(err)
    74  	}
    75  	payload, err := b.Read(ctx, query)
    76  	if err != nil {
    77  		return nil, gcperror.Wrap(err)
    78  	}
    79  	return payload, nil
    80  }
    81  
    82  func (b *bqClient) buildQuery(projectID string, cluster *model.Cluster, input model.LoqRequest) (string, error) {
    83  	queryBuilder := goqu.Dialect("mysql").Select("resource")
    84  	if input.GetClusterEdgeID {
    85  		queryBuilder = queryBuilder.SelectAppend("cluster_edge_id")
    86  	}
    87  
    88  	queryBuilder = queryBuilder.From(b.BigQueryTable)
    89  
    90  	queryBuilder = queryBuilder.
    91  		Where(goqu.C("project_id").Eq(projectID)).
    92  		Where(goqu.C("k8s_group").Eq(input.Group)).
    93  		Where(goqu.C("version").Eq(input.Version)).
    94  		Where(goqu.C("kind").Eq(input.Kind)).
    95  		Where(goqu.C("operation").Neq("delete"))
    96  	if cluster != nil {
    97  		queryBuilder = queryBuilder.Where(goqu.C("cluster_edge_id").Eq(cluster.ClusterEdgeID))
    98  	}
    99  	if input.Name != nil {
   100  		queryBuilder = queryBuilder.Where(goqu.C("name").Eq(*input.Name))
   101  	}
   102  	if input.Namespace != nil {
   103  		queryBuilder = queryBuilder.Where(goqu.C("namespace").Eq(*input.Namespace))
   104  	}
   105  	query, _, err := queryBuilder.ToSQL()
   106  	return query, err
   107  }
   108  
// BQClient is used to read kube resources from GCP BigQuery.
//
//go:generate mockgen -destination=../mocks/mock_big_query_client.go -package=mocks edge-infra.dev/pkg/edge/api/clients BQClient
type BQClient interface {
	// Read runs query and returns the result rows as strings, one entry per
	// row.
	Read(ctx context.Context, query string) ([]string, error)
	// GetKubeResource returns the rows found for the resource described by
	// input. In the bqClient implementation, projectID and input are used as
	// query filters, and cluster adds a further filter when it is non-nil.
	GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error)
}
   116  

View as plain text