...

Source file src/edge-infra.dev/pkg/edge/monitoring/k8s/controllers/prometheusctl/prometheus_stackdriver_controller.go

Documentation: edge-infra.dev/pkg/edge/monitoring/k8s/controllers/prometheusctl

     1  package prometheusctl
     2  
import (
	"context"
	"fmt"
	"regexp"
	"sort"
	"strings"

	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"

	"edge-infra.dev/pkg/edge/bsl"
	"edge-infra.dev/pkg/edge/constants/api/cluster"
	"edge-infra.dev/pkg/edge/info"
	eclient "edge-infra.dev/pkg/k8s/runtime/client"
)
    23  
    24  // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=prometheuses,verbs=get;list;watch;patch
    25  // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch
    26  // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=get;list;watch
    27  // +kubebuilder:rbac:groups=monitoring.coreos.com,resources=podmonitors,verbs=get;list;watch
    28  
// allowedMetrics is the annotation key read from ServiceMonitor and PodMonitor
// objects; its value is parsed as a newline-separated list of metrics.
const allowedMetrics = "monitoring.edge.ncr.com/allowed-metrics"
    30  
// PrometheusStackdriverReconciler reconciles all prometheus objects
type PrometheusStackdriverReconciler struct {
	// Name identifies the controller. SetupWithManager defaults it to
	// "prometheusctl" when empty, and it is used as the field owner when
	// patching the Prometheus resource.
	Name string
	// Client is the Kubernetes client used to list monitors and to get and
	// patch resources.
	client.Client
	// config is embedded and declared elsewhere in the package.
	// NOTE(review): presumably supplies the prometheus, reconcileInterval and
	// clusterProvider fields read by this controller — confirm.
	config
	// metricsList is the set of metrics gathered from monitor annotations; it
	// is rebuilt at the start of every Reconcile.
	metricsList map[string]bool
}
    38  
    39  // SetupWithManager sets up PrometheusStackdriverReconciler with the manager
    40  func (r *PrometheusStackdriverReconciler) SetupWithManager(mgr ctrl.Manager) error {
    41  	if r.Name == "" {
    42  		r.Name = "prometheusctl"
    43  	}
    44  
    45  	return ctrl.NewControllerManagedBy(mgr).
    46  		For(&monitoringv1.Prometheus{}).
    47  		Watches(&monitoringv1.ServiceMonitor{}, &handler.EnqueueRequestForObject{}).
    48  		Watches(&monitoringv1.PodMonitor{}, &handler.EnqueueRequestForObject{}).
    49  		WithEventFilter(predicates(r.prometheus)).
    50  		Complete(r)
    51  }
    52  
    53  // Reconcile is part of the main kubernetes reconciliation loop which aims to
    54  // move the current state of the cluster closer to the desired state.
    55  //
    56  // For more details, check Reconcile and its Result here:
    57  // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.8.3/pkg/reconcile
    58  func (r *PrometheusStackdriverReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
    59  	// decorate logger with info about the prometheus that we are patching
    60  	// the context provides information about the resource that is triggering the
    61  	// reconcile
    62  	log := ctrl.LoggerFrom(ctx).WithValues("prometheus", r.prometheus)
    63  	ctrl.LoggerInto(ctx, log)
    64  
    65  	// Resetting/creating metrics map
    66  	r.metricsList = make(map[string]bool)
    67  
    68  	// Fetching list of service monitors
    69  	serviceMonitorList := &monitoringv1.ServiceMonitorList{}
    70  	if err := r.List(ctx, serviceMonitorList); err != nil {
    71  		log.Error(err, "failed to get service monitors")
    72  		return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
    73  	}
    74  
    75  	// Appending metrics from service monitor to metrics map
    76  	for _, serviceMonitor := range serviceMonitorList.Items {
    77  		r.fetchMetricsAnnotations(serviceMonitor.ObjectMeta)
    78  	}
    79  
    80  	// Fetching list of pod monitors
    81  	podMonitorList := &monitoringv1.PodMonitorList{}
    82  	if err := r.List(ctx, podMonitorList); err != nil {
    83  		log.Error(err, "failed to get pod monitors")
    84  		return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
    85  	}
    86  
    87  	// Appending metrics from pod monitor to metrics map
    88  	for _, podMonitor := range podMonitorList.Items {
    89  		r.fetchMetricsAnnotations(podMonitor.ObjectMeta)
    90  	}
    91  
    92  	if err := r.patchPrometheus(ctx); err != nil {
    93  		log.Error(err, "failed to patch prometheus resource")
    94  		return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
    95  	}
    96  	log.Info("reconciler has patched prometheus based on changes to prometheus CRD")
    97  
    98  	return ctrl.Result{}, nil
    99  }
   100  
   101  func (r *PrometheusStackdriverReconciler) patchPrometheus(ctx context.Context) error {
   102  	log := ctrl.LoggerFrom(ctx)
   103  	prometheus := &monitoringv1.Prometheus{}
   104  	if err := r.Get(ctx, eclient.ObjectKeyFromRef(r.prometheus), prometheus); err != nil {
   105  		return fmt.Errorf("failed to get object %s: %w", r.prometheus, err)
   106  	}
   107  
   108  	patch := client.MergeFrom(prometheus.DeepCopy())
   109  	prometheusCopy := prometheus.DeepCopy()
   110  
   111  	container, err := prometheusContainer(prometheus.Spec.Containers)
   112  	if err != nil {
   113  		container = corev1.Container{
   114  			Name: "prometheus",
   115  		}
   116  	}
   117  
   118  	// args we always want passed to the prometheus binary
   119  	// they may exist already, but we will dedupe them at the end
   120  	container.Args = []string{
   121  		"--enable-feature=new-service-discovery-manager",
   122  		"--enable-feature=no-default-scrape-port",
   123  		"--storage.tsdb.max-block-duration=30m",
   124  		"--storage.tsdb.path=/prometheus/data",
   125  		"--storage.tsdb.retention.time=4d",
   126  		"--log.format=json",
   127  		"--config.file=/etc/prometheus/config_out/prometheus.env.yaml",
   128  		"--web.enable-lifecycle",
   129  		"--export.compression=gzip",
   130  	}
   131  	for metric := range r.metricsList {
   132  		container.Args = append(container.Args, metricArg(metric))
   133  	}
   134  
   135  	if cluster.Type(r.clusterProvider) != cluster.GKE {
   136  		log.Info("adding gcp api key configuration for non-GKE cluster",
   137  			"cluster-type", r.clusterProvider)
   138  		container.Env = envVars()
   139  		prometheus.Spec.Volumes, prometheus.Spec.VolumeMounts = volumeCfg()
   140  	}
   141  
   142  	cfgEdge := &corev1.ConfigMap{}
   143  	edgeNamespacedName := types.NamespacedName{
   144  		Name:      info.EdgeConfigMapName,
   145  		Namespace: info.EdgeConfigMapNS,
   146  	}
   147  
   148  	// append labels if edge-info configmap is present
   149  	if err := r.Get(ctx, edgeNamespacedName, cfgEdge); err != nil {
   150  		log.Info("edge-info unavailable in cluster")
   151  	} else if info.ValidateConfigMap(cfgEdge) == nil {
   152  		log.Info("appending edge-info labels to externalLabels")
   153  		edgeInfo := info.FromConfigMap(cfgEdge)
   154  		prometheus.Spec.ExternalLabels["banner_id"] = edgeInfo.BannerEdgeID
   155  		prometheus.Spec.ExternalLabels["banner_name"] = edgeInfo.BannerName
   156  		prometheus.Spec.ExternalLabels["store_name"] = edgeInfo.Store
   157  		prometheus.Spec.ExternalLabels["cluster_fleet"] = edgeInfo.Fleet
   158  	}
   159  
   160  	cfgBSL := &corev1.ConfigMap{}
   161  	bslNamespacedName := types.NamespacedName{
   162  		Name:      bsl.BSLInfoConfigMapName,
   163  		Namespace: metav1.NamespacePublic,
   164  	}
   165  
   166  	// append the organization_id label if bsl-info configmap is present
   167  	if err := r.Get(ctx, bslNamespacedName, cfgBSL); err != nil {
   168  		log.Info("bsl-info unavailable in cluster")
   169  	} else if bslInfo, err := bsl.FromConfigMap(cfgBSL); err == nil {
   170  		log.Info("appending bsl-info labels to externalLabels")
   171  		if bslInfo.OrganizationID != "" {
   172  			prometheus.Spec.ExternalLabels["organization_id"] = bslInfo.OrganizationID
   173  			prometheus.Spec.ExternalLabels["enterprise_unit_id"] = bslInfo.ID
   174  		}
   175  	}
   176  
   177  	// dedupe arguments in case some things are already set
   178  	container.Args = dedupe(container.Args)
   179  
   180  	// update prometheus spec with new container
   181  	prometheus.Spec.Containers = []corev1.Container{container}
   182  	for _, c := range prometheusCopy.Spec.Containers {
   183  		if c.Name != "prometheus" {
   184  			prometheus.Spec.Containers = append(prometheus.Spec.Containers, c)
   185  		}
   186  	}
   187  
   188  	// patch resource
   189  	return r.Patch(ctx, prometheus, patch, client.FieldOwner(r.Name))
   190  }
   191  
   192  func (r *PrometheusStackdriverReconciler) fetchMetricsAnnotations(objMeta metav1.ObjectMeta) {
   193  	if !metav1.HasAnnotation(objMeta, allowedMetrics) {
   194  		return
   195  	}
   196  
   197  	for _, metric := range strings.Split(objMeta.Annotations[allowedMetrics], "\n") {
   198  		if metric != "" {
   199  			r.metricsList[metric] = true
   200  		}
   201  	}
   202  }
   203  
   204  // checks to see if string provided is a metric name format
   205  func isMetricName(name string) bool {
   206  	r, _ := regexp.Compile(`{[\w_]+[=~!]+"[\w\W]+"}`)
   207  	return !r.MatchString(name)
   208  }
   209  
   210  func metricArg(name string) string {
   211  	if !isMetricName(name) {
   212  		return fmt.Sprintf("--export.match=%s", name)
   213  	}
   214  
   215  	return fmt.Sprintf("--export.match={__name__=\"%s\"}", name)
   216  }
   217  
   218  func prometheusContainer(cc []corev1.Container) (corev1.Container, error) {
   219  	for _, c := range cc {
   220  		if c.Name == "prometheus" {
   221  			return c, nil
   222  		}
   223  	}
   224  	return corev1.Container{}, fmt.Errorf("no prometheus container found")
   225  }
   226  
   227  func dedupe(ss []string) []string {
   228  	visited := make(map[string]bool, len(ss))
   229  	var result []string
   230  	for _, s := range ss {
   231  		if !visited[s] {
   232  			result = append(result, s)
   233  		}
   234  		visited[s] = true
   235  	}
   236  	return result
   237  }
   238  
   239  // volumeCfg returns the volume configuration for mounting a GCP API key
   240  func volumeCfg() ([]corev1.Volume, []corev1.VolumeMount) {
   241  	return []corev1.Volume{
   242  			{
   243  				Name: "gcp-api-key", VolumeSource: corev1.VolumeSource{
   244  					Secret: &corev1.SecretVolumeSource{SecretName: "gcp-api-key"},
   245  				},
   246  			},
   247  		}, []corev1.VolumeMount{
   248  			{
   249  				Name:      "gcp-api-key",
   250  				MountPath: "/var/secrets/google",
   251  			},
   252  		}
   253  }
   254  
   255  // envVars returns the container env vars for loading a GCP API key
   256  func envVars() []corev1.EnvVar {
   257  	return []corev1.EnvVar{
   258  		{
   259  			Name:  "GOOGLE_APPLICATION_CREDENTIALS",
   260  			Value: "/var/secrets/google/key.json",
   261  		},
   262  	}
   263  }
   264  

View as plain text