package monitor

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	pubsub "cloud.google.com/go/pubsub"
	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2"
	sourceApi "github.com/fluxcd/source-controller/api/v1beta2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/tools/cache"

	"edge-infra.dev/pkg/edge/api/graph/mapper"
	"edge-infra.dev/pkg/edge/ctlfish"
	"edge-infra.dev/pkg/edge/ctlfish/metrics"
	"edge-infra.dev/pkg/edge/ctlfish/option"
	kinformmapper "edge-infra.dev/pkg/f8n/kinform/mapper"
	metastatus "edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/lib/logging"
)

// watches records which GVRs already have informer event handlers attached,
// so GetAllResources can be called repeatedly without double-registering.
var watches = map[string]string{}

// ClusterStatus is the aggregate flux/cluster health snapshot published to Pub/Sub.
type ClusterStatus struct {
	Status         string // options: Ready, Syncing Error, Bucket Error
	Buckets        map[string]BucketInfo
	Kustomizations map[string]KustomizationInfo
	Error          *SyncError
	NodeVersion    string
}

// KustomizationInfo captures the sync state of a single flux Kustomization.
type KustomizationInfo struct {
	Path       string
	Source     string
	FluxStatus SyncInfo
}

// BucketInfo captures the sync state of a single flux Bucket source.
type BucketInfo struct {
	Excludes   string
	BucketName string
	FluxStatus SyncInfo
}

// SyncInfo is the common per-resource flux reconciliation status.
type SyncInfo struct {
	LastUpdated   string
	Revision      string
	StatusMessage string
	Error         bool
	Suspended     bool
}

// SyncError aggregates the error messages from failing flux resources.
type SyncError struct {
	Message   string // concatenated error message
	ErrorType string // options: Bucket, Kustomization, both
}

// Package-level status maps shared across informer callbacks; guarded by
// clusterStatusMutex inside handleFluxResource.
var KustomizationMap = map[string]KustomizationInfo{}
var BucketMap = map[string]BucketInfo{}
var clusterStatusMutex = sync.RWMutex{}

const (
	createOp          = "create"
	updateOp          = "update"
	deleteOp          = "delete"
	ctlfishTopic      = "ctlfish-pubsub"
	bqTimestampFormat = "2006-01-02 15:04:05.999999"
)

// CtlfishWatcher wires dynamic informers to Pub/Sub publishing and metrics.
type CtlfishWatcher struct {
	cs       dynamic.Interface
	dClient  discovery.DiscoveryInterface
	logger   *logging.EdgeLogger
	factory  dynamicinformer.DynamicSharedInformerFactory
	cfg      *option.MetricConfig
	psClient *pubsub.Client
}

// DynamicMetricsInformation registers informer event handlers for the given
// GroupVersionResource. Watched resources get create/update/delete handlers;
// all other resources get a delete-only handler so removals are still
// observed. Each event is sanitized, JSON-marshalled, and published to
// Pub/Sub by processUpdate.
func (c *CtlfishWatcher) DynamicMetricsInformation(resource schema.GroupVersionResource) error {
	informer := c.factory.ForResource(resource).Informer()
	if c.cfg.IsWatched(resource) {
		_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				c.processUpdate(obj, resource, createOp)
			},
			UpdateFunc: func(_, newObj interface{}) {
				c.processUpdate(newObj, resource, updateOp)
			},
			DeleteFunc: func(obj interface{}) {
				c.processUpdate(obj, resource, deleteOp)
			},
		})
		return err
	}
	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			c.processUpdate(obj, resource, deleteOp)
		},
	})
	return err
}

// processUpdate sanitizes an informer event object, publishes it to Pub/Sub,
// updates the flux cluster-status maps when applicable, and increments the
// per-operation metric counter.
func (c *CtlfishWatcher) processUpdate(item interface{}, resource schema.GroupVersionResource, opType string) {
	castItem, ok := item.(*unstructured.Unstructured)
	if !ok {
		// Delete events may deliver a DeletedFinalStateUnknown tombstone when
		// the informer missed the final state; unwrap it instead of panicking
		// on a failed type assertion.
		tombstone, isTombstone := item.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			c.logger.Info("received unexpected object type, skipping", "gvr", resource)
			return
		}
		castItem, ok = tombstone.Obj.(*unstructured.Unstructured)
		if !ok {
			c.logger.Info("tombstone contained unexpected object type, skipping", "gvr", resource)
			return
		}
	}
	castItem = sanitizeResource(castItem)
	jsonPrint, err := castItem.MarshalJSON()
	if err != nil {
		c.logger.Error(err, "Failed to Marshal the Resource", "gvr", resource)
		// Without a marshalled payload there is nothing meaningful to publish.
		return
	}
	if castItem.GetAPIVersion() == "" {
		c.logger.Info("Missing APIVersion, skipping log...") //This was .Warn before find alternative
		return
	}
	gvk := castItem.GetObjectKind().GroupVersionKind()
	c.sendPubSubMessage(c.logger, gvk, castItem.GetName(), castItem.GetNamespace(), opType, jsonPrint)
	c.handleFluxResource(resource, castItem, opType)
	switch opType {
	case createOp:
		metrics.CtlfishResourceCreations.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
	case updateOp:
		metrics.CtlfishResourceUpdates.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
	default:
		metrics.CtlfishResourceDeletions.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
	}
}

