package internal

import (
	"context"
	"fmt"
	"log"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/dynamic"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
	"sigs.k8s.io/controller-runtime/pkg/client"

	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
	"edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
	"edge-infra.dev/pkg/f8n/warehouse/oci"
	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
	"edge-infra.dev/pkg/f8n/warehouse/pallet"
	"edge-infra.dev/pkg/k8s/kcli"
	"edge-infra.dev/pkg/k8s/object"
	"edge-infra.dev/pkg/k8s/runtime/sap"
	"edge-infra.dev/pkg/k8s/unstructured"
	"edge-infra.dev/pkg/lib/cli/rags"
	"edge-infra.dev/pkg/lib/cli/sink"
)

// Applier applies unpacked warehouse pallet layers to Kubernetes. Runtime
// layer objects are applied via Klient/ResourceManager and infra layer
// objects via InfraKlient/InfraResourceManager, which may target a different
// kubeconfig context (see --infra-context) or default to the same cluster.
type Applier struct {
	// kubecfg contains details about which KUBECONFIG file and context to use
	kubecfg kcli.KubeConfig
	// infraKubeCtx is the Kube context within the parsed kubeconfig to use when
	// applying infra objects
	infraKubeCtx string
	scheme       *runtime.Scheme
	mapper       meta.RESTMapper
	// K8s client and inventory for working with base layers during applying of
	// unpacked resources, instantiated via Initialize() based on bound and parsed
	// configuration values.
	Klient          client.Client
	ResourceManager *sap.ResourceManager
	// K8s client and inventory for working with infra layers during applying of
	// unpacked resources, instantiated via Initialize() based on bound and parsed
	// configuration values.
	InfraKlient          client.Client
	InfraResourceManager *sap.ResourceManager
	// apply options
	infraNamespace string
	force          bool
	timeout        time.Duration
}

// NewApplier returns a zero-value Applier. RegisterFlags and BeforeRun must
// be called before it can apply anything.
func NewApplier() *Applier {
	return &Applier{}
}

// RegisterFlags binds all of the flags needed for applying and managing pallets
// applied via `lift`
func (a *Applier) RegisterFlags(fs *rags.RagSet) {
	// client machinery
	a.kubecfg.RegisterFlags(fs.FlagSet())
	fs.StringVar(&a.infraKubeCtx, "infra-context", "",
		"name of the kubeconfig context to use to schedule infra objects. "+
			"default behavior is to apply all objects to the same K8s context")
	fs.DurationVar(&a.timeout, "wait", time.Second*120,
		"how long to wait for resources to become ready")
	fs.BoolVar(&a.force, "force", true,
		"force re-creation of resources via server-side apply if immutability conflicts are encountered")
}

// BeforeRun instantiates the K8s clients, REST mapper, and resource managers
// from the bound flag values. When --infra-context is set, a dedicated infra
// client is created for that context; otherwise the base client is reused.
//
// TODO: refactor, WithKlient alone would be preferred for some use cases
func (a *Applier) BeforeRun(ctx context.Context, r sink.Run) (context.Context, sink.Run, error) {
	var err error
	a.scheme = createScheme()

	// set up base K8s client
	a.Klient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
	if err != nil {
		return ctx, r, fmt.Errorf("failed to create k8s client: %w", err)
	}
	a.mapper, err = a.kubecfg.Mapper()
	if err != nil {
		return ctx, r, fmt.Errorf("failed to instantiate REST Mapper: %w", err)
	}
	d, err := a.kubecfg.DynamicClient()
	if err != nil {
		return ctx, r, fmt.Errorf("failed to instantiate Dynamic client: %w", err)
	}
	a.ResourceManager = a.resourceManager(a.mapper, a.Klient, d)

	// by default, the same K8s client is used for both base and infra layers
	if a.infraKubeCtx == "" {
		a.InfraKlient = a.Klient
		// set internal state so that correct infra context is printed during usage
		a.infraKubeCtx = a.kubecfg.Context
	} else {
		// if a specific context for infra is provided, create an infra-specific client
		// and replace it
		if err := a.initializeInfraKlient(); err != nil {
			return ctx, r, fmt.Errorf("failed to create k8s infrastructure client: %w", err)
		}
		d, err = a.kubecfg.DynamicClient()
		if err != nil {
			return ctx, r, fmt.Errorf("failed to instantiate Dynamic client for infrastructure: %w", err)
		}
		a.mapper, err = a.kubecfg.Mapper()
		if err != nil {
			return ctx, r, fmt.Errorf("failed to instantiate REST Mapper for infrastructure: %w", err)
		}
	}

	// create resource manager based on resolved infra klient
	a.InfraResourceManager = a.resourceManager(a.mapper, a.InfraKlient, d)

	return ctx, r, nil
}

