...

Source file src/github.com/GoogleCloudPlatform/k8s-config-connector/cmd/recorder/main.go

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/cmd/recorder

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package main
    16  
    17  import (
    18  	"context"
    19  	goflag "flag"
    20  	"fmt"
    21  	"net/http"
    22  	_ "net/http/pprof" // Needed to allow pprof server to accept requests
    23  	"os"
    24  	"time"
    25  
    26  	dclmetadata "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/metadata"
    27  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gcp/profiler"
    28  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gvks/supportedgvks"
    29  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
    30  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/logging"
    31  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/metrics"
    32  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/ready"
    33  	"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/servicemapping/servicemappingloader"
    34  
    35  	"github.com/prometheus/client_golang/prometheus"
    36  	"github.com/prometheus/client_golang/prometheus/promhttp"
    37  	flag "github.com/spf13/pflag"
    38  	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    42  	"k8s.io/klog/v2/klogr"
    43  	"sigs.k8s.io/controller-runtime/pkg/client"
    44  	"sigs.k8s.io/controller-runtime/pkg/client/config"
    45  	klog "sigs.k8s.io/controller-runtime/pkg/log"
    46  )
    47  
const (
	// NumberOfWorkers is the capacity of the semaphore channel used by
	// doRecord, i.e. the maximum number of GVKs whose metrics are recorded
	// concurrently.
	NumberOfWorkers    = 20
	// MaximumListResults is the Limit set on the list options built in
	// recordMetricsForGVK for each List call.
	MaximumListResults = 50
)

