1 package monitor
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "sync"
8 "time"
9
10 pubsub "cloud.google.com/go/pubsub"
11 kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2"
12 sourceApi "github.com/fluxcd/source-controller/api/v1beta2"
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
15 "k8s.io/apimachinery/pkg/runtime"
16 "k8s.io/apimachinery/pkg/runtime/schema"
17 "k8s.io/client-go/discovery"
18 "k8s.io/client-go/dynamic"
19 "k8s.io/client-go/dynamic/dynamicinformer"
20 "k8s.io/client-go/tools/cache"
21
22 "edge-infra.dev/pkg/edge/api/graph/mapper"
23 "edge-infra.dev/pkg/edge/ctlfish"
24 "edge-infra.dev/pkg/edge/ctlfish/metrics"
25 "edge-infra.dev/pkg/edge/ctlfish/option"
26 kinformmapper "edge-infra.dev/pkg/f8n/kinform/mapper"
27 metastatus "edge-infra.dev/pkg/k8s/meta/status"
28 "edge-infra.dev/pkg/lib/logging"
29 )
30
// watches records, by gvr.String(), the resources that GetAllResources treats
// as already having informer event handlers registered, so handlers are not
// added to the same informer twice. Only key presence matters; the stored
// value is always "done".
// NOTE(review): not guarded by any mutex — assumes GetAllResources is never
// run concurrently with itself.
var watches = map[string]string{}
32
// ClusterStatus is the aggregated Flux sync state of the cluster, built by
// createClusterStatus and published to Pub/Sub by handleFluxResource.
//
//   - Status: metastatus.ReadyCondition when no tracked object reports an
//     error, otherwise mapper.SyncingError or mapper.BucketError.
//   - Buckets: last known status of each Flux Bucket, keyed by object name.
//   - Kustomizations: last known status of each Flux Kustomization, keyed by
//     object name.
//   - Error: summary of the failing objects; nil when nothing is failing.
//   - NodeVersion: kubelet version of the first node returned by the API.
type ClusterStatus struct {
	Status         string
	Buckets        map[string]BucketInfo
	Kustomizations map[string]KustomizationInfo
	Error          *SyncError
	NodeVersion    string
}
40
// KustomizationInfo is the published view of a single Flux Kustomization.
//
//   - Path: the Kustomization's spec.path.
//   - Source: name of the source object referenced by spec.sourceRef.
//   - FluxStatus: sync state derived from the Ready condition.
type KustomizationInfo struct {
	Path       string
	Source     string
	FluxStatus SyncInfo
}
46
// BucketInfo is the published view of a single Flux source Bucket.
//
//   - Excludes: the Bucket's spec.ignore value, or "" when unset.
//   - BucketName: the remote bucket name from spec.bucketName.
//   - FluxStatus: sync state derived from the Ready condition.
type BucketInfo struct {
	Excludes   string
	BucketName string
	FluxStatus SyncInfo
}
52
// SyncInfo describes the sync state of one Flux object, built from its Ready
// condition by getStatusFromCondition.
//
//   - LastUpdated: the condition's LastTransitionTime rendered with String().
//   - Revision: last applied (Kustomization) or artifact (Bucket) revision.
//   - StatusMessage: "<reason>: <message>" taken from the condition.
//   - Error: true when the Ready condition status is False.
//   - Suspended: mirrors the object's spec.suspend.
type SyncInfo struct {
	LastUpdated   string
	Revision      string
	StatusMessage string
	Error         bool
	Suspended     bool
}
60
// SyncError summarizes the failing Flux objects in a ClusterStatus.
//
//   - Message: the failing objects' status messages joined with ", ".
//   - ErrorType: "Bucket" if any Bucket fails, otherwise "Kustomization".
type SyncError struct {
	Message   string
	ErrorType string
}
65
// KustomizationMap holds the last known status of every Flux Kustomization the
// watcher has seen, keyed by object name only.
// NOTE(review): same-named Kustomizations in different namespaces overwrite
// each other — confirm this is intended.
var KustomizationMap = map[string]KustomizationInfo{}

