package edgeagent import ( "context" "errors" "fmt" "strconv" "time" "cloud.google.com/go/pubsub" "github.com/fluxcd/pkg/apis/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" helmApi "github.com/fluxcd/helm-controller/api/v2" kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1" kubevirtApi "kubevirt.io/api/core/v1" "kubevirt.io/client-go/kubecli" "edge-infra.dev/pkg/edge/edgeagent/audit" "edge-infra.dev/pkg/edge/edgeagent/model" ) type EdgeAgent struct { ClusterEdgeID string Client client.Client } func (ea *EdgeAgent) HandleMsg(ctx context.Context, msg *pubsub.Message) error { var eventHandlingErrors error notification, err := model.FromPubSubMessage(msg) if err != nil { auditLog := audit.New(model.EdgeAgentTopicAndOwner) auditLog.Log(model.ErrValidation, "requestID", msg.ID, "error", err) msg.Ack() return err } auditLog := audit.New(notification.Actor) for _, event := range notification.Events { if err := ea.handleRequest(ctx, event); err != nil { auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "error", err) eventHandlingErrors = errors.Join(eventHandlingErrors, err) } else { auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "kind", event.Kind, "name", event.Name) } } msg.Ack() return eventHandlingErrors } func (ea *EdgeAgent) handleRequest(ctx context.Context, event model.Event) error { var gvk schema.GroupVersionKind switch event.Kind { case helmApi.HelmReleaseKind: gvk = helmApi.GroupVersion.WithKind(event.Kind) case kustomizeApi.KustomizationKind: gvk = kustomizeApi.GroupVersion.WithKind(event.Kind) } nsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { // timestamp used for triggering reconcile and live migration events ts := time.Now().Format(time.RFC3339Nano) switch event.Type { case model.EventTypeReconcile: return ea.handleReconcile(ctx, &gvk, &nsName, ts) case model.EventTypeLiveMigration: return ea.handleLiveMigration(ctx, &nsName) case model.EventTypeStart, model.EventTypeStop: return ea.handleStartStopRestart(ctx, &nsName, event.Type) } return nil }) } func (ea *EdgeAgent) handleReconcile(ctx context.Context, gvk *schema.GroupVersionKind, nsName *types.NamespacedName, ts string) error { object := &metav1.PartialObjectMetadata{} object.SetGroupVersionKind(*gvk) object.SetName(nsName.Name) object.SetNamespace(nsName.Namespace) if err := ea.Client.Get(ctx, *nsName, object); err != nil { return err } patch := client.MergeFrom(object.DeepCopy()) annotations := object.GetAnnotations() if annotations == nil { annotations = make(map[string]string, 1) } annotations[meta.ReconcileRequestAnnotation] = ts // HelmRelease specific annotations to force a release if gvk.Kind == helmApi.HelmReleaseKind { annotations[helmApi.ForceRequestAnnotation] = ts } object.SetAnnotations(annotations) return ea.Client.Patch(ctx, object, patch, client.FieldOwner(model.EdgeAgentTopicAndOwner)) } func (ea *EdgeAgent) handleLiveMigration(ctx context.Context, nsName *types.NamespacedName) error { // create live migration cr liveMigration := &kubevirtApi.VirtualMachineInstanceMigration{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", nsName.Name, strconv.FormatInt(time.Now().Unix(), 10)), Namespace: nsName.Namespace, }, Spec: kubevirtApi.VirtualMachineInstanceMigrationSpec{ VMIName: nsName.Name, }, } return ea.Client.Create(ctx, liveMigration, client.FieldOwner(model.EdgeAgentTopicAndOwner)) } func (ea *EdgeAgent) handleStartStopRestart(ctx context.Context, nsName *types.NamespacedName, eventType string) error { config, err := rest.InClusterConfig() if err != nil && !errors.Is(err, rest.ErrNotInCluster) { return err } virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(config) if err != nil { return err } switch eventType { case model.EventTypeStart: return virtClient.VirtualMachine(nsName.Namespace).Start(ctx, nsName.Name, &kubevirtApi.StartOptions{}) case model.EventTypeStop: return virtClient.VirtualMachine(nsName.Namespace).Stop(ctx, nsName.Name, &kubevirtApi.StopOptions{}) case model.EventTypeRestart: return virtClient.VirtualMachine(nsName.Namespace).Restart(ctx, nsName.Name, &kubevirtApi.RestartOptions{}) } return nil }