// Apply unpacks the artifact and applies each pallet's layers, creating the
// configured infra namespace first (ignoring AlreadyExists) when one is set.
func (a *Applier) Apply(ctx context.Context, artifact oci.Artifact, opts ...unpack.Option) error {
	if a.infraNamespace != "" {
		if err := a.InfraKlient.Create(ctx, &corev1.Namespace{
			ObjectMeta: metav1.ObjectMeta{Name: a.infraNamespace},
		}); client.IgnoreAlreadyExists(err) != nil {
			return fmt.Errorf("failed to create infra namespace %s: %w", a.infraNamespace, err)
		}
		log.Println("creating infrastructure namespace", a.infraNamespace)
	}

	return unpack.Walk(artifact, func(p pallet.Pallet, layers []layer.Layer) error {
		return a.apply(ctx, p, layers)
	}, opts...)
}

// apply splits a single pallet's layers into runtime and infra objects,
// applies runtime objects first, then infra, and waits for readiness in the
// inverse order.
func (a *Applier) apply(ctx context.Context, p pallet.Pallet, layers []layer.Layer) error {
	if len(layers) == 0 {
		log.Printf("skipping %s because it did not produce any manifests for the "+
			"current unpacking options", p.Name())
		return nil
	}

	log.Printf("applying %s", p.Name())
	log.Println()

	// note: not named `runtime`/`infra` so the runtime local doesn't shadow
	// the imported k8s.io/apimachinery/pkg/runtime package
	var (
		runtimeObjs = make([]*unstructured.Unstructured, 0)
		infraObjs   = make([]*unstructured.Unstructured, 0)
	)
	for _, l := range layers {
		lo, err := l.Unstructured()
		if err != nil {
			return err
		}
		switch l.Type() {
		case layer.Infra:
			infraObjs = append(infraObjs, lo...)
		case layer.Runtime:
			runtimeObjs = append(runtimeObjs, lo...)
		}
	}

	// runtime first
	runtimeChanges, err := a.applyObjects(ctx, a.ResourceManager, runtimeObjs)
	if err != nil {
		return err
	}
	// then infra
	infraChanges, err := a.applyObjects(ctx, a.InfraResourceManager, infraObjs)
	if err != nil {
		return err
	}

	// then wait in inverse order, because usually runtime resources need infra
	// to provision external state / cloud resources
	if err := a.waitForSet(ctx, layer.Infra, a.InfraResourceManager, infraChanges); err != nil {
		return err
	}
	return a.waitForSet(ctx, layer.Runtime, a.ResourceManager, runtimeChanges)
}

// applyObjects server-side-applies each object via mgr, logging each change
// and accumulating them into a single ChangeSet.
func (a *Applier) applyObjects(ctx context.Context, mgr *sap.ResourceManager, objs []*unstructured.Unstructured) (*sap.ChangeSet, error) {
	changes := sap.NewChangeSet()
	for _, o := range objs {
		change, err := mgr.Apply(ctx, o, a.ApplyOpts())
		if err != nil {
			return nil, err
		}
		log.Println(change)
		changes.Add(*change)
	}
	return changes, nil
}

// waitForSet blocks until every entry in the change set is ready, dumping
// object status on failure. A no-op for empty change sets.
func (a *Applier) waitForSet(ctx context.Context, t layer.Type, mgr *sap.ResourceManager, c *sap.ChangeSet) error {
	if len(c.Entries) > 0 {
		log.Println()
		log.Printf("waiting for %s resources to become ready...\n", t)
		if err := mgr.WaitForSet(ctx, c.ToObjMetadataSet(), a.WaitOpts()); err != nil {
			statusDump(ctx, mgr, c)
			return err
		}
		log.Println("...done")
		log.Println()
	}
	return nil
}

