package clients import ( "context" "encoding/json" gcperror "edge-infra.dev/pkg/edge/api/apierror/gcp" "edge-infra.dev/pkg/edge/api/graph/model" "cloud.google.com/go/bigquery" "github.com/doug-martin/goqu/v9" "google.golang.org/api/iterator" ) type bqClient struct { Client *bigquery.Client ProjectID string BigQueryTable string } func New(ctx context.Context, projectID, bigqueryTable string) (BQClient, error) { client, err := bigquery.NewClient(ctx, projectID) if err != nil { return nil, err } return &bqClient{ Client: client, ProjectID: projectID, BigQueryTable: bigqueryTable, }, nil } func (b *bqClient) Read(ctx context.Context, query string) ([]string, error) { q := b.Client.Query(query) // Location must match that of the dataset(s) referenced in the query. q.Location = "US" // Run the query and print results when the query job is completed. it, err := q.Read(ctx) if err != nil { return nil, err } return toJSONArrayString(it) } func toJSONArrayString(it *bigquery.RowIterator) ([]string, error) { data := make([]string, 0) for { var row []bigquery.Value err := it.Next(&row) if err == iterator.Done { break } if err != nil { return nil, err } if len(row) == 1 { data = append(data, row[0].(string)) } else { str, err := json.Marshal(row) if err != nil { return nil, err } data = append(data, string(str)) } } return data, nil } func (b *bqClient) GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error) { query, err := b.buildQuery(projectID, cluster, input) if err != nil { return nil, gcperror.Wrap(err) } payload, err := b.Read(ctx, query) if err != nil { return nil, gcperror.Wrap(err) } return payload, nil } func (b *bqClient) buildQuery(projectID string, cluster *model.Cluster, input model.LoqRequest) (string, error) { queryBuilder := goqu.Dialect("mysql").Select("resource") if input.GetClusterEdgeID { queryBuilder = queryBuilder.SelectAppend("cluster_edge_id") } queryBuilder = queryBuilder.From(b.BigQueryTable) queryBuilder = queryBuilder. Where(goqu.C("project_id").Eq(projectID)). Where(goqu.C("k8s_group").Eq(input.Group)). Where(goqu.C("version").Eq(input.Version)). Where(goqu.C("kind").Eq(input.Kind)). Where(goqu.C("operation").Neq("delete")) if cluster != nil { queryBuilder = queryBuilder.Where(goqu.C("cluster_edge_id").Eq(cluster.ClusterEdgeID)) } if input.Name != nil { queryBuilder = queryBuilder.Where(goqu.C("name").Eq(*input.Name)) } if input.Namespace != nil { queryBuilder = queryBuilder.Where(goqu.C("namespace").Eq(*input.Namespace)) } query, _, err := queryBuilder.ToSQL() return query, err } // BQClient is used to read kube resources from gcp bigquery // //go:generate mockgen -destination=../mocks/mock_big_query_client.go -package=mocks edge-infra.dev/pkg/edge/api/clients BQClient type BQClient interface { Read(ctx context.Context, query string) ([]string, error) GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error) }