...

Source file src/edge-infra.dev/pkg/edge/ctlfish/monitor/dynamic.go

Documentation: edge-infra.dev/pkg/edge/ctlfish/monitor

     1  package monitor
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"sync"
     8  	"time"
     9  
    10  	pubsub "cloud.google.com/go/pubsub"
    11  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1beta2"
    12  	sourceApi "github.com/fluxcd/source-controller/api/v1beta2"
    13  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    14  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    15  	"k8s.io/apimachinery/pkg/runtime"
    16  	"k8s.io/apimachinery/pkg/runtime/schema"
    17  	"k8s.io/client-go/discovery"
    18  	"k8s.io/client-go/dynamic"
    19  	"k8s.io/client-go/dynamic/dynamicinformer"
    20  	"k8s.io/client-go/tools/cache"
    21  
    22  	"edge-infra.dev/pkg/edge/api/graph/mapper"
    23  	"edge-infra.dev/pkg/edge/ctlfish"
    24  	"edge-infra.dev/pkg/edge/ctlfish/metrics"
    25  	"edge-infra.dev/pkg/edge/ctlfish/option"
    26  	kinformmapper "edge-infra.dev/pkg/f8n/kinform/mapper"
    27  	metastatus "edge-infra.dev/pkg/k8s/meta/status"
    28  	"edge-infra.dev/pkg/lib/logging"
    29  )
    30  
// watches records, keyed by the GroupVersionResource string, the resources
// for which GetAllResources has already attempted to register informer event
// handlers. Only the presence of a key is checked; the stored value is always
// "done".
// NOTE(review): the map is read and written without a lock in
// GetAllResources — confirm that function is never called concurrently.
var watches = map[string]string{}
    32  
// ClusterStatus is the aggregated flux sync state that handleFluxResource
// builds with createClusterStatus and publishes whenever a bucket or
// kustomization event is processed.
type ClusterStatus struct {
	// Status is the overall state chosen by createClusterStatus:
	// metastatus.ReadyCondition when nothing is in error, mapper.SyncingError
	// when a kustomization is in error, or mapper.BucketError when a bucket
	// is in error (bucket errors win when both are present).
	Status         string
	// Buckets is the per-bucket status, keyed by bucket object name.
	Buckets        map[string]BucketInfo
	// Kustomizations is the per-kustomization status, keyed by object name.
	Kustomizations map[string]KustomizationInfo
	// Error summarises the failing objects; nil when none are in error.
	Error          *SyncError
	// NodeVersion is the kubelet version string passed to createClusterStatus.
	NodeVersion    string
}
    40  
// KustomizationInfo is the reported state of a single flux Kustomization, as
// produced by getKustomizationStatusInfo.
type KustomizationInfo struct {
	// Path is copied from the kustomization's Spec.Path.
	Path       string
	// Source is the name of the kustomization's Spec.SourceRef.
	Source     string
	// FluxStatus is derived from the kustomization's Ready condition.
	FluxStatus SyncInfo
}
    46  
// BucketInfo is the reported state of a single flux Bucket, as produced by
// getBucketStatusInfo.
type BucketInfo struct {
	// Excludes is the value of the bucket's Spec.Ignore, or "" when unset.
	Excludes   string
	// BucketName is copied from the bucket's Spec.BucketName.
	BucketName string
	// FluxStatus is derived from the bucket's Ready condition.
	FluxStatus SyncInfo
}
    52  
// SyncInfo describes the Ready condition of a flux object. The condition
// derived fields are filled in by getStatusFromCondition; Revision and
// Suspended are filled in by the bucket/kustomization specific helpers.
type SyncInfo struct {
	// LastUpdated is the String() form of the condition's LastTransitionTime.
	LastUpdated   string
	// Revision is the last applied revision (kustomization) or the artifact
	// revision (bucket); empty when none is available.
	Revision      string
	// StatusMessage is "<Reason>: <Message>" taken from the Ready condition.
	StatusMessage string
	// Error is true when the Ready condition status is False.
	Error         bool
	// Suspended mirrors the object's Spec.Suspend.
	Suspended     bool
}
    60  
// SyncError is the combined error summary attached to a ClusterStatus by
// createClusterStatus.
type SyncError struct {
	// Message is the ", "-joined status messages of every failing object.
	Message   string
	// ErrorType is "Kustomization" when only kustomizations are failing and
	// "Bucket" whenever at least one bucket is failing (including when
	// kustomizations are failing as well).
	ErrorType string
}
    65  
// KustomizationMap holds the latest KustomizationInfo for each Kustomization
// processed so far, keyed by object name (namespace is not part of the key).
// It is updated by updateFluxObjectsStatusMap.
var KustomizationMap = map[string]KustomizationInfo{}

