1 package prometheusctl
2
import (
	"context"
	"fmt"
	"regexp"
	"sort"
	"strings"

	monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"

	"edge-infra.dev/pkg/edge/bsl"
	"edge-infra.dev/pkg/edge/constants/api/cluster"
	"edge-infra.dev/pkg/edge/info"
	eclient "edge-infra.dev/pkg/k8s/runtime/client"
)
23
24
25
26
27
28
// allowedMetrics is the annotation key read from ServiceMonitor and PodMonitor
// objects. Its value is split on newlines, each non-empty line naming one
// metric (or label matcher) to export.
const (
	allowedMetrics = "monitoring.edge.ncr.com/allowed-metrics"
)
30
31
32 type PrometheusStackdriverReconciler struct {
33 Name string
34 client.Client
35 config
36 metricsList map[string]bool
37 }
38
39
40 func (r *PrometheusStackdriverReconciler) SetupWithManager(mgr ctrl.Manager) error {
41 if r.Name == "" {
42 r.Name = "prometheusctl"
43 }
44
45 return ctrl.NewControllerManagedBy(mgr).
46 For(&monitoringv1.Prometheus{}).
47 Watches(&monitoringv1.ServiceMonitor{}, &handler.EnqueueRequestForObject{}).
48 Watches(&monitoringv1.PodMonitor{}, &handler.EnqueueRequestForObject{}).
49 WithEventFilter(predicates(r.prometheus)).
50 Complete(r)
51 }
52
53
54
55
56
57
58 func (r *PrometheusStackdriverReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
59
60
61
62 log := ctrl.LoggerFrom(ctx).WithValues("prometheus", r.prometheus)
63 ctrl.LoggerInto(ctx, log)
64
65
66 r.metricsList = make(map[string]bool)
67
68
69 serviceMonitorList := &monitoringv1.ServiceMonitorList{}
70 if err := r.List(ctx, serviceMonitorList); err != nil {
71 log.Error(err, "failed to get service monitors")
72 return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
73 }
74
75
76 for _, serviceMonitor := range serviceMonitorList.Items {
77 r.fetchMetricsAnnotations(serviceMonitor.ObjectMeta)
78 }
79
80
81 podMonitorList := &monitoringv1.PodMonitorList{}
82 if err := r.List(ctx, podMonitorList); err != nil {
83 log.Error(err, "failed to get pod monitors")
84 return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
85 }
86
87
88 for _, podMonitor := range podMonitorList.Items {
89 r.fetchMetricsAnnotations(podMonitor.ObjectMeta)
90 }
91
92 if err := r.patchPrometheus(ctx); err != nil {
93 log.Error(err, "failed to patch prometheus resource")
94 return ctrl.Result{RequeueAfter: r.reconcileInterval}, err
95 }
96 log.Info("reconciler has patched prometheus based on changes to prometheus CRD")
97
98 return ctrl.Result{}, nil
99 }
100
101 func (r *PrometheusStackdriverReconciler) patchPrometheus(ctx context.Context) error {
102 log := ctrl.LoggerFrom(ctx)
103 prometheus := &monitoringv1.Prometheus{}
104 if err := r.Get(ctx, eclient.ObjectKeyFromRef(r.prometheus), prometheus); err != nil {
105 return fmt.Errorf("failed to get object %s: %w", r.prometheus, err)
106 }
107
108 patch := client.MergeFrom(prometheus.DeepCopy())
109 prometheusCopy := prometheus.DeepCopy()
110
111 container, err := prometheusContainer(prometheus.Spec.Containers)
112 if err != nil {
113 container = corev1.Container{
114 Name: "prometheus",
115 }
116 }
117
118
119
120 container.Args = []string{
121 "--enable-feature=new-service-discovery-manager",
122 "--enable-feature=no-default-scrape-port",
123 "--storage.tsdb.max-block-duration=30m",
124 "--storage.tsdb.path=/prometheus/data",
125 "--storage.tsdb.retention.time=4d",
126 "--log.format=json",
127 "--config.file=/etc/prometheus/config_out/prometheus.env.yaml",
128 "--web.enable-lifecycle",
129 "--export.compression=gzip",
130 }
131 for metric := range r.metricsList {
132 container.Args = append(container.Args, metricArg(metric))
133 }
134
135 if cluster.Type(r.clusterProvider) != cluster.GKE {
136 log.Info("adding gcp api key configuration for non-GKE cluster",
137 "cluster-type", r.clusterProvider)
138 container.Env = envVars()
139 prometheus.Spec.Volumes, prometheus.Spec.VolumeMounts = volumeCfg()
140 }
141
142 cfgEdge := &corev1.ConfigMap{}
143 edgeNamespacedName := types.NamespacedName{
144 Name: info.EdgeConfigMapName,
145 Namespace: info.EdgeConfigMapNS,
146 }
147
148
149 if err := r.Get(ctx, edgeNamespacedName, cfgEdge); err != nil {
150 log.Info("edge-info unavailable in cluster")
151 } else if info.ValidateConfigMap(cfgEdge) == nil {
152 log.Info("appending edge-info labels to externalLabels")
153 edgeInfo := info.FromConfigMap(cfgEdge)
154 prometheus.Spec.ExternalLabels["banner_id"] = edgeInfo.BannerEdgeID
155 prometheus.Spec.ExternalLabels["banner_name"] = edgeInfo.BannerName
156 prometheus.Spec.ExternalLabels["store_name"] = edgeInfo.Store
157 prometheus.Spec.ExternalLabels["cluster_fleet"] = edgeInfo.Fleet
158 }
159
160 cfgBSL := &corev1.ConfigMap{}
161 bslNamespacedName := types.NamespacedName{
162 Name: bsl.BSLInfoConfigMapName,
163 Namespace: metav1.NamespacePublic,
164 }
165
166
167 if err := r.Get(ctx, bslNamespacedName, cfgBSL); err != nil {
168 log.Info("bsl-info unavailable in cluster")
169 } else if bslInfo, err := bsl.FromConfigMap(cfgBSL); err == nil {
170 log.Info("appending bsl-info labels to externalLabels")
171 if bslInfo.OrganizationID != "" {
172 prometheus.Spec.ExternalLabels["organization_id"] = bslInfo.OrganizationID
173 prometheus.Spec.ExternalLabels["enterprise_unit_id"] = bslInfo.ID
174 }
175 }
176
177
178 container.Args = dedupe(container.Args)
179
180
181 prometheus.Spec.Containers = []corev1.Container{container}
182 for _, c := range prometheusCopy.Spec.Containers {
183 if c.Name != "prometheus" {
184 prometheus.Spec.Containers = append(prometheus.Spec.Containers, c)
185 }
186 }
187
188
189 return r.Patch(ctx, prometheus, patch, client.FieldOwner(r.Name))
190 }
191
192 func (r *PrometheusStackdriverReconciler) fetchMetricsAnnotations(objMeta metav1.ObjectMeta) {
193 if !metav1.HasAnnotation(objMeta, allowedMetrics) {
194 return
195 }
196
197 for _, metric := range strings.Split(objMeta.Annotations[allowedMetrics], "\n") {
198 if metric != "" {
199 r.metricsList[metric] = true
200 }
201 }
202 }
203
204
// labelMatcherRe matches a brace-delimited label matcher such as {job="api"}
// anywhere in a string. It is compiled once at package initialisation rather
// than on every call.
var labelMatcherRe = regexp.MustCompile(`{[\w_]+[=~!]+"[\w\W]+"}`)

