1 package edgeagent
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strconv"
8 "time"
9
10 "cloud.google.com/go/pubsub"
11
12 "github.com/fluxcd/pkg/apis/meta"
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 "k8s.io/apimachinery/pkg/runtime/schema"
15 "k8s.io/apimachinery/pkg/types"
16 "k8s.io/client-go/rest"
17 "k8s.io/client-go/util/retry"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19
20 helmApi "github.com/fluxcd/helm-controller/api/v2"
21 kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
22 kubevirtApi "kubevirt.io/api/core/v1"
23 "kubevirt.io/client-go/kubecli"
24
25 "edge-infra.dev/pkg/edge/edgeagent/audit"
26 "edge-infra.dev/pkg/edge/edgeagent/model"
27 )
28
29 type EdgeAgent struct {
30 ClusterEdgeID string
31 Client client.Client
32 }
33
34 func (ea *EdgeAgent) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
35 var eventHandlingErrors error
36 notification, err := model.FromPubSubMessage(msg)
37 if err != nil {
38 auditLog := audit.New(model.EdgeAgentTopicAndOwner)
39 auditLog.Log(model.ErrValidation, "requestID", msg.ID, "error", err)
40 msg.Ack()
41 return err
42 }
43
44 auditLog := audit.New(notification.Actor)
45
46 for _, event := range notification.Events {
47 if err := ea.handleRequest(ctx, event); err != nil {
48 auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "error", err)
49 eventHandlingErrors = errors.Join(eventHandlingErrors, err)
50 } else {
51 auditLog.Log(event.Type, "actionBy", notification.Actor, "requestID", msg.ID, "clusterEdgeID", notification.ClusterEdgeID, "kind", event.Kind, "name", event.Name)
52 }
53 }
54 msg.Ack()
55 return eventHandlingErrors
56 }
57 func (ea *EdgeAgent) handleRequest(ctx context.Context, event model.Event) error {
58 var gvk schema.GroupVersionKind
59 switch event.Kind {
60 case helmApi.HelmReleaseKind:
61 gvk = helmApi.GroupVersion.WithKind(event.Kind)
62 case kustomizeApi.KustomizationKind:
63 gvk = kustomizeApi.GroupVersion.WithKind(event.Kind)
64 }
65 nsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name}
66
67 return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
68
69 ts := time.Now().Format(time.RFC3339Nano)
70 switch event.Type {
71 case model.EventTypeReconcile:
72 return ea.handleReconcile(ctx, &gvk, &nsName, ts)
73 case model.EventTypeLiveMigration:
74 return ea.handleLiveMigration(ctx, &nsName)
75 case model.EventTypeStart, model.EventTypeStop:
76 return ea.handleStartStopRestart(ctx, &nsName, event.Type)
77 }
78 return nil
79 })
80 }
81
82 func (ea *EdgeAgent) handleReconcile(ctx context.Context, gvk *schema.GroupVersionKind, nsName *types.NamespacedName, ts string) error {
83 object := &metav1.PartialObjectMetadata{}
84 object.SetGroupVersionKind(*gvk)
85 object.SetName(nsName.Name)
86 object.SetNamespace(nsName.Namespace)
87 if err := ea.Client.Get(ctx, *nsName, object); err != nil {
88 return err
89 }
90 patch := client.MergeFrom(object.DeepCopy())
91 annotations := object.GetAnnotations()
92 if annotations == nil {
93 annotations = make(map[string]string, 1)
94 }
95 annotations[meta.ReconcileRequestAnnotation] = ts
96
97
98 if gvk.Kind == helmApi.HelmReleaseKind {
99 annotations[helmApi.ForceRequestAnnotation] = ts
100 }
101 object.SetAnnotations(annotations)
102 return ea.Client.Patch(ctx, object, patch, client.FieldOwner(model.EdgeAgentTopicAndOwner))
103 }
104
105 func (ea *EdgeAgent) handleLiveMigration(ctx context.Context, nsName *types.NamespacedName) error {
106
107 liveMigration := &kubevirtApi.VirtualMachineInstanceMigration{
108 ObjectMeta: metav1.ObjectMeta{
109 Name: fmt.Sprintf("%s-%s", nsName.Name, strconv.FormatInt(time.Now().Unix(), 10)),
110 Namespace: nsName.Namespace,
111 },
112 Spec: kubevirtApi.VirtualMachineInstanceMigrationSpec{
113 VMIName: nsName.Name,
114 },
115 }
116 return ea.Client.Create(ctx, liveMigration, client.FieldOwner(model.EdgeAgentTopicAndOwner))
117 }
118
119 func (ea *EdgeAgent) handleStartStopRestart(ctx context.Context, nsName *types.NamespacedName, eventType string) error {
120 config, err := rest.InClusterConfig()
121 if err != nil && !errors.Is(err, rest.ErrNotInCluster) {
122 return err
123 }
124
125 virtClient, err := kubecli.GetKubevirtClientFromRESTConfig(config)
126 if err != nil {
127 return err
128 }
129 switch eventType {
130 case model.EventTypeStart:
131 return virtClient.VirtualMachine(nsName.Namespace).Start(ctx, nsName.Name, &kubevirtApi.StartOptions{})
132 case model.EventTypeStop:
133 return virtClient.VirtualMachine(nsName.Namespace).Stop(ctx, nsName.Name, &kubevirtApi.StopOptions{})
134 case model.EventTypeRestart:
135 return virtClient.VirtualMachine(nsName.Namespace).Restart(ctx, nsName.Name, &kubevirtApi.RestartOptions{})
136 }
137 return nil
138 }
139
View as plain text