...

Source file src/edge-infra.dev/pkg/edge/edgeagent/edgeagent.go

Documentation: edge-infra.dev/pkg/edge/edgeagent

     1  package edgeagent
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strconv"
     8  	"time"
     9  
    10  	"cloud.google.com/go/pubsub"
    11  
    12  	"github.com/fluxcd/pkg/apis/meta"
    13  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    14  	"k8s.io/apimachinery/pkg/runtime/schema"
    15  	"k8s.io/apimachinery/pkg/types"
    16  	"k8s.io/client-go/rest"
    17  	"k8s.io/client-go/util/retry"
    18  	"sigs.k8s.io/controller-runtime/pkg/client"
    19  
    20  	helmApi "github.com/fluxcd/helm-controller/api/v2"
    21  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
    22  	kubevirtApi "kubevirt.io/api/core/v1"
    23  	"kubevirt.io/client-go/kubecli"
    24  
    25  	"edge-infra.dev/pkg/edge/edgeagent/audit"
    26  	"edge-infra.dev/pkg/edge/edgeagent/model"
    27  )
    28  
    29  type EdgeAgent struct {
    30  	ClusterEdgeID string
    31  	Client        client.Client
    32  }
    33  
    34  func (ea *EdgeAgent) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
    35  	var eventHandlingErrors error
    36  	notification, err := model.FromPubSubMessage(msg)
    37  	if err != nil {
    38  		auditLog := audit.New(model.EdgeAgentTopicAndOwner)
    39  		auditLog.Log(model.ErrValidation, "requestID", msg.ID, "error", err)
    40  		msg.Ack()
    41  		return err
    42  	}
    43  
    44  	auditLog := audit.New(notification.Actor)
    45  
    46  	for _, event := range notification.Events {
    47  		if err := ea.handleRequest(ctx, event); err != nil {
    48  			auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "error", err)
    49  			eventHandlingErrors = errors.Join(eventHandlingErrors, err)
    50  		} else {
    51  			auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "kind", event.Kind, "name", event.Name)
    52  		}
    53  	}
    54  	msg.Ack()
    55  	return eventHandlingErrors
    56  }
    57  func (ea *EdgeAgent) handleRequest(ctx context.Context, event model.Event) error {
    58  	var gvk schema.GroupVersionKind
    59  	switch event.Kind {
    60  	case helmApi.HelmReleaseKind:
    61  		gvk = helmApi.GroupVersion.WithKind(event.Kind)
    62  	case kustomizeApi.KustomizationKind:
    63  		gvk = kustomizeApi.GroupVersion.WithKind(event.Kind)
    64  	}
    65  	nsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name}
    66  
    67  	return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
    68  		// timestamp used for triggering reconcile and live migration events
    69  		ts := time.Now().Format(time.RFC3339Nano)
    70  		switch event.Type {
    71  		case model.EventTypeReconcile:
    72  			return ea.handleReconcile(ctx, &gvk, &nsName, ts)
    73  		case model.EventTypeLiveMigration:
    74  			return ea.handleLiveMigration(ctx, &nsName)
    75  		case model.EventTypeStart, model.EventTypeStop:
    76  			return ea.handleStartStopRestart(ctx, &nsName, event.Type)
    77  		}
    78  		return nil
    79  	})
    80  }
    81  
    82  func (ea *EdgeAgent) handleReconcile(ctx context.Context, gvk *schema.GroupVersionKind, nsName *types.NamespacedName, ts string) error {
    83  	object := &metav1.PartialObjectMetadata{}
    84  	object.SetGroupVersionKind(*gvk)
    85  	object.SetName(nsName.Name)
    86  	object.SetNamespace(nsName.Namespace)
    87  	if err := ea.Client.Get(ctx, *nsName, object); err != nil {
    88  		return err
    89  	}
    90  	patch := client.MergeFrom(object.DeepCopy())
    91  	annotations := object.GetAnnotations()
    92  	if annotations == nil {
    93  		annotations = make(map[string]string, 1)
    94  	}
    95  	annotations[meta.ReconcileRequestAnnotation] = ts
    96  
    97  	// HelmRelease specific annotations to force a release
    98  	if gvk.Kind == helmApi.HelmReleaseKind {
    99  		annotations[helmApi.ForceRequestAnnotation] = ts
   100  	}
   101  	object.SetAnnotations(annotations)
   102  	return ea.Client.Patch(ctx, object, patch, client.FieldOwner(model.EdgeAgentTopicAndOwner))
   103  }
   104  
   105  func (ea *EdgeAgent) handleLiveMigration(ctx context.Context, nsName *types.NamespacedName) error {
   106  	// create live migration cr
   107  	liveMigration := &kubevirtApi.VirtualMachineInstanceMigration{
   108  		ObjectMeta: metav1.ObjectMeta{
   109  			Name:      fmt.Sprintf("%s-%s", nsName.Name, strconv.FormatInt(time.Now().Unix(), 10)),
   110  			Namespace: nsName.Namespace,
   111  		},
   112  		Spec: kubevirtApi.VirtualMachineInstanceMigrationSpec{
   113  			VMIName: nsName.Name,
   114  		},
   115  	}
   116  	return ea.Client.Create(ctx, liveMigration, client.FieldOwner(model.EdgeAgentTopicAndOwner))
   117  }
   118  
   119  func (ea *EdgeAgent) handleStartStopRestart(ctx context.Context, nsName *types.NamespacedName, eventType string) error {
   120  	config, err := rest.InClusterConfig()
   121  	if err != nil && !errors.Is(err, rest.ErrNotInCluster) {
   122  		return err
   123  	}
   124  
   125  	virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(config)
   126  	if err != nil {
   127  		return err
   128  	}
   129  	switch eventType {
   130  	case model.EventTypeStart:
   131  		return virtClient.VirtualMachine(nsName.Namespace).Start(ctx, nsName.Name, &kubevirtApi.StartOptions{})
   132  	case model.EventTypeStop:
   133  		return virtClient.VirtualMachine(nsName.Namespace).Stop(ctx, nsName.Name, &kubevirtApi.StopOptions{})
   134  	case model.EventTypeRestart:
   135  		return virtClient.VirtualMachine(nsName.Namespace).Restart(ctx, nsName.Name, &kubevirtApi.RestartOptions{})
   136  	}
   137  	return nil
   138  }
   139  

View as plain text