// isMetricName reports whether name is a plain metric name, i.e. it does not
// contain a brace-delimited label matcher.
func isMetricName(name string) bool {
	return !labelMatcherRe.MatchString(name)
}
209
210 func metricArg(name string) string {
211 if !isMetricName(name) {
212 return fmt.Sprintf("--export.match=%s", name)
213 }
214
215 return fmt.Sprintf("--export.match={__name__=\"%s\"}", name)
216 }
217
218 func prometheusContainer(cc []corev1.Container) (corev1.Container, error) {
219 for _, c := range cc {
220 if c.Name == "prometheus" {
221 return c, nil
222 }
223 }
224 return corev1.Container{}, fmt.Errorf("no prometheus container found")
225 }
226
// dedupe returns the elements of ss with repeats removed, keeping the first
// occurrence of each value in its original position. The result is nil when
// ss is empty.
func dedupe(ss []string) []string {
	var unique []string
	seen := make(map[string]bool, len(ss))
	for i := range ss {
		if seen[ss[i]] {
			continue
		}
		seen[ss[i]] = true
		unique = append(unique, ss[i])
	}
	return unique
}
238
239
240 func volumeCfg() ([]corev1.Volume, []corev1.VolumeMount) {
241 return []corev1.Volume{
242 {
243 Name: "gcp-api-key", VolumeSource: corev1.VolumeSource{
244 Secret: &corev1.SecretVolumeSource{SecretName: "gcp-api-key"},
245 },
246 },
247 }, []corev1.VolumeMount{
248 {
249 Name: "gcp-api-key",
250 MountPath: "/var/secrets/google",
251 },
252 }
253 }
254
255
256 func envVars() []corev1.EnvVar {
257 return []corev1.EnvVar{
258 {
259 Name: "GOOGLE_APPLICATION_CREDENTIALS",
260 Value: "/var/secrets/google/key.json",
261 },
262 }
263 }
264
View as plain text