...

Source file src/edge-infra.dev/pkg/edge/monitoring/billman/gcp/gcp.go

Documentation: edge-infra.dev/pkg/edge/monitoring/billman/gcp

     1  // Package gcp calls the GCP metrics api to return data
     2  // about metric samples and logs ingested for edge clusters.
     3  package gcp
     4  
     5  import (
     6  	"context"
     7  	"fmt"
     8  
     9  	monitoring "cloud.google.com/go/monitoring/apiv3/v2"
    10  	monitoringpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
    11  	"google.golang.org/api/iterator"
    12  )
    13  
    14  // GetMetricSamples uses the write_sample_count metric from
    15  // the MetricIngestionAttribution resource to determine the
    16  // number of metric samples can be attributed to a given
    17  // cluster id.
    18  //
    19  // ref: https://cloud.google.com/stackdriver/docs/managed-prometheus/cost-controls
    20  func GetMetricSamples(scopingProject string, clusterID string, interval string) (int64, error) {
    21  	ctx := context.Background()
    22  	client, err := monitoring.NewQueryClient(ctx)
    23  	if err != nil {
    24  		return 0.0, fmt.Errorf("NewQueryClient for metrics: %v", err)
    25  	}
    26  	defer client.Close()
    27  
    28  	query := "fetch monitoring.googleapis.com/MetricIngestionAttribution" +
    29  		" | metric 'monitoring.googleapis.com/collection/attribution/write_sample_count'" +
    30  		" | filter (resource.attribution_dimension == 'cluster' && resource.attribution_id == \"" + clusterID + "\")" +
    31  		" | group_by " + interval + ", [samples_agg: aggregate(value.write_sample_count)]" +
    32  		" | every 5m " +
    33  		" | group_by [resource.attribution_id, resource.resource_container] "
    34  
    35  	req := &monitoringpb.QueryTimeSeriesRequest{
    36  		Name:  "projects/" + scopingProject,
    37  		Query: query,
    38  	}
    39  
    40  	var bytes int64
    41  	it := client.QueryTimeSeries(ctx, req)
    42  	for {
    43  		resp, err := it.Next()
    44  		if err == iterator.Done {
    45  			break
    46  		}
    47  		if err != nil {
    48  			return 0, fmt.Errorf("could not read time series value: %v", err)
    49  		}
    50  		if len(resp.GetPointData()) > 0 {
    51  			bytes = resp.GetPointData()[0].GetValues()[0].GetInt64Value()
    52  		}
    53  	}
    54  	return bytes, nil
    55  }
    56  
    57  // GetLogBytes uses the byte_count to deterine the number of
    58  // log bytes ingested can be attributed to a given cluster id.
    59  func GetLogBytes(scopingProject string, clusterID string, interval string, logType string) (int64, error) {
    60  	ctx := context.Background()
    61  	client, err := monitoring.NewQueryClient(ctx)
    62  	if err != nil {
    63  		return 0.0, fmt.Errorf("NewQueryClient for logs: %v", err)
    64  	}
    65  	defer client.Close()
    66  
    67  	query := "fetch " + logType +
    68  		"| metric 'logging.googleapis.com/byte_count'" +
    69  		"| filter (resource.project_id == \"" + scopingProject + "\" && resource.cluster_name == \"" + clusterID + "\")" +
    70  		"| group_by " + interval + ", [byte_count_agg:aggregate(value.byte_count)]" +
    71  		"| every 5m" +
    72  		"| group_by [resource.cluster_name], [aggregate(byte_count_agg)]"
    73  
    74  	req := &monitoringpb.QueryTimeSeriesRequest{
    75  		Name:  "projects/" + scopingProject,
    76  		Query: query,
    77  	}
    78  
    79  	var bytes int64
    80  	it := client.QueryTimeSeries(ctx, req)
    81  	for {
    82  		resp, err := it.Next()
    83  		if err == iterator.Done {
    84  			break
    85  		}
    86  		if err != nil {
    87  			return 0, fmt.Errorf("could not read time series value: %v", err)
    88  		}
    89  		if len(resp.GetPointData()) > 0 {
    90  			bytes = resp.GetPointData()[0].GetValues()[0].GetInt64Value()
    91  		}
    92  	}
    93  	return bytes, nil
    94  }
    95  

View as plain text