1 package internal
2
3 import (
4 "context"
5 "fmt"
6 "log"
7 "time"
8
9 corev1 "k8s.io/api/core/v1"
10 "k8s.io/apimachinery/pkg/api/meta"
11 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12 "k8s.io/apimachinery/pkg/runtime"
13 "k8s.io/apimachinery/pkg/runtime/schema"
14 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
15 "k8s.io/client-go/dynamic"
16 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
17 "sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19
20 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
21 "edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
22 "edge-infra.dev/pkg/f8n/warehouse/oci"
23 "edge-infra.dev/pkg/f8n/warehouse/oci/layer"
24 "edge-infra.dev/pkg/f8n/warehouse/pallet"
25 "edge-infra.dev/pkg/k8s/kcli"
26 "edge-infra.dev/pkg/k8s/object"
27 "edge-infra.dev/pkg/k8s/runtime/sap"
28 "edge-infra.dev/pkg/k8s/unstructured"
29 "edge-infra.dev/pkg/lib/cli/rags"
30 "edge-infra.dev/pkg/lib/cli/sink"
31 )
32
33 type Applier struct {
34
35 kubecfg kcli.KubeConfig
36
37
38 infraKubeCtx string
39 scheme *runtime.Scheme
40 mapper meta.RESTMapper
41
42
43
44
45 Klient client.Client
46 ResourceManager *sap.ResourceManager
47
48
49
50
51 InfraKlient client.Client
52 InfraResourceManager *sap.ResourceManager
53
54
55 infraNamespace string
56 force bool
57 timeout time.Duration
58 }
59
60 func NewApplier() *Applier {
61 return &Applier{}
62 }
63
64
65
66 func (a *Applier) RegisterFlags(fs *rags.RagSet) {
67
68 a.kubecfg.RegisterFlags(fs.FlagSet())
69 fs.StringVar(&a.infraKubeCtx, "infra-context", "", "name of the kubeconfig context to use to schedule infra objects. default behavior is to apply all objects to the same K8s context")
70 fs.DurationVar(&a.timeout, "wait", time.Second*120, "how long to wait for resources to become ready")
71 fs.BoolVar(&a.force, "force", true, "force re-creation of resources via server-side apply if immutability conflicts are encountered")
72 }
73
74
75 func (a *Applier) BeforeRun(ctx context.Context, r sink.Run) (context.Context, sink.Run, error) {
76 var err error
77
78 a.scheme = createScheme()
79
80
81 a.Klient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
82 if err != nil {
83 return ctx, r, fmt.Errorf("failed to create k8s client: %w", err)
84 }
85 a.mapper, err = a.kubecfg.Mapper()
86 if err != nil {
87 return ctx, r, fmt.Errorf("failed to instantiate REST Mapper: %w", err)
88 }
89 d, err := a.kubecfg.DynamicClient()
90 if err != nil {
91 return ctx, r, fmt.Errorf("failed to instantiate Dynamic client: %w", err)
92 }
93 a.ResourceManager = a.resourceManager(a.mapper, a.Klient, d)
94
95
96 if a.infraKubeCtx == "" {
97 a.InfraKlient = a.Klient
98
99 a.infraKubeCtx = a.kubecfg.Context
100 } else {
101
102
103 if err := a.initializeInfraKlient(); err != nil {
104 return ctx, r, fmt.Errorf("failed to create k8s infrastructure client: %w", err)
105 }
106 d, err = a.kubecfg.DynamicClient()
107 if err != nil {
108 return ctx, r, fmt.Errorf("failed to instantiate Dynamic client for infrastructure: %w", err)
109 }
110 a.mapper, err = a.kubecfg.Mapper()
111 if err != nil {
112 return ctx, r, fmt.Errorf("failed to instantiate REST Mapper for infrastructure: %w", err)
113 }
114 }
115
116 a.InfraResourceManager = a.resourceManager(a.mapper, a.InfraKlient, d)
117
118 return ctx, r, nil
119 }
120
121 func (a *Applier) Apply(ctx context.Context, artifact oci.Artifact, opts ...unpack.Option) error {
122 if a.infraNamespace != "" {
123 if err := a.InfraKlient.Create(ctx,
124 &corev1.Namespace{
125 ObjectMeta: metav1.ObjectMeta{Name: a.infraNamespace},
126 },
127 ); client.IgnoreAlreadyExists(err) != nil {
128 return fmt.Errorf("failed to create infra namespace %s: %w",
129 a.infraNamespace, err)
130 }
131 log.Println("creating infrastructure namespace", a.infraNamespace)
132 }
133
134 return unpack.Walk(artifact, func(p pallet.Pallet, layers []layer.Layer) error {
135 return a.apply(ctx, p, layers)
136 }, opts...)
137 }
138
139 func (a *Applier) apply(ctx context.Context, p pallet.Pallet, layers []layer.Layer) error {
140 if len(layers) == 0 {
141 log.Printf("skipping %s because it did not produce any manifests for the "+
142 "current unpacking options", p.Name())
143 return nil
144 }
145 log.Printf("applying %s", p.Name())
146 log.Println()
147
148 var (
149 runtime = make([]*unstructured.Unstructured, 0)
150 infra = make([]*unstructured.Unstructured, 0)
151 )
152 for _, l := range layers {
153 lo, err := l.Unstructured()
154 if err != nil {
155 return err
156 }
157 switch l.Type() {
158 case layer.Infra:
159 infra = append(infra, lo...)
160 case layer.Runtime:
161 runtime = append(runtime, lo...)
162 }
163 }
164
165
166 runtimeChanges, err := a.applyObjects(ctx, a.ResourceManager, runtime)
167 if err != nil {
168 return err
169 }
170
171 infraChanges, err := a.applyObjects(ctx, a.InfraResourceManager, infra)
172 if err != nil {
173 return err
174 }
175
176
177
178 if err := a.waitForSet(ctx, layer.Infra, a.InfraResourceManager, infraChanges); err != nil {
179 return err
180 }
181 return a.waitForSet(ctx, layer.Runtime, a.ResourceManager, runtimeChanges)
182 }
183
184 func (a *Applier) applyObjects(ctx context.Context, mgr *sap.ResourceManager, objs []*unstructured.Unstructured) (*sap.ChangeSet, error) {
185 changes := sap.NewChangeSet()
186 for _, o := range objs {
187 change, err := mgr.Apply(ctx, o, a.ApplyOpts())
188 if err != nil {
189 return nil, err
190 }
191 log.Println(change)
192 changes.Add(*change)
193 }
194 return changes, nil
195 }
196
197 func (a *Applier) waitForSet(ctx context.Context, t layer.Type, mgr *sap.ResourceManager, c *sap.ChangeSet) error {
198 if len(c.Entries) > 0 {
199 log.Println()
200 log.Printf("waiting for %s resources to become ready...\n", t)
201 if err := mgr.WaitForSet(ctx, c.ToObjMetadataSet(), a.WaitOpts()); err != nil {
202 statusDump(ctx, mgr, c)
203 return err
204 }
205 log.Println("...done")
206 log.Println()
207 }
208
209 return nil
210 }
211
212 func (a *Applier) Info(r sink.Run) {
213 kubecfg, err := a.kubecfg.RawConfig()
214 if err != nil {
215 r.Log.Error(err, "failed to load kubeconfig")
216 return
217 }
218
219 values := []any{
220 "k8s-context", kubecfg.CurrentContext,
221 }
222 if a.infraKubeCtx != "" {
223 values = append(values, "infra-k8s-context", a.infraKubeCtx)
224 }
225
226 r.Log.Info("applying", values...)
227 }
228
229 func (a *Applier) initializeInfraKlient() error {
230
231 raw, err := a.kubecfg.RawConfig()
232 if err != nil {
233 return fmt.Errorf("failed to read kubeconfig: %w", err)
234 }
235 if _, ok := raw.Contexts[a.infraKubeCtx]; !ok {
236 return fmt.Errorf("infra kube context %s does not exist", a.infraKubeCtx)
237 }
238
239
240
241 a.kubecfg.Context = a.infraKubeCtx
242 if err := a.kubecfg.SetupClientConfig(); err != nil {
243 return fmt.Errorf("failed to set up client loading config for infra client: %w", err)
244 }
245 a.InfraKlient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
246 if err != nil {
247 return fmt.Errorf("failed to instantiate infra client: %w", err)
248 }
249 return nil
250 }
251
252
253
254 func (a *Applier) resourceManager(m meta.RESTMapper, c client.Client, d dynamic.Interface) *sap.ResourceManager {
255 return sap.NewResourceManager(
256 c,
257 watcher.NewDefaultStatusWatcher(d, m),
258 sap.Owner{Field: "lift"},
259 )
260 }
261
262 func (a *Applier) ApplyOpts() sap.ApplyOptions {
263 return sap.ApplyOptions{
264 Force: a.force,
265 WaitTimeout: a.timeout,
266 }
267 }
268
269 func (a *Applier) WaitOpts() sap.WaitOptions {
270 return sap.WaitOptions{
271 Timeout: a.timeout,
272 }
273 }
274
275 func createScheme() *runtime.Scheme {
276 scheme := runtime.NewScheme()
277 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
278 utilruntime.Must(whv1.AddToScheme(scheme))
279
280 return scheme
281 }
282
283 func statusDump(ctx context.Context, mgr *sap.ResourceManager, changeSet *sap.ChangeSet) {
284 for i := range changeSet.Entries {
285 gv, err := schema.ParseGroupVersion(changeSet.Entries[i].GroupVersion)
286 if err != nil {
287 continue
288 }
289 groupVersion := schema.GroupVersion{
290 Group: changeSet.Entries[i].ObjMetadata.GroupKind.Group,
291 Version: gv.Version,
292 }
293 if err != nil {
294 continue
295 }
296 kind := changeSet.Entries[i].ObjMetadata.GroupKind.Kind
297 name := changeSet.Entries[i].ObjMetadata.Name
298 namespace := changeSet.Entries[i].ObjMetadata.Namespace
299 obj := unstructured.New(groupVersion, kind, namespace, name)
300
301 err = mgr.Client().Get(ctx, client.ObjectKeyFromObject(obj), obj)
302 if err != nil {
303 log.Print(err)
304 continue
305 }
306 status := object.GetConditions(obj)
307 for _, s := range status {
308 if s.Status != "True" {
309 log.Printf("Object %s Status:%s Reason:%s Message:%s",
310 obj.GetName(), s.Status, s.Reason, s.Message)
311 }
312 }
313 }
314 }
315
View as plain text