var (
	// logger is the controller-runtime logger, named "setup", used throughout
	// this file.
	logger           = klog.Log.WithName("setup")
	// appliedResources is the collector that main registers with Prometheus;
	// doRecord resets it at the start of every run and recordMetricsForGVK
	// sets its values.
	appliedResources = metrics.NewAppliedResourcesCollector()
)
    57  
    58  func main() {
    59  
    60  	var (
    61  		prometheusScrapeEndpoint string
    62  		metricInterval           int
    63  		enablePprof              bool
    64  		pprofPort                int
    65  	)
    66  	flag.StringVar(&prometheusScrapeEndpoint, "prometheus-scrape-endpoint", ":8888", "configure the Prometheus scrape endpoint; :8888 as default")
    67  	flag.IntVar(&metricInterval, "metric-interval", 60, "the time interval of each recording in seconds")
    68  	flag.BoolVar(&enablePprof, "enable-pprof", false, "Enable the pprof server.")
    69  	flag.IntVar(&pprofPort, "pprof-port", 6060, "The port that the pprof server binds to if enabled.")
    70  	profiler.AddFlag(flag.CommandLine)
    71  	flag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    72  	flag.Parse()
    73  	kccVersion := os.Getenv("CONFIG_CONNECTOR_VERSION")
    74  
    75  	klog.SetLogger(klogr.New())
    76  
    77  	logger.Info("Recording the stats of Config Connector resources")
    78  
    79  	// Start pprof server if enabled
    80  	if enablePprof {
    81  		go func() {
    82  			if err := http.ListenAndServe(fmt.Sprintf(":%d", pprofPort), nil); err != nil {
    83  				logger.Error(err, "error while running pprof server")
    84  			}
    85  		}()
    86  	}
    87  
    88  	// Start Cloud Profiler agent if enabled
    89  	if err := profiler.StartIfEnabled(); err != nil {
    90  		logging.Fatal(err, "error starting Cloud Profiler agent")
    91  	}
    92  
    93  	// Register the Prometheus metrics
    94  	prometheus.MustRegister(appliedResources)
    95  	prometheus.MustRegister(metrics.NewBuildInfoCollector(kccVersion))
    96  
    97  	// Expose the registered metrics via HTTP.
    98  	go func() {
    99  		http.Handle("/metrics", promhttp.Handler())
   100  		logging.Fatal(http.ListenAndServe(prometheusScrapeEndpoint, nil), "error registering the Prometheus HTTP handler")
   101  	}()
   102  
   103  	// Set up the HTTP server for the readiness probe
   104  	logger.Info("Setting container as ready...")
   105  	ready.SetContainerAsReady()
   106  	logger.Info("Container is ready.")
   107  
   108  	// Get a config to talk to the apiserver
   109  	cfg, err := config.GetConfig()
   110  	if err != nil {
   111  		logging.Fatal(err, "error getting configuration from APIServer.")
   112  	}
   113  
   114  	// Get a client to talk to the APIServer
   115  	c, err := client.New(cfg, client.Options{})
   116  	if err != nil {
   117  		logging.Fatal(err, "error getting client.")
   118  	}
   119  
   120  	smLoader, err := servicemappingloader.New()
   121  	if err != nil {
   122  		logging.Fatal(err, "error getting new service mapping loader")
   123  	}
   124  
   125  	supportedGVKs := supportedgvks.All(smLoader, dclmetadata.New())
   126  	for {
   127  		err := doRecord(c, supportedGVKs)
   128  		if err != nil {
   129  			logger.Error(err, "error recording metrics.")
   130  		}
   131  		time.Sleep(time.Duration(metricInterval) * time.Second)
   132  	}
   133  }
   134  
   135  func doRecord(c client.Client, gvks []schema.GroupVersionKind) error {
   136  	logger.Info("listing all CRDs managed by Config Connector.")
   137  
   138  	// reset all metrics in this vector before the new run of recording
   139  	appliedResources.Reset()
   140  	// worker pool with semaphore
   141  	sem := make(chan struct{}, NumberOfWorkers)
   142  	for _, gvk := range gvks {
   143  		gvk := gvk
   144  		sem <- struct{}{}
   145  		go func() {
   146  			defer func() { <-sem }()
   147  			err := recordMetricsForGVK(c, gvk)
   148  			if err != nil {
   149  				logger.Error(err, "error recording metrics for CRD %v: %v", gvk.String())
   150  			}
   151  		}()
   152  	}
   153  	for i := 0; i < NumberOfWorkers; i++ {
   154  		sem <- struct{}{}
   155  	}
   156  	logger.Info("finished one run of recording resource metrics.")
   157  	return nil
   158  }
   159  
   160  func forEach(c client.Client, gvk schema.GroupVersionKind, listOptions *client.ListOptions, fn func(unstructured.Unstructured) error) error {
   161  	for ok := true; ok; ok = listOptions.Continue != "" {
   162  		list := unstructured.UnstructuredList{}
   163  		list.SetGroupVersionKind(gvk)
   164  		err := c.List(context.Background(), &list, listOptions)
   165  		if err != nil {
   166  			return fmt.Errorf("error listing objects:%v", err)
   167  		}
   168  		for _, item := range list.Items {
   169  			if err := fn(item); err != nil {
   170  				return err
   171  			}
   172  		}
   173  		listOptions.Continue = list.GetContinue()
   174  	}
   175  	return nil
   176  }
   177  
   178  func recordMetricsForGVK(c client.Client, gvk schema.GroupVersionKind) error {
   179  	opts := &client.ListOptions{
   180  		Limit: MaximumListResults,
   181  		Raw:   &v1.ListOptions{},
   182  	}
   183  	statsNamespaceMap := make(map[string]*Stats)
   184  	if err := forEach(c, gvk, opts, func(obj unstructured.Unstructured) error {
   185  		namespace := obj.GetNamespace()
   186  		s := statsNamespaceMap[namespace]
   187  		if s == nil {
   188  			s = &Stats{make(map[string]int64)}
   189  			statsNamespaceMap[namespace] = s
   190  		}
   191  		lastCondition, err := getTheLastCondition(obj)
   192  		if err != nil {
   193  			logger.Error(err, "error getting the last condition for metrics for %v: %v", gvk.String())
   194  			return nil
   195  		}
   196  		s.countByStatus[lastCondition]++
   197  		return nil
   198  	}); err != nil {
   199  		return fmt.Errorf("error listing objects for %v: %w", gvk.String(), err)
   200  	}
   201  	for ns, stats := range statsNamespaceMap {
   202  		for status, count := range stats.countByStatus {
   203  			appliedResources.WithLabelValues(ns, gvk.GroupKind().String(), status).Set(float64(count))
   204  		}
   205  	}
   206  	return nil
   207  }
   208  
// Stats holds the resource counts that recordMetricsForGVK accumulates for a
// single namespace.
type Stats struct {
	// countByStatus maps the status string returned by getTheLastCondition to
	// the number of objects seen with that status.
	countByStatus map[string]int64
}
   212  
   213  // TODO: consolidate the logic with krmtotf.GetReadyCondition
   214  func getTheLastCondition(obj unstructured.Unstructured) (string, error) {
   215  	currConditionsRaw, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")
   216  	if err != nil {
   217  		return "", err
   218  	}
   219  	if !found || len(currConditionsRaw) == 0 {
   220  		return "NoCondition", nil
   221  	}
   222  
   223  	currConditions, err := k8s.MarshalAsConditionsSlice(currConditionsRaw)
   224  	if err != nil {
   225  		return "", err
   226  	}
   227  	if currConditions[0].Reason == "" {
   228  		return k8s.NoCondition, nil
   229  	}
   230  	return currConditions[0].Reason, nil
   231  }
   232  

View as plain text