// BucketMap holds the last known status of every Flux Bucket the watcher has
// seen, keyed by object name only (same namespace caveat as KustomizationMap).
var BucketMap = map[string]BucketInfo{}

// clusterStatusMutex guards KustomizationMap and BucketMap. handleFluxResource
// holds it while updating the maps and publishing the derived ClusterStatus.
var clusterStatusMutex = sync.RWMutex{}
69
// createOp, updateOp and deleteOp are the operation names passed to
// processUpdate and published in each message's "operation" field.
// ctlfishTopic is the Pub/Sub topic every message is published to.
// bqTimestampFormat is the time layout used for the "timestamp" field;
// presumably chosen to match BigQuery's timestamp literal format — confirm.
const (
	createOp          = "create"
	updateOp          = "update"
	deleteOp          = "delete"
	ctlfishTopic      = "ctlfish-pubsub"
	bqTimestampFormat = "2006-01-02 15:04:05.999999"
)
77
// CtlfishWatcher registers dynamic informers for cluster resources and
// publishes their state, plus an aggregated Flux ClusterStatus, to Pub/Sub.
//
//   - cs: dynamic client used to list nodes and monitored resources.
//   - dClient: discovery client used to enumerate API resources.
//   - logger: destination for all error and info logging.
//   - factory: shared informer factory providing per-resource informers.
//   - cfg: decides which resources are watched/monitored, whether Pub/Sub is
//     active, and supplies the cluster identity fields of each message.
//   - psClient: Pub/Sub client used to publish to ctlfishTopic.
type CtlfishWatcher struct {
	cs       dynamic.Interface
	dClient  discovery.DiscoveryInterface
	logger   *logging.EdgeLogger
	factory  dynamicinformer.DynamicSharedInformerFactory
	cfg      *option.MetricConfig
	psClient *pubsub.Client
}
86
87
88
89
90 func (c *CtlfishWatcher) DynamicMetricsInformation(resource schema.GroupVersionResource) error {
91 informer := c.factory.ForResource(resource).Informer()
92
93 if c.cfg.IsWatched(resource) {
94 _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
95 AddFunc: func(obj interface{}) {
96 c.processUpdate(obj, resource, createOp)
97 },
98 UpdateFunc: func(_, newObj interface{}) {
99 c.processUpdate(newObj, resource, updateOp)
100 },
101 DeleteFunc: func(obj interface{}) {
102 c.processUpdate(obj, resource, deleteOp)
103 },
104 })
105 return err
106 }
107
108 _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
109 DeleteFunc: func(obj interface{}) {
110 c.processUpdate(obj, resource, deleteOp)
111 },
112 })
113 return err
114 }
115
116 func (c *CtlfishWatcher) processUpdate(item interface{}, resource schema.GroupVersionResource, opType string) {
117 castItem := item.(*unstructured.Unstructured)
118 castItem = sanitizeResource(castItem)
119 jsonPrint, err := castItem.MarshalJSON()
120 if err != nil {
121 c.logger.Error(err, "Failed to Marshal the Resource", "gvr", resource)
122 }
123 if castItem.GetAPIVersion() == "" {
124 c.logger.Info("Missing APIVersion, skipping log...")
125 } else {
126 gvk := castItem.GetObjectKind().GroupVersionKind()
127 c.sendPubSubMessage(c.logger, gvk, castItem.GetName(), castItem.GetNamespace(), opType, jsonPrint)
128 c.handleFluxResource(resource, castItem, opType)
129 switch opType {
130 case "create":
131 metrics.CtlfishResourceCreations.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
132 case "update":
133 metrics.CtlfishResourceUpdates.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
134 default:
135 metrics.CtlfishResourceDeletions.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
136 }
137 }
138 }
139
140 func (c *CtlfishWatcher) handleFluxResource(resource schema.GroupVersionResource, castItem *unstructured.Unstructured, opType string) {
141 if isFluxResource(resource) {
142 clusterStatusMutex.Lock()
143
144 updateFluxObjectsStatusMap(c.logger, castItem, opType)
145
146 kubeVersion := getKubeVersionFromNodes(c.cs, c.logger)
147
148 res := createClusterStatus(KustomizationMap, BucketMap, kubeVersion)
149
150 converted, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&res)
151 if err != nil {
152 c.logger.Error(err, "failed to convert clusterStatus to unstructured")
153 }
154 data := &unstructured.Unstructured{
155 Object: converted,
156 }
157
158 jsonPrint, err := data.MarshalJSON()
159 if err != nil {
160 c.logger.Error(err, "failed to marshal the resource", "clusterStatus", data)
161 }
162
163 c.sendPubSubMessage(c.logger, schema.GroupVersionKind{Group: ctlfish.ClusterStatusGroup, Version: ctlfish.ClusterStatusVersion, Kind: ctlfish.ClusterStatusKind}, ctlfish.ClusterStatusName, ctlfish.ClusterStatusNamespace, ctlfish.ClusterStatusOperation, jsonPrint)
164 clusterStatusMutex.Unlock()
165 }
166 }
167
168 func (c *CtlfishWatcher) sendPubSubMessage(logger *logging.EdgeLogger, gvk schema.GroupVersionKind, name, namespace, opType string, jsonPrint []byte) {
169 if !c.cfg.PubSubActive {
170 return
171 }
172 attys := c.createMap(gvk, name, namespace, opType)
173 message := c.createMap(gvk, name, namespace, opType)
174 message["resource"] = string(jsonPrint)
175 mdata, err := json.Marshal(message)
176 if err != nil {
177 logger.Error(err, "failed to marshall message", "data", message)
178 }
179 ressy := c.psClient.Topic(ctlfishTopic).Publish(context.Background(), &pubsub.Message{
180 Data: mdata,
181 Attributes: attys,
182 })
183 _, err = ressy.Get(context.Background())
184 if err != nil {
185 logger.Error(err, "failed to send message", "data", mdata)
186 }
187 }
188
189 func (c *CtlfishWatcher) createMap(gvk schema.GroupVersionKind, name, namespace, opType string) map[string]string {
190 return map[string]string{
191 "timestamp": time.Now().Format(bqTimestampFormat),
192 "cluster_name": c.cfg.ClusterName,
193 "project_id": c.cfg.ProjectID,
194 "group": gvk.Group,
195 "version": gvk.Version,
196 "kind": gvk.Kind,
197 "name": name,
198 "namespace": namespace,
199 "operation": opType,
200 "cluster_edge_id": c.cfg.ClusterEdgeID}
201 }
202
203 func getKubeVersionFromNodes(cs dynamic.Interface, logger *logging.EdgeLogger) string {
204 nodesResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
205 nodesRes, err := cs.Resource(nodesResource).Namespace("").List(context.Background(), metav1.ListOptions{})
206 if err != nil {
207 logger.Error(err, "failed to get nodes")
208 } else {
209 nodes, err := mapper.ToConvertUnstructuredToNode(nodesRes)
210 if err != nil {
211 logger.Error(err, "failed to convert unstructured to nodes")
212 }
213 if len(nodes) > 0 {
214 return nodes[0].Status.NodeInfo.KubeletVersion
215 }
216 }
217 return ""
218 }
219
220
221 func updateFluxObjectsStatusMap(logger *logging.EdgeLogger, obj *unstructured.Unstructured, opType string) {
222 if obj.GetKind() == "Kustomization" {
223
224 if opType == deleteOp {
225 delete(KustomizationMap, obj.GetName())
226 return
227 }
228 kustomization, err := kinformmapper.ToConvertUnstructuredToKustomization(obj)
229 if err != nil {
230 logger.Error(err, "failed to convert unstructured to kustomization")
231 }
232 valid, status := getKustomizationStatusInfo(kustomization)
233 if valid {
234 KustomizationMap[obj.GetName()] = status
235 }
236 return
237 }
238
239 if opType == deleteOp {
240 delete(BucketMap, obj.GetName())
241 return
242 }
243 bucket, err := kinformmapper.ToConvertUnstructuredToBucket(obj)
244 if err != nil {
245 logger.Error(err, "failed to convert unstructured to bucket")
246 }
247 valid, status := getBucketStatusInfo(bucket)
248 if valid {
249 BucketMap[obj.GetName()] = status
250 }
251 }
252
253 func createClusterStatus(kustomizationMap map[string]KustomizationInfo, bucketMap map[string]BucketInfo, version string) ClusterStatus {
254 status := metastatus.ReadyCondition
255 bucketMessage := ""
256 kustomizationMessage := ""
257 for _, syncInfo := range kustomizationMap {
258 if syncInfo.FluxStatus.Error {
259 status = mapper.SyncingError
260 kustomizationMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, kustomizationMessage)
261 }
262 }
263 for _, syncInfo := range bucketMap {
264 if syncInfo.FluxStatus.Error {
265 status = mapper.BucketError
266 bucketMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, bucketMessage)
267 }
268 }
269 var err *SyncError
270 err = nil
271 if kustomizationMessage != "" {
272 err = &SyncError{
273 Message: kustomizationMessage,
274 ErrorType: "Kustomization",
275 }
276 }
277 if bucketMessage != "" {
278 message := bucketMessage
279 if err != nil {
280 message = fmt.Sprintf("%s, %s", message, err.Message)
281 }
282 err = &SyncError{
283 Message: message,
284 ErrorType: "Bucket",
285 }
286 }
287
288 return ClusterStatus{
289 Status: status,
290 Buckets: BucketMap,
291 Kustomizations: KustomizationMap,
292 Error: err,
293 NodeVersion: version,
294 }
295 }
296
297 func getBucketRevision(status sourceApi.BucketStatus) string {
298 if status.Artifact != nil {
299 return status.Artifact.Revision
300 }
301 return ""
302 }
303
304 func getKustomizationStatusInfo(kustomization *kustomizeApi.Kustomization) (bool, KustomizationInfo) {
305 valid, status := getStatusFromCondition(kustomization.Status.Conditions)
306 if valid {
307 status.Suspended = kustomization.Spec.Suspend
308 status.Revision = kustomization.Status.LastAppliedRevision
309 return true, KustomizationInfo{
310 Path: kustomization.Spec.Path,
311 Source: kustomization.Spec.SourceRef.Name,
312 FluxStatus: status,
313 }
314 }
315 return false, KustomizationInfo{}
316 }
317
318 func getBucketStatusInfo(bucket *sourceApi.Bucket) (bool, BucketInfo) {
319 valid, status := getStatusFromCondition(bucket.Status.Conditions)
320 if valid {
321 status.Suspended = bucket.Spec.Suspend
322 status.Revision = getBucketRevision(bucket.Status)
323 excludes := ""
324 if bucket.Spec.Ignore != nil {
325 excludes = *bucket.Spec.Ignore
326 }
327 return true, BucketInfo{
328 Excludes: excludes,
329 BucketName: bucket.Spec.BucketName,
330 FluxStatus: status,
331 }
332 }
333 return false, BucketInfo{}
334 }
335
336
337 func getStatusFromCondition(conditions []metav1.Condition) (bool, SyncInfo) {
338 for _, condition := range conditions {
339 if condition.Type == metastatus.ReadyCondition {
340 if condition.Status == metav1.ConditionTrue {
341 return true, SyncInfo{
342 LastUpdated: condition.LastTransitionTime.String(),
343 Revision: "",
344 StatusMessage: fmt.Sprintf("%s: %s", condition.Reason, condition.Message),
345 Error: false,
346 }
347 }
348 if condition.Status == metav1.ConditionFalse {
349 return true, SyncInfo{
350 LastUpdated: condition.LastTransitionTime.String(),
351 Revision: "",
352 StatusMessage: fmt.Sprintf("%s: %s", condition.Reason, condition.Message),
353 Error: true,
354 }
355 }
356 }
357 }
358 return false, SyncInfo{}
359 }
360
361
362 func (c *CtlfishWatcher) GetAllResources(ctx context.Context) {
363 _, groups, err := c.dClient.ServerGroupsAndResources()
364
365 groups = discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"list", "watch", "get"}}, groups)
366 if err != nil {
367 c.logger.Error(err, "failed to get resources with discovery client")
368 }
369
370 for _, group := range groups {
371 gv, err := schema.ParseGroupVersion(group.GroupVersion)
372 for _, resource := range group.APIResources {
373 gvr := gv.WithResource(resource.Name)
374 if _, ok := watches[gvr.String()]; c.cfg.IsWatched(gvr) && !ok {
375 if err != nil {
376 c.logger.Error(err, "failed to parse group version", "resource", resource)
377 }
378 if err := c.DynamicMetricsInformation(gvr); err != nil {
379 c.logger.Error(err, "failed to add event handlers to informer", "resource", resource)
380 }
381 watches[gvr.String()] = "done"
382 }
383
384 if c.cfg.IsMonitored(gvr) {
385 list, err := c.cs.Resource(gvr).List(ctx, metav1.ListOptions{})
386 if err != nil {
387 c.logger.Error(err, "failed to list resource", "resource", resource)
388 continue
389 }
390 for i := range list.Items {
391 c.processUpdate(&list.Items[i], gvr, "update")
392 }
393 }
394 }
395 }
396 }
397
398 func isFluxResource(resource schema.GroupVersionResource) bool {
399 switch resource.Resource {
400 case "buckets":
401 return true
402 case "kustomizations":
403 return true
404 default:
405 return false
406 }
407 }
408
409
410 func sanitizeResource(item *unstructured.Unstructured) *unstructured.Unstructured {
411 switch kind := item.GetObjectKind().GroupVersionKind().Kind; kind {
412 case "Secret":
413 return sanitizeSecret(item)
414 default:
415 return item
416 }
417 }
418
419
420
421 func sanitizeSecret(secret *unstructured.Unstructured) *unstructured.Unstructured {
422 secretObject := secret.Object
423 secretAnnotationsMap, foundAnnotations, err := unstructured.NestedMap(secretObject, "metadata", "annotations")
424 if err != nil {
425 return nil
426 }
427 if foundAnnotations {
428 for key := range secretAnnotationsMap {
429 if key == "kubectl.kubernetes.io/last-applied-configuration" {
430 secretAnnotationsMap[key] = nil
431 }
432 }
433 secretObject["metadata"] = secretAnnotationsMap
434 secret.SetUnstructuredContent(secretObject)
435 }
436
437 secretDataMap, foundData, err := unstructured.NestedMap(secretObject, "data")
438 if err != nil {
439 return nil
440 }
441 if foundData {
442 for key := range secretDataMap {
443 secretDataMap[key] = nil
444 }
445 secretObject["data"] = secretDataMap
446 secret.SetUnstructuredContent(secretObject)
447 }
448 return secret
449 }
450
451 func NewWatcher(cs dynamic.Interface, dClient discovery.DiscoveryInterface, logger *logging.EdgeLogger, factory dynamicinformer.DynamicSharedInformerFactory, cfg *option.MetricConfig, pClient *pubsub.Client) *CtlfishWatcher {
452 return &CtlfishWatcher{
453 cs: cs,
454 dClient: dClient,
455 logger: logger,
456 factory: factory,
457 cfg: cfg,
458 psClient: pClient,
459 }
460 }
461
View as plain text