// BucketMap holds the latest BucketInfo for each Bucket processed so far,
// keyed by object name (namespace is not part of the key). It is updated by
// updateFluxObjectsStatusMap.
var BucketMap = map[string]BucketInfo{}

// clusterStatusMutex is held by handleFluxResource while the two maps above
// are updated and the resulting cluster status is built and published.
var clusterStatusMutex = sync.RWMutex{}
    69  
const (
	// createOp, updateOp and deleteOp are the operation names passed to
	// processUpdate and published in the "operation" field of each message.
	createOp          = "create"
	updateOp          = "update"
	deleteOp          = "delete"
	// ctlfishTopic is the Pub/Sub topic ID that sendPubSubMessage publishes to.
	ctlfishTopic      = "ctlfish-pubsub"
	// bqTimestampFormat is the time layout used for the "timestamp" field
	// built by createMap (presumably chosen for BigQuery ingestion — confirm).
	bqTimestampFormat = "2006-01-02 15:04:05.999999"
)
    77  
// CtlfishWatcher registers informer handlers for cluster resources and
// publishes the observed objects and the derived cluster status to Pub/Sub.
// Construct it with NewWatcher.
type CtlfishWatcher struct {
	// cs is the dynamic client used to list resources and nodes.
	cs       dynamic.Interface
	// dClient is the discovery client used to enumerate server resources.
	dClient  discovery.DiscoveryInterface
	// logger receives all diagnostics emitted by the watcher.
	logger   *logging.EdgeLogger
	// factory supplies the shared dynamic informers handlers are added to.
	factory  dynamicinformer.DynamicSharedInformerFactory
	// cfg decides which resources are watched/monitored, whether publishing
	// is active, and provides the cluster identity fields put in messages.
	cfg      *option.MetricConfig
	// psClient is the Pub/Sub client used by sendPubSubMessage.
	psClient *pubsub.Client
}
    86  
    87  // DynamicMetricsInformation takes in a kubernetes dynamic client, a struct of the api group, the version, and the resource type,
    88  // and a struct of the resource type.  It then queries the Kube API using the gvr struct and turns the unstructured object that gets returned
    89  // into its actual type using the struct type.  That object then gets json marshalled and printed to std out
    90  func (c *CtlfishWatcher) DynamicMetricsInformation(resource schema.GroupVersionResource) error {
    91  	informer := c.factory.ForResource(resource).Informer()
    92  
    93  	if c.cfg.IsWatched(resource) {
    94  		_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    95  			AddFunc: func(obj interface{}) {
    96  				c.processUpdate(obj, resource, createOp)
    97  			},
    98  			UpdateFunc: func(_, newObj interface{}) {
    99  				c.processUpdate(newObj, resource, updateOp)
   100  			},
   101  			DeleteFunc: func(obj interface{}) {
   102  				c.processUpdate(obj, resource, deleteOp)
   103  			},
   104  		})
   105  		return err
   106  	}
   107  
   108  	_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
   109  		DeleteFunc: func(obj interface{}) {
   110  			c.processUpdate(obj, resource, deleteOp)
   111  		},
   112  	})
   113  	return err
   114  }
   115  
   116  func (c *CtlfishWatcher) processUpdate(item interface{}, resource schema.GroupVersionResource, opType string) {
   117  	castItem := item.(*unstructured.Unstructured)
   118  	castItem = sanitizeResource(castItem)
   119  	jsonPrint, err := castItem.MarshalJSON()
   120  	if err != nil {
   121  		c.logger.Error(err, "Failed to Marshal the Resource", "gvr", resource)
   122  	}
   123  	if castItem.GetAPIVersion() == "" {
   124  		c.logger.Info("Missing APIVersion, skipping log...") //This was .Warn before find alternative
   125  	} else {
   126  		gvk := castItem.GetObjectKind().GroupVersionKind()
   127  		c.sendPubSubMessage(c.logger, gvk, castItem.GetName(), castItem.GetNamespace(), opType, jsonPrint)
   128  		c.handleFluxResource(resource, castItem, opType)
   129  		switch opType {
   130  		case "create":
   131  			metrics.CtlfishResourceCreations.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
   132  		case "update":
   133  			metrics.CtlfishResourceUpdates.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
   134  		default:
   135  			metrics.CtlfishResourceDeletions.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind).Inc()
   136  		}
   137  	}
   138  }
   139  
   140  func (c *CtlfishWatcher) handleFluxResource(resource schema.GroupVersionResource, castItem *unstructured.Unstructured, opType string) {
   141  	if isFluxResource(resource) {
   142  		clusterStatusMutex.Lock()
   143  		//update flux status from map
   144  		updateFluxObjectsStatusMap(c.logger, castItem, opType)
   145  
   146  		kubeVersion := getKubeVersionFromNodes(c.cs, c.logger)
   147  		//create the cluster status object
   148  		res := createClusterStatus(KustomizationMap, BucketMap, kubeVersion)
   149  
   150  		converted, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&res)
   151  		if err != nil {
   152  			c.logger.Error(err, "failed to convert clusterStatus to unstructured")
   153  		}
   154  		data := &unstructured.Unstructured{
   155  			Object: converted,
   156  		}
   157  
   158  		jsonPrint, err := data.MarshalJSON()
   159  		if err != nil {
   160  			c.logger.Error(err, "failed to marshal the resource", "clusterStatus", data)
   161  		}
   162  
   163  		c.sendPubSubMessage(c.logger, schema.GroupVersionKind{Group: ctlfish.ClusterStatusGroup, Version: ctlfish.ClusterStatusVersion, Kind: ctlfish.ClusterStatusKind}, ctlfish.ClusterStatusName, ctlfish.ClusterStatusNamespace, ctlfish.ClusterStatusOperation, jsonPrint)
   164  		clusterStatusMutex.Unlock()
   165  	}
   166  }
   167  
   168  func (c *CtlfishWatcher) sendPubSubMessage(logger *logging.EdgeLogger, gvk schema.GroupVersionKind, name, namespace, opType string, jsonPrint []byte) {
   169  	if !c.cfg.PubSubActive {
   170  		return
   171  	}
   172  	attys := c.createMap(gvk, name, namespace, opType)
   173  	message := c.createMap(gvk, name, namespace, opType)
   174  	message["resource"] = string(jsonPrint)
   175  	mdata, err := json.Marshal(message)
   176  	if err != nil {
   177  		logger.Error(err, "failed to marshall message", "data", message)
   178  	}
   179  	ressy := c.psClient.Topic(ctlfishTopic).Publish(context.Background(), &pubsub.Message{
   180  		Data:       mdata,
   181  		Attributes: attys,
   182  	})
   183  	_, err = ressy.Get(context.Background())
   184  	if err != nil {
   185  		logger.Error(err, "failed to send message", "data", mdata)
   186  	}
   187  }
   188  
   189  func (c *CtlfishWatcher) createMap(gvk schema.GroupVersionKind, name, namespace, opType string) map[string]string {
   190  	return map[string]string{
   191  		"timestamp":       time.Now().Format(bqTimestampFormat),
   192  		"cluster_name":    c.cfg.ClusterName,
   193  		"project_id":      c.cfg.ProjectID,
   194  		"group":           gvk.Group,
   195  		"version":         gvk.Version,
   196  		"kind":            gvk.Kind,
   197  		"name":            name,
   198  		"namespace":       namespace,
   199  		"operation":       opType,
   200  		"cluster_edge_id": c.cfg.ClusterEdgeID}
   201  }
   202  
   203  func getKubeVersionFromNodes(cs dynamic.Interface, logger *logging.EdgeLogger) string {
   204  	nodesResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "nodes"}
   205  	nodesRes, err := cs.Resource(nodesResource).Namespace("").List(context.Background(), metav1.ListOptions{})
   206  	if err != nil {
   207  		logger.Error(err, "failed to get nodes")
   208  	} else {
   209  		nodes, err := mapper.ToConvertUnstructuredToNode(nodesRes)
   210  		if err != nil {
   211  			logger.Error(err, "failed to convert unstructured to nodes")
   212  		}
   213  		if len(nodes) > 0 {
   214  			return nodes[0].Status.NodeInfo.KubeletVersion
   215  		}
   216  	}
   217  	return ""
   218  }
   219  
   220  // updateFluxObjectsStatusMap will update the flux object maps for the resource
   221  func updateFluxObjectsStatusMap(logger *logging.EdgeLogger, obj *unstructured.Unstructured, opType string) {
   222  	if obj.GetKind() == "Kustomization" {
   223  		//if kustomization deleted remove it from the status map
   224  		if opType == deleteOp {
   225  			delete(KustomizationMap, obj.GetName())
   226  			return
   227  		}
   228  		kustomization, err := kinformmapper.ToConvertUnstructuredToKustomization(obj)
   229  		if err != nil {
   230  			logger.Error(err, "failed to convert unstructured to kustomization")
   231  		}
   232  		valid, status := getKustomizationStatusInfo(kustomization)
   233  		if valid {
   234  			KustomizationMap[obj.GetName()] = status
   235  		}
   236  		return
   237  	}
   238  	// if bucket deleted remove it from the status map
   239  	if opType == deleteOp {
   240  		delete(BucketMap, obj.GetName())
   241  		return
   242  	}
   243  	bucket, err := kinformmapper.ToConvertUnstructuredToBucket(obj)
   244  	if err != nil {
   245  		logger.Error(err, "failed to convert unstructured to bucket")
   246  	}
   247  	valid, status := getBucketStatusInfo(bucket)
   248  	if valid {
   249  		BucketMap[obj.GetName()] = status
   250  	}
   251  }
   252  
   253  func createClusterStatus(kustomizationMap map[string]KustomizationInfo, bucketMap map[string]BucketInfo, version string) ClusterStatus {
   254  	status := metastatus.ReadyCondition
   255  	bucketMessage := ""
   256  	kustomizationMessage := ""
   257  	for _, syncInfo := range kustomizationMap {
   258  		if syncInfo.FluxStatus.Error {
   259  			status = mapper.SyncingError
   260  			kustomizationMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, kustomizationMessage)
   261  		}
   262  	}
   263  	for _, syncInfo := range bucketMap {
   264  		if syncInfo.FluxStatus.Error {
   265  			status = mapper.BucketError
   266  			bucketMessage = fmt.Sprintf("%s, %s", syncInfo.FluxStatus.StatusMessage, bucketMessage)
   267  		}
   268  	}
   269  	var err *SyncError
   270  	err = nil
   271  	if kustomizationMessage != "" {
   272  		err = &SyncError{
   273  			Message:   kustomizationMessage,
   274  			ErrorType: "Kustomization",
   275  		}
   276  	}
   277  	if bucketMessage != "" {
   278  		message := bucketMessage
   279  		if err != nil {
   280  			message = fmt.Sprintf("%s, %s", message, err.Message)
   281  		}
   282  		err = &SyncError{
   283  			Message:   message,
   284  			ErrorType: "Bucket",
   285  		}
   286  	}
   287  
   288  	return ClusterStatus{
   289  		Status:         status,
   290  		Buckets:        BucketMap,
   291  		Kustomizations: KustomizationMap,
   292  		Error:          err,
   293  		NodeVersion:    version,
   294  	}
   295  }
   296  
   297  func getBucketRevision(status sourceApi.BucketStatus) string {
   298  	if status.Artifact != nil {
   299  		return status.Artifact.Revision
   300  	}
   301  	return ""
   302  }
   303  
   304  func getKustomizationStatusInfo(kustomization *kustomizeApi.Kustomization) (bool, KustomizationInfo) {
   305  	valid, status := getStatusFromCondition(kustomization.Status.Conditions)
   306  	if valid {
   307  		status.Suspended = kustomization.Spec.Suspend
   308  		status.Revision = kustomization.Status.LastAppliedRevision
   309  		return true, KustomizationInfo{
   310  			Path:       kustomization.Spec.Path,
   311  			Source:     kustomization.Spec.SourceRef.Name,
   312  			FluxStatus: status,
   313  		}
   314  	}
   315  	return false, KustomizationInfo{}
   316  }
   317  
   318  func getBucketStatusInfo(bucket *sourceApi.Bucket) (bool, BucketInfo) {
   319  	valid, status := getStatusFromCondition(bucket.Status.Conditions)
   320  	if valid {
   321  		status.Suspended = bucket.Spec.Suspend
   322  		status.Revision = getBucketRevision(bucket.Status)
   323  		excludes := ""
   324  		if bucket.Spec.Ignore != nil {
   325  			excludes = *bucket.Spec.Ignore
   326  		}
   327  		return true, BucketInfo{
   328  			Excludes:   excludes,
   329  			BucketName: bucket.Spec.BucketName,
   330  			FluxStatus: status,
   331  		}
   332  	}
   333  	return false, BucketInfo{}
   334  }
   335  
   336  // response valid, error, message, at
   337  func getStatusFromCondition(conditions []metav1.Condition) (bool, SyncInfo) {
   338  	for _, condition := range conditions {
   339  		if condition.Type == metastatus.ReadyCondition {
   340  			if condition.Status == metav1.ConditionTrue {
   341  				return true, SyncInfo{
   342  					LastUpdated:   condition.LastTransitionTime.String(),
   343  					Revision:      "",
   344  					StatusMessage: fmt.Sprintf("%s: %s", condition.Reason, condition.Message),
   345  					Error:         false,
   346  				}
   347  			}
   348  			if condition.Status == metav1.ConditionFalse {
   349  				return true, SyncInfo{
   350  					LastUpdated:   condition.LastTransitionTime.String(),
   351  					Revision:      "",
   352  					StatusMessage: fmt.Sprintf("%s: %s", condition.Reason, condition.Message),
   353  					Error:         true,
   354  				}
   355  			}
   356  		}
   357  	}
   358  	return false, SyncInfo{}
   359  }
   360  
   361  // GetAllResources uses the discovery client to get all resources and calls dynamic metrics for each one
   362  func (c *CtlfishWatcher) GetAllResources(ctx context.Context) {
   363  	_, groups, err := c.dClient.ServerGroupsAndResources()
   364  	//need to filter to only listable and watchable resources
   365  	groups = discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"list", "watch", "get"}}, groups)
   366  	if err != nil {
   367  		c.logger.Error(err, "failed to get resources with discovery client")
   368  	}
   369  
   370  	for _, group := range groups {
   371  		gv, err := schema.ParseGroupVersion(group.GroupVersion)
   372  		for _, resource := range group.APIResources {
   373  			gvr := gv.WithResource(resource.Name)
   374  			if _, ok := watches[gvr.String()]; c.cfg.IsWatched(gvr) && !ok {
   375  				if err != nil {
   376  					c.logger.Error(err, "failed to parse group version", "resource", resource)
   377  				}
   378  				if err := c.DynamicMetricsInformation(gvr); err != nil {
   379  					c.logger.Error(err, "failed to add event handlers to informer", "resource", resource)
   380  				}
   381  				watches[gvr.String()] = "done"
   382  			}
   383  
   384  			if c.cfg.IsMonitored(gvr) {
   385  				list, err := c.cs.Resource(gvr).List(ctx, metav1.ListOptions{})
   386  				if err != nil {
   387  					c.logger.Error(err, "failed to list resource", "resource", resource)
   388  					continue
   389  				}
   390  				for i := range list.Items {
   391  					c.processUpdate(&list.Items[i], gvr, "update")
   392  				}
   393  			}
   394  		}
   395  	}
   396  }
   397  
   398  func isFluxResource(resource schema.GroupVersionResource) bool {
   399  	switch resource.Resource {
   400  	case "buckets":
   401  		return true
   402  	case "kustomizations":
   403  		return true
   404  	default:
   405  		return false
   406  	}
   407  }
   408  
   409  // sanitizeResource sanitizes resources before they are logged by ctlfish in bigquery
   410  func sanitizeResource(item *unstructured.Unstructured) *unstructured.Unstructured {
   411  	switch kind := item.GetObjectKind().GroupVersionKind().Kind; kind {
   412  	case "Secret":
   413  		return sanitizeSecret(item)
   414  	default:
   415  		return item
   416  	}
   417  }
   418  
   419  // sanitizeSecret removes secret values from kubectl.kubernetes.io/last-applied-configuration
   420  // annotation field and also secret data fields for each key
   421  func sanitizeSecret(secret *unstructured.Unstructured) *unstructured.Unstructured {
   422  	secretObject := secret.Object
   423  	secretAnnotationsMap, foundAnnotations, err := unstructured.NestedMap(secretObject, "metadata", "annotations")
   424  	if err != nil {
   425  		return nil
   426  	}
   427  	if foundAnnotations {
   428  		for key := range secretAnnotationsMap {
   429  			if key == "kubectl.kubernetes.io/last-applied-configuration" {
   430  				secretAnnotationsMap[key] = nil
   431  			}
   432  		}
   433  		secretObject["metadata"] = secretAnnotationsMap
   434  		secret.SetUnstructuredContent(secretObject)
   435  	}
   436  
   437  	secretDataMap, foundData, err := unstructured.NestedMap(secretObject, "data")
   438  	if err != nil {
   439  		return nil
   440  	}
   441  	if foundData {
   442  		for key := range secretDataMap {
   443  			secretDataMap[key] = nil
   444  		}
   445  		secretObject["data"] = secretDataMap
   446  		secret.SetUnstructuredContent(secretObject)
   447  	}
   448  	return secret
   449  }
   450  
   451  func NewWatcher(cs dynamic.Interface, dClient discovery.DiscoveryInterface, logger *logging.EdgeLogger, factory dynamicinformer.DynamicSharedInformerFactory, cfg *option.MetricConfig, pClient *pubsub.Client) *CtlfishWatcher {
   452  	return &CtlfishWatcher{
   453  		cs:       cs,
   454  		dClient:  dClient,
   455  		logger:   logger,
   456  		factory:  factory,
   457  		cfg:      cfg,
   458  		psClient: pClient,
   459  	}
   460  }
   461  

View as plain text