// Info logs which kube context(s) this Applier will apply to.
func (a *Applier) Info(r sink.Run) {
	kubecfg, err := a.kubecfg.RawConfig()
	if err != nil {
		r.Log.Error(err, "failed to load kubeconfig")
		return
	}
	values := []any{
		"k8s-context", kubecfg.CurrentContext,
	}
	if a.infraKubeCtx != "" {
		values = append(values, "infra-k8s-context", a.infraKubeCtx)
	}
	r.Log.Info("applying", values...)
}

// initializeInfraKlient validates that the configured infra context exists in
// the kubeconfig, re-points kubecfg at it, and creates InfraKlient from it.
func (a *Applier) initializeInfraKlient() error {
	// verify that infra context exists in kubeconfig
	raw, err := a.kubecfg.RawConfig()
	if err != nil {
		return fmt.Errorf("failed to read kubeconfig: %w", err)
	}
	if _, ok := raw.Contexts[a.infraKubeCtx]; !ok {
		return fmt.Errorf("infra kube context %s does not exist", a.infraKubeCtx)
	}

	// TODO(aw185176): this seems hacky, k8s/runtime/client should probably
	// provide some helpers for doing this more cleanly
	a.kubecfg.Context = a.infraKubeCtx
	if err := a.kubecfg.SetupClientConfig(); err != nil {
		return fmt.Errorf("failed to set up client loading config for infra client: %w", err)
	}
	a.InfraKlient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
	if err != nil {
		return fmt.Errorf("failed to instantiate infra client: %w", err)
	}
	return nil
}

// resourceManager creates a standard Lift CLI resource manager for server-side
// apply based on the client you provide it.
func (a *Applier) resourceManager(m meta.RESTMapper, c client.Client, d dynamic.Interface) *sap.ResourceManager {
	return sap.NewResourceManager(
		c,
		watcher.NewDefaultStatusWatcher(d, m),
		sap.Owner{Field: "lift"},
	)
}

// ApplyOpts returns the server-side apply options derived from bound flags.
func (a *Applier) ApplyOpts() sap.ApplyOptions {
	return sap.ApplyOptions{
		Force:       a.force,
		WaitTimeout: a.timeout,
	}
}

// WaitOpts returns the readiness-wait options derived from bound flags.
func (a *Applier) WaitOpts() sap.WaitOptions {
	return sap.WaitOptions{
		Timeout: a.timeout,
	}
}

// createScheme builds the runtime scheme containing the client-go core types
// plus the warehouse v1alpha2 types.
func createScheme() *runtime.Scheme {
	scheme := runtime.NewScheme()
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(whv1.AddToScheme(scheme))
	return scheme
}

// statusDump fetches each object in the change set and logs any condition
// whose status is not "True", to aid debugging after a failed wait.
func statusDump(ctx context.Context, mgr *sap.ResourceManager, changeSet *sap.ChangeSet) {
	for i := range changeSet.Entries {
		gv, err := schema.ParseGroupVersion(changeSet.Entries[i].GroupVersion)
		if err != nil {
			continue
		}
		groupVersion := schema.GroupVersion{
			Group:   changeSet.Entries[i].ObjMetadata.GroupKind.Group,
			Version: gv.Version,
		}
		kind := changeSet.Entries[i].ObjMetadata.GroupKind.Kind
		name := changeSet.Entries[i].ObjMetadata.Name
		namespace := changeSet.Entries[i].ObjMetadata.Namespace
		obj := unstructured.New(groupVersion, kind, namespace, name)

		if err := mgr.Client().Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
			log.Print(err)
			continue
		}
		for _, s := range object.GetConditions(obj) {
			if s.Status != "True" {
				log.Printf("Object %s Status:%s Reason:%s Message:%s",
					obj.GetName(), s.Status, s.Reason, s.Message)
			}
		}
	}
}