// handleFluxResource updates the shared flux status maps for Bucket and
// Kustomization events and publishes a refreshed ClusterStatus snapshot.
// The whole read-modify-publish cycle runs under clusterStatusMutex.
func (c *CtlfishWatcher) handleFluxResource(resource schema.GroupVersionResource, castItem *unstructured.Unstructured, opType string) {
	if !isFluxResource(resource) {
		return
	}
	clusterStatusMutex.Lock()
	defer clusterStatusMutex.Unlock()
	// update flux status from map
	updateFluxObjectsStatusMap(c.logger, castItem, opType)
	kubeVersion := getKubeVersionFromNodes(c.cs, c.logger)
	// create the cluster status object
	res := createClusterStatus(KustomizationMap, BucketMap, kubeVersion)
	converted, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&res)
	if err != nil {
		c.logger.Error(err, "failed to convert clusterStatus to unstructured")
		return
	}
	data := &unstructured.Unstructured{Object: converted}
	jsonPrint, err := data.MarshalJSON()
	if err != nil {
		c.logger.Error(err, "failed to marshal the resource", "clusterStatus", data)
		return
	}
	c.sendPubSubMessage(
		c.logger,
		schema.GroupVersionKind{Group: ctlfish.ClusterStatusGroup, Version: ctlfish.ClusterStatusVersion, Kind: ctlfish.ClusterStatusKind},
		ctlfish.ClusterStatusName,
		ctlfish.ClusterStatusNamespace,
		ctlfish.ClusterStatusOperation,
		jsonPrint,
	)
}

// sendPubSubMessage publishes a resource event to the ctlfish topic and
// blocks until the publish result is known. It is a no-op when Pub/Sub
// publishing is disabled in config.
func (c *CtlfishWatcher) sendPubSubMessage(logger *logging.EdgeLogger, gvk schema.GroupVersionKind, name, namespace, opType string, jsonPrint []byte) {
	if !c.cfg.PubSubActive {
		return
	}
	attys := c.createMap(gvk, name, namespace, opType)
	message := c.createMap(gvk, name, namespace, opType)
	message["resource"] = string(jsonPrint)
	mdata, err := json.Marshal(message)
	if err != nil {
		logger.Error(err, "failed to marshall message", "data", message)
		// Do not publish a payload we failed to build.
		return
	}
	ressy := c.psClient.Topic(ctlfishTopic).Publish(context.Background(), &pubsub.Message{
		Data:       mdata,
		Attributes: attys,
	})
	if _, err = ressy.Get(context.Background()); err != nil {
		logger.Error(err, "failed to send message", "data", mdata)
	}
}

// createMap builds the common attribute set attached to every Pub/Sub
// message; the timestamp uses the BigQuery-compatible format.
func (c *CtlfishWatcher) createMap(gvk schema.GroupVersionKind, name, namespace, opType string) map[string]string {
	return map[string]string{
		"timestamp":       time.Now().Format(bqTimestampFormat),
		"cluster_name":    c.cfg.ClusterName,
		"project_id":      c.cfg.ProjectID,
		"group":           gvk.Group,
		"version":         gvk.Version,
		"kind":            gvk.Kind,
		"name":            name,
		"namespace":       namespace,
		"operation":       opType,
		"cluster_edge_id": c.cfg.ClusterEdgeID,
	}
}

// getKubeVersionFromNodes lists cluster nodes and returns the kubelet
// version of the first node, or "" if nodes cannot be listed or converted.
func getKubeVersionFromNodes(cs dynamic.Interface, logger *logging.EdgeLogger) string {
	nodesResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
	nodesRes, err := cs.Resource(nodesResource).Namespace("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		logger.Error(err, "failed to get nodes")
		return ""
	}
	nodes, err := mapper.ToConvertUnstructuredToNode(nodesRes)
	if err != nil {
		logger.Error(err, "failed to convert unstructured to nodes")
		return ""
	}
	if len(nodes) > 0 {
		return nodes[0].Status.NodeInfo.KubeletVersion
	}
	return ""
}

