1 package clients
2
3 import (
4 "context"
5 "encoding/json"
6
7 gcperror "edge-infra.dev/pkg/edge/api/apierror/gcp"
8 "edge-infra.dev/pkg/edge/api/graph/model"
9
10 "cloud.google.com/go/bigquery"
11 "github.com/doug-martin/goqu/v9"
12 "google.golang.org/api/iterator"
13 )
14
15 type bqClient struct {
16 Client *bigquery.Client
17 ProjectID string
18 BigQueryTable string
19 }
20
21 func New(ctx context.Context, projectID, bigqueryTable string) (BQClient, error) {
22 client, err := bigquery.NewClient(ctx, projectID)
23 if err != nil {
24 return nil, err
25 }
26
27 return &bqClient{
28 Client: client,
29 ProjectID: projectID,
30 BigQueryTable: bigqueryTable,
31 }, nil
32 }
33
34 func (b *bqClient) Read(ctx context.Context, query string) ([]string, error) {
35 q := b.Client.Query(query)
36
37 q.Location = "US"
38
39 it, err := q.Read(ctx)
40 if err != nil {
41 return nil, err
42 }
43 return toJSONArrayString(it)
44 }
45
46 func toJSONArrayString(it *bigquery.RowIterator) ([]string, error) {
47 data := make([]string, 0)
48 for {
49 var row []bigquery.Value
50 err := it.Next(&row)
51 if err == iterator.Done {
52 break
53 }
54 if err != nil {
55 return nil, err
56 }
57 if len(row) == 1 {
58 data = append(data, row[0].(string))
59 } else {
60 str, err := json.Marshal(row)
61 if err != nil {
62 return nil, err
63 }
64 data = append(data, string(str))
65 }
66 }
67 return data, nil
68 }
69
70 func (b *bqClient) GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error) {
71 query, err := b.buildQuery(projectID, cluster, input)
72 if err != nil {
73 return nil, gcperror.Wrap(err)
74 }
75 payload, err := b.Read(ctx, query)
76 if err != nil {
77 return nil, gcperror.Wrap(err)
78 }
79 return payload, nil
80 }
81
82 func (b *bqClient) buildQuery(projectID string, cluster *model.Cluster, input model.LoqRequest) (string, error) {
83 queryBuilder := goqu.Dialect("mysql").Select("resource")
84 if input.GetClusterEdgeID {
85 queryBuilder = queryBuilder.SelectAppend("cluster_edge_id")
86 }
87
88 queryBuilder = queryBuilder.From(b.BigQueryTable)
89
90 queryBuilder = queryBuilder.
91 Where(goqu.C("project_id").Eq(projectID)).
92 Where(goqu.C("k8s_group").Eq(input.Group)).
93 Where(goqu.C("version").Eq(input.Version)).
94 Where(goqu.C("kind").Eq(input.Kind)).
95 Where(goqu.C("operation").Neq("delete"))
96 if cluster != nil {
97 queryBuilder = queryBuilder.Where(goqu.C("cluster_edge_id").Eq(cluster.ClusterEdgeID))
98 }
99 if input.Name != nil {
100 queryBuilder = queryBuilder.Where(goqu.C("name").Eq(*input.Name))
101 }
102 if input.Namespace != nil {
103 queryBuilder = queryBuilder.Where(goqu.C("namespace").Eq(*input.Namespace))
104 }
105 query, _, err := queryBuilder.ToSQL()
106 return query, err
107 }
108
109
110
111
112 type BQClient interface {
113 Read(ctx context.Context, query string) ([]string, error)
114 GetKubeResource(ctx context.Context, projectID string, cluster *model.Cluster, input model.LoqRequest) ([]string, error)
115 }
116
View as plain text