...

Source file src/edge-infra.dev/pkg/f8n/warehouse/lift/cmd/internal/applier.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/lift/cmd/internal

     1  package internal
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"log"
     7  	"time"
     8  
     9  	corev1 "k8s.io/api/core/v1"
    10  	"k8s.io/apimachinery/pkg/api/meta"
    11  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    12  	"k8s.io/apimachinery/pkg/runtime"
    13  	"k8s.io/apimachinery/pkg/runtime/schema"
    14  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    15  	"k8s.io/client-go/dynamic"
    16  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    17  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    18  	"sigs.k8s.io/controller-runtime/pkg/client"
    19  
    20  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    21  	"edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
    22  	"edge-infra.dev/pkg/f8n/warehouse/oci"
    23  	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
    24  	"edge-infra.dev/pkg/f8n/warehouse/pallet"
    25  	"edge-infra.dev/pkg/k8s/kcli"
    26  	"edge-infra.dev/pkg/k8s/object"
    27  	"edge-infra.dev/pkg/k8s/runtime/sap"
    28  	"edge-infra.dev/pkg/k8s/unstructured"
    29  	"edge-infra.dev/pkg/lib/cli/rags"
    30  	"edge-infra.dev/pkg/lib/cli/sink"
    31  )
    32  
    33  type Applier struct {
    34  	// kubecfg contains details about which KUBECONFIG file and context to use
    35  	kubecfg kcli.KubeConfig
    36  	// infraKubeCtx is the Kube context within the parsed kbueconfig to use when
    37  	// applying infra objects
    38  	infraKubeCtx string
    39  	scheme       *runtime.Scheme
    40  	mapper       meta.RESTMapper
    41  
    42  	// K8s client and inventory for working with base layers during applying of
    43  	// unpacked resources, instantiated via Initialize() based on bound and parsed
    44  	// configuration values.
    45  	Klient          client.Client
    46  	ResourceManager *sap.ResourceManager
    47  
    48  	// K8s client and inventory for working with infra layers during applying of
    49  	// unpacked resources, instantiated via Initialize() based on bound and parsed
    50  	// configuration values.
    51  	InfraKlient          client.Client
    52  	InfraResourceManager *sap.ResourceManager
    53  
    54  	// apply options
    55  	infraNamespace string
    56  	force          bool
    57  	timeout        time.Duration
    58  }
    59  
    60  func NewApplier() *Applier {
    61  	return &Applier{}
    62  }
    63  
    64  // RegisterFlags binds all of the flags needed for applying and managing pallets
    65  // applied via `lift`
    66  func (a *Applier) RegisterFlags(fs *rags.RagSet) {
    67  	// client machinery
    68  	a.kubecfg.RegisterFlags(fs.FlagSet())
    69  	fs.StringVar(&a.infraKubeCtx, "infra-context", "", "name of the kubeconfig context to use to schedule infra objects.  default behavior is to apply all objects to the same K8s context")
    70  	fs.DurationVar(&a.timeout, "wait", time.Second*120, "how long to wait for resources to become ready")
    71  	fs.BoolVar(&a.force, "force", true, "force re-creation of resources via server-side apply if immutability conflicts are encountered")
    72  }
    73  
    74  // TODO: refactor, WithKlient alone would be preferred for some use cases
    75  func (a *Applier) BeforeRun(ctx context.Context, r sink.Run) (context.Context, sink.Run, error) {
    76  	var err error
    77  
    78  	a.scheme = createScheme()
    79  
    80  	// set up base K8s client
    81  	a.Klient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
    82  	if err != nil {
    83  		return ctx, r, fmt.Errorf("failed to create k8s client: %w", err)
    84  	}
    85  	a.mapper, err = a.kubecfg.Mapper()
    86  	if err != nil {
    87  		return ctx, r, fmt.Errorf("failed to instantiate REST Mapper: %w", err)
    88  	}
    89  	d, err := a.kubecfg.DynamicClient()
    90  	if err != nil {
    91  		return ctx, r, fmt.Errorf("failed to instantiate Dynamic client: %w", err)
    92  	}
    93  	a.ResourceManager = a.resourceManager(a.mapper, a.Klient, d)
    94  
    95  	// by default, the same K8s client is used for both base and infra layers
    96  	if a.infraKubeCtx == "" {
    97  		a.InfraKlient = a.Klient
    98  		// set internal state so that correct infra context is printed during usage
    99  		a.infraKubeCtx = a.kubecfg.Context
   100  	} else {
   101  		// if a specific context for infra is provided, create an infra-specific client
   102  		// and replace it
   103  		if err := a.initializeInfraKlient(); err != nil {
   104  			return ctx, r, fmt.Errorf("failed to create k8s infrastructure client: %w", err)
   105  		}
   106  		d, err = a.kubecfg.DynamicClient()
   107  		if err != nil {
   108  			return ctx, r, fmt.Errorf("failed to instantiate Dynamic client for infrastructure: %w", err)
   109  		}
   110  		a.mapper, err = a.kubecfg.Mapper()
   111  		if err != nil {
   112  			return ctx, r, fmt.Errorf("failed to instantiate REST Mapper for infrastructure: %w", err)
   113  		}
   114  	}
   115  	// create resource manager based on resolved infra klient
   116  	a.InfraResourceManager = a.resourceManager(a.mapper, a.InfraKlient, d)
   117  
   118  	return ctx, r, nil
   119  }
   120  
   121  func (a *Applier) Apply(ctx context.Context, artifact oci.Artifact, opts ...unpack.Option) error {
   122  	if a.infraNamespace != "" {
   123  		if err := a.InfraKlient.Create(ctx,
   124  			&corev1.Namespace{
   125  				ObjectMeta: metav1.ObjectMeta{Name: a.infraNamespace},
   126  			},
   127  		); client.IgnoreAlreadyExists(err) != nil {
   128  			return fmt.Errorf("failed to create infra namespace %s: %w",
   129  				a.infraNamespace, err)
   130  		}
   131  		log.Println("creating infrastructure namespace", a.infraNamespace)
   132  	}
   133  
   134  	return unpack.Walk(artifact, func(p pallet.Pallet, layers []layer.Layer) error {
   135  		return a.apply(ctx, p, layers)
   136  	}, opts...)
   137  }
   138  
   139  func (a *Applier) apply(ctx context.Context, p pallet.Pallet, layers []layer.Layer) error {
   140  	if len(layers) == 0 {
   141  		log.Printf("skipping %s because it did not produce any manifests for the "+
   142  			"current unpacking options", p.Name())
   143  		return nil
   144  	}
   145  	log.Printf("applying %s", p.Name())
   146  	log.Println()
   147  
   148  	var (
   149  		runtime = make([]*unstructured.Unstructured, 0)
   150  		infra   = make([]*unstructured.Unstructured, 0)
   151  	)
   152  	for _, l := range layers {
   153  		lo, err := l.Unstructured()
   154  		if err != nil {
   155  			return err
   156  		}
   157  		switch l.Type() {
   158  		case layer.Infra:
   159  			infra = append(infra, lo...)
   160  		case layer.Runtime:
   161  			runtime = append(runtime, lo...)
   162  		}
   163  	}
   164  
   165  	// runtime first
   166  	runtimeChanges, err := a.applyObjects(ctx, a.ResourceManager, runtime)
   167  	if err != nil {
   168  		return err
   169  	}
   170  	// then infra
   171  	infraChanges, err := a.applyObjects(ctx, a.InfraResourceManager, infra)
   172  	if err != nil {
   173  		return err
   174  	}
   175  
   176  	// then wait in inverse order, because usually runtime resources need infra
   177  	// to provision external state / cloud resources
   178  	if err := a.waitForSet(ctx, layer.Infra, a.InfraResourceManager, infraChanges); err != nil {
   179  		return err
   180  	}
   181  	return a.waitForSet(ctx, layer.Runtime, a.ResourceManager, runtimeChanges)
   182  }
   183  
   184  func (a *Applier) applyObjects(ctx context.Context, mgr *sap.ResourceManager, objs []*unstructured.Unstructured) (*sap.ChangeSet, error) {
   185  	changes := sap.NewChangeSet()
   186  	for _, o := range objs {
   187  		change, err := mgr.Apply(ctx, o, a.ApplyOpts())
   188  		if err != nil {
   189  			return nil, err
   190  		}
   191  		log.Println(change)
   192  		changes.Add(*change)
   193  	}
   194  	return changes, nil
   195  }
   196  
   197  func (a *Applier) waitForSet(ctx context.Context, t layer.Type, mgr *sap.ResourceManager, c *sap.ChangeSet) error {
   198  	if len(c.Entries) > 0 {
   199  		log.Println()
   200  		log.Printf("waiting for %s resources to become ready...\n", t)
   201  		if err := mgr.WaitForSet(ctx, c.ToObjMetadataSet(), a.WaitOpts()); err != nil {
   202  			statusDump(ctx, mgr, c)
   203  			return err
   204  		}
   205  		log.Println("...done")
   206  		log.Println()
   207  	}
   208  
   209  	return nil
   210  }
   211  
   212  func (a *Applier) Info(r sink.Run) {
   213  	kubecfg, err := a.kubecfg.RawConfig()
   214  	if err != nil {
   215  		r.Log.Error(err, "failed to load kubeconfig")
   216  		return
   217  	}
   218  
   219  	values := []any{
   220  		"k8s-context", kubecfg.CurrentContext,
   221  	}
   222  	if a.infraKubeCtx != "" {
   223  		values = append(values, "infra-k8s-context", a.infraKubeCtx)
   224  	}
   225  
   226  	r.Log.Info("applying", values...)
   227  }
   228  
   229  func (a *Applier) initializeInfraKlient() error {
   230  	// verify that infra context exists in kubeconfig
   231  	raw, err := a.kubecfg.RawConfig()
   232  	if err != nil {
   233  		return fmt.Errorf("failed to read kubeconfig: %w", err)
   234  	}
   235  	if _, ok := raw.Contexts[a.infraKubeCtx]; !ok {
   236  		return fmt.Errorf("infra kube context %s does not exist", a.infraKubeCtx)
   237  	}
   238  
   239  	// TODO(aw185176): this seems hacky, k8s/runtime/client should probably
   240  	// provide some helpers for doing this more cleanly
   241  	a.kubecfg.Context = a.infraKubeCtx
   242  	if err := a.kubecfg.SetupClientConfig(); err != nil {
   243  		return fmt.Errorf("failed to set up client loading config for infra client: %w", err)
   244  	}
   245  	a.InfraKlient, err = a.kubecfg.Client(client.Options{Scheme: a.scheme})
   246  	if err != nil {
   247  		return fmt.Errorf("failed to instantiate infra client: %w", err)
   248  	}
   249  	return nil
   250  }
   251  
   252  // resourceManager creates a standard Lift CLI resource manager for server-side
   253  // apply based on the client you provide it.
   254  func (a *Applier) resourceManager(m meta.RESTMapper, c client.Client, d dynamic.Interface) *sap.ResourceManager {
   255  	return sap.NewResourceManager(
   256  		c,
   257  		watcher.NewDefaultStatusWatcher(d, m),
   258  		sap.Owner{Field: "lift"},
   259  	)
   260  }
   261  
   262  func (a *Applier) ApplyOpts() sap.ApplyOptions {
   263  	return sap.ApplyOptions{
   264  		Force:       a.force,
   265  		WaitTimeout: a.timeout,
   266  	}
   267  }
   268  
   269  func (a *Applier) WaitOpts() sap.WaitOptions {
   270  	return sap.WaitOptions{
   271  		Timeout: a.timeout,
   272  	}
   273  }
   274  
   275  func createScheme() *runtime.Scheme {
   276  	scheme := runtime.NewScheme()
   277  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   278  	utilruntime.Must(whv1.AddToScheme(scheme))
   279  
   280  	return scheme
   281  }
   282  
   283  func statusDump(ctx context.Context, mgr *sap.ResourceManager, changeSet *sap.ChangeSet) {
   284  	for i := range changeSet.Entries {
   285  		gv, err := schema.ParseGroupVersion(changeSet.Entries[i].GroupVersion)
   286  		if err != nil {
   287  			continue
   288  		}
   289  		groupVersion := schema.GroupVersion{
   290  			Group:   changeSet.Entries[i].ObjMetadata.GroupKind.Group,
   291  			Version: gv.Version,
   292  		}
   293  		if err != nil {
   294  			continue
   295  		}
   296  		kind := changeSet.Entries[i].ObjMetadata.GroupKind.Kind
   297  		name := changeSet.Entries[i].ObjMetadata.Name
   298  		namespace := changeSet.Entries[i].ObjMetadata.Namespace
   299  		obj := unstructured.New(groupVersion, kind, namespace, name)
   300  
   301  		err = mgr.Client().Get(ctx, client.ObjectKeyFromObject(obj), obj)
   302  		if err != nil {
   303  			log.Print(err)
   304  			continue
   305  		}
   306  		status := object.GetConditions(obj)
   307  		for _, s := range status {
   308  			if s.Status != "True" {
   309  				log.Printf("Object %s Status:%s Reason:%s Message:%s",
   310  					obj.GetName(), s.Status, s.Reason, s.Message)
   311  			}
   312  		}
   313  	}
   314  }
   315  

View as plain text