// updateFluxObjectsStatusMap will update the flux object maps for the resource.
// Deleted objects are removed from their map; otherwise the object is
// converted to its typed form and its Ready condition recorded.
// Callers must hold clusterStatusMutex.
func updateFluxObjectsStatusMap(logger *logging.EdgeLogger, obj *unstructured.Unstructured, opType string) {
	if obj.GetKind() == "Kustomization" {
		// if kustomization deleted remove it from the status map
		if opType == deleteOp {
			delete(KustomizationMap, obj.GetName())
			return
		}
		kustomization, err := kinformmapper.ToConvertUnstructuredToKustomization(obj)
		if err != nil {
			logger.Error(err, "failed to convert unstructured to kustomization")
			// Conversion failed: kustomization may be nil, nothing safe to record.
			return
		}
		if valid, status := getKustomizationStatusInfo(kustomization); valid {
			KustomizationMap[obj.GetName()] = status
		}
		return
	}
	// if bucket deleted remove it from the status map
	if opType == deleteOp {
		delete(BucketMap, obj.GetName())
		return
	}
	bucket, err := kinformmapper.ToConvertUnstructuredToBucket(obj)
	if err != nil {
		logger.Error(err, "failed to convert unstructured to bucket")
		return
	}
	if valid, status := getBucketStatusInfo(bucket); valid {
		BucketMap[obj.GetName()] = status
	}
}

// createClusterStatus aggregates the per-resource flux statuses into a single
// ClusterStatus. A kustomization error sets SyncingError, a bucket error sets
// BucketError (bucket wins when both occur), and the SyncError concatenates
// the individual status messages.
func createClusterStatus(kustomizationMap map[string]KustomizationInfo, bucketMap map[string]BucketInfo, version string) ClusterStatus {
	status := metastatus.ReadyCondition
	bucketMessage := ""
	kustomizationMessage := ""
	for _, syncInfo := range kustomizationMap {
		if syncInfo.FluxStatus.Error {
			status = mapper.SyncingError
			kustomizationMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, kustomizationMessage)
		}
	}
	for _, syncInfo := range bucketMap {
		if syncInfo.FluxStatus.Error {
			status = mapper.BucketError
			bucketMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, bucketMessage)
		}
	}
	var syncErr *SyncError
	if kustomizationMessage != "" {
		syncErr = &SyncError{
			Message:   kustomizationMessage,
			ErrorType: "Kustomization",
		}
	}
	if bucketMessage != "" {
		message := bucketMessage
		errorType := "Bucket"
		if syncErr != nil {
			// Both kustomization and bucket errors are present; combine the
			// messages and mark the documented "both" error type.
			message = fmt.Sprintf("%s, %s", message, syncErr.Message)
			errorType = "both"
		}
		syncErr = &SyncError{
			Message:   message,
			ErrorType: errorType,
		}
	}
	// Report the maps that were passed in rather than the package globals so
	// the snapshot matches the inputs the caller aggregated over.
	return ClusterStatus{
		Status:         status,
		Buckets:        bucketMap,
		Kustomizations: kustomizationMap,
		Error:          syncErr,
		NodeVersion:    version,
	}
}

// getBucketRevision returns the bucket's artifact revision, or "" if no
// artifact has been produced yet.
func getBucketRevision(status sourceApi.BucketStatus) string {
	if status.Artifact != nil {
		return status.Artifact.Revision
	}
	return ""
}

// getKustomizationStatusInfo extracts sync status from a Kustomization's
// Ready condition; the bool is false when no Ready condition exists.
func getKustomizationStatusInfo(kustomization *kustomizeApi.Kustomization) (bool, KustomizationInfo) {
	valid, status := getStatusFromCondition(kustomization.Status.Conditions)
	if !valid {
		return false, KustomizationInfo{}
	}
	status.Suspended = kustomization.Spec.Suspend
	status.Revision = kustomization.Status.LastAppliedRevision
	return true, KustomizationInfo{
		Path:       kustomization.Spec.Path,
		Source:     kustomization.Spec.SourceRef.Name,
		FluxStatus: status,
	}
}

// getBucketStatusInfo extracts sync status from a Bucket's Ready condition;
// the bool is false when no Ready condition exists.
func getBucketStatusInfo(bucket *sourceApi.Bucket) (bool, BucketInfo) {
	valid, status := getStatusFromCondition(bucket.Status.Conditions)
	if !valid {
		return false, BucketInfo{}
	}
	status.Suspended = bucket.Spec.Suspend
	status.Revision = getBucketRevision(bucket.Status)
	excludes := ""
	if bucket.Spec.Ignore != nil {
		excludes = *bucket.Spec.Ignore
	}
	return true, BucketInfo{
		Excludes:   excludes,
		BucketName: bucket.Spec.BucketName,
		FluxStatus: status,
	}
}

// getStatusFromCondition finds the Ready condition and maps it to a SyncInfo
// (Error is true when Ready=False). Returns false when the Ready condition is
// absent or its status is Unknown.
func getStatusFromCondition(conditions []metav1.Condition) (bool, SyncInfo) {
	for _, condition := range conditions {
		if condition.Type != metastatus.ReadyCondition {
			continue
		}
		switch condition.Status {
		case metav1.ConditionTrue, metav1.ConditionFalse:
			return true, SyncInfo{
				LastUpdated:   condition.LastTransitionTime.String(),
				Revision:      "",
				StatusMessage: fmt.Sprintf("%s: %s", condition.Reason, condition.Message),
				Error:         condition.Status == metav1.ConditionFalse,
			}
		}
	}
	return false, SyncInfo{}
}

// GetAllResources uses the discovery client to get all resources and calls
// dynamic metrics for each one. Discovery may return partial results with a
// non-nil error, so the error is logged and the partial groups are still
// processed. Informers are registered at most once per GVR (tracked in
// watches); monitored resources are additionally listed and replayed as
// update events.
func (c *CtlfishWatcher) GetAllResources(ctx context.Context) {
	_, groups, err := c.dClient.ServerGroupsAndResources()
	if err != nil {
		c.logger.Error(err, "failed to get resources with discovery client")
	}
	// need to filter to only listable and watchable resources
	groups = discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"list", "watch", "get"}}, groups)
	for _, group := range groups {
		gv, err := schema.ParseGroupVersion(group.GroupVersion)
		if err != nil {
			c.logger.Error(err, "failed to parse group version", "groupVersion", group.GroupVersion)
			continue
		}
		for _, resource := range group.APIResources {
			gvr := gv.WithResource(resource.Name)
			if _, ok := watches[gvr.String()]; c.cfg.IsWatched(gvr) && !ok {
				if err := c.DynamicMetricsInformation(gvr); err != nil {
					c.logger.Error(err, "failed to add event handlers to informer", "resource", resource)
				}
				watches[gvr.String()] = "done"
			}
			if c.cfg.IsMonitored(gvr) {
				list, err := c.cs.Resource(gvr).List(ctx, metav1.ListOptions{})
				if err != nil {
					c.logger.Error(err, "failed to list resource", "resource", resource)
					continue
				}
				for i := range list.Items {
					c.processUpdate(&list.Items[i], gvr, updateOp)
				}
			}
		}
	}
}

// isFluxResource reports whether the resource is one of the flux resource
// types that feed the cluster status maps.
func isFluxResource(resource schema.GroupVersionResource) bool {
	return resource.Resource == "buckets" || resource.Resource == "kustomizations"
}

// sanitizeResource sanitizes resources before they are logged by ctlfish in bigquery
func sanitizeResource(item *unstructured.Unstructured) *unstructured.Unstructured {
	switch kind := item.GetObjectKind().GroupVersionKind().Kind; kind {
	case "Secret":
		return sanitizeSecret(item)
	default:
		return item
	}
}

// sanitizeSecret removes secret values from kubectl.kubernetes.io/last-applied-configuration
// annotation field and also secret data fields for each key.
// Returns nil if the object's structure cannot be traversed.
func sanitizeSecret(secret *unstructured.Unstructured) *unstructured.Unstructured {
	secretObject := secret.Object
	// NestedMap returns a deep copy, so changes must be written back.
	secretAnnotationsMap, foundAnnotations, err := unstructured.NestedMap(secretObject, "metadata", "annotations")
	if err != nil {
		return nil
	}
	if foundAnnotations {
		for key := range secretAnnotationsMap {
			if key == "kubectl.kubernetes.io/last-applied-configuration" {
				secretAnnotationsMap[key] = nil
			}
		}
		// BUG FIX: this previously assigned the annotations map to the
		// top-level "metadata" key, clobbering name/namespace/labels.
		// Write it back under metadata.annotations instead.
		if err := unstructured.SetNestedMap(secretObject, secretAnnotationsMap, "metadata", "annotations"); err != nil {
			return nil
		}
		secret.SetUnstructuredContent(secretObject)
	}
	secretDataMap, foundData, err := unstructured.NestedMap(secretObject, "data")
	if err != nil {
		return nil
	}
	if foundData {
		// Null out every data value; keys are kept so the shape is visible.
		for key := range secretDataMap {
			secretDataMap[key] = nil
		}
		secretObject["data"] = secretDataMap
		secret.SetUnstructuredContent(secretObject)
	}
	return secret
}

// NewWatcher constructs a CtlfishWatcher from its dependencies.
func NewWatcher(cs dynamic.Interface, dClient discovery.DiscoveryInterface, logger *logging.EdgeLogger, factory dynamicinformer.DynamicSharedInformerFactory, cfg *option.MetricConfig, pClient *pubsub.Client) *CtlfishWatcher {
	return &CtlfishWatcher{
		cs:       cs,
		dClient:  dClient,
		logger:   logger,
		factory:  factory,
		cfg:      cfg,
		psClient: pClient,
	}
}