...

Source file src/edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/controller.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl

     1  package lumperctl
     2  
     3  import (
     4  	"fmt"
     5  	"os"
     6  
     7  	"github.com/go-logr/logr"
     8  	ggcrlog "github.com/google/go-containerregistry/pkg/logs"
     9  	"k8s.io/apimachinery/pkg/runtime"
    10  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    11  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    12  	ctrl "sigs.k8s.io/controller-runtime"
    13  
    14  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    15  	"edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
    16  	"edge-infra.dev/pkg/k8s/runtime/controller"
    17  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    18  	"edge-infra.dev/pkg/lib/fog"
    19  	"edge-infra.dev/pkg/lib/runtime/version"
    20  )
    21  
    22  // Run creates the manager, sets up the controller, and then starts
    23  // everything. Errors are returned for handling in package main.
    24  func Run(opts ...controller.Option) error {
    25  	// use the controller's logger to write logs produced by imported pkgs
    26  	SetPackageLoggers(fog.New())
    27  	log := ctrl.Log.WithName("startup")
    28  	log.Info("version", "version", version.New().String())
    29  
    30  	// create config first, so that controller-runtime flags are parsed when
    31  	// we create the manager
    32  	cfg, err := newConfig(os.Args[1:])
    33  	if err != nil {
    34  		log.Error(err, "failed to instantiate config")
    35  		return err
    36  	}
    37  
    38  	healthzChecker := internal.New(ctrl.Log.WithName("lumper-liveness"))
    39  	mgrCreator := CreateMgr(healthzChecker)
    40  	mgr, err := mgrCreator(opts...)
    41  
    42  	if err != nil {
    43  		log.Error(err, "failed to create manager")
    44  		return err
    45  	}
    46  
    47  	ctx := ctrl.SetupSignalHandler()
    48  	if err := cfg.afterParse(ctx, mgr.GetClient()); err != nil {
    49  		log.Error(err, "failed to compute config values after parsing")
    50  		return err
    51  	}
    52  	// TODO: this always prints '{}'. either export fields or remove
    53  	log.Info("parsed options", "config", cfg)
    54  
    55  	if err := RegisterControllers(mgr, cfg, healthzChecker); err != nil {
    56  		log.Error(err, "failed to register controllers")
    57  		return err
    58  	}
    59  
    60  	log.Info("starting manager")
    61  	if err := mgr.Start(ctx); err != nil {
    62  		log.Error(err, "manager encountered a fatal error")
    63  		return err
    64  	}
    65  
    66  	return nil
    67  }
    68  
    69  func CreateMgr(healthzChecker *internal.LivenessChecker) func(o ...controller.Option) (ctrl.Manager, error) {
    70  	return func(o ...controller.Option) (ctrl.Manager, error) {
    71  		cfg, opts := controller.ProcessOptions(o...)
    72  		opts.LeaderElectionID = "lumperctl20ds.edge.ncr.com"
    73  		opts.Scheme = createScheme()
    74  		opts.HealthProbeBindAddress = ":8082"
    75  
    76  		mgr, err := ctrl.NewManager(cfg, opts)
    77  		if err != nil {
    78  			return nil, err
    79  		}
    80  
    81  		if err := mgr.AddHealthzCheck("lumper-status", healthzChecker.Check); err != nil {
    82  			return nil, fmt.Errorf("failed to set up Liveness Check: %w", err)
    83  		}
    84  
    85  		return mgr, nil
    86  	}
    87  }
    88  
    89  // registerControllers instantiates each reconciler for this controller and
    90  // registers it with the input controller manager
    91  func RegisterControllers(mgr ctrl.Manager, cfg *Config, healthzChecker *internal.LivenessChecker) error {
    92  	metrics := metrics.New(mgr, "lpctl",
    93  		metrics.WithSuspend(),
    94  		metrics.WithReason(),
    95  		metrics.WithCollectors(
    96  			internal.CacheReadOpsMetric,
    97  			internal.CacheLenMetric,
    98  			internal.EdgeVersionMetric,
    99  		),
   100  	)
   101  
   102  	if err := (&UnpackedPalletReconciler{
   103  		Reconciler: &internal.Reconciler{
   104  			OwnerGroupLabel: whv1.UnpackedPalletLabel,
   105  			Conditions:      unpackedPalletConditions,
   106  			Metrics:         metrics,
   107  			Cache:           cfg.Cache,
   108  			Provider:        cfg.Provider,
   109  			LivenessChecker: healthzChecker,
   110  		},
   111  	}).SetupWithManager(mgr, cfg.UpCfg); err != nil {
   112  		return fmt.Errorf("failed to set up UnpackedPalletReconciler: %w", err)
   113  	}
   114  
   115  	if err := (&ShipmentReconciler{
   116  		Reconciler: &internal.Reconciler{
   117  			OwnerGroupLabel: whv1.ShipmentLabel,
   118  			Conditions:      shipmentConditions,
   119  			Metrics:         metrics,
   120  			Cache:           cfg.Cache,
   121  			Provider:        cfg.Provider,
   122  			LivenessChecker: healthzChecker,
   123  		},
   124  	}).SetupWithManager(mgr, cfg.ShipCfg); err != nil {
   125  		return fmt.Errorf("failed to set up ShipmentReconciler: %w", err)
   126  	}
   127  
   128  	return nil
   129  }
   130  
   131  func createScheme() *runtime.Scheme {
   132  	scheme := runtime.NewScheme()
   133  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   134  	utilruntime.Must(whv1.AddToScheme(scheme))
   135  
   136  	return scheme
   137  }
   138  
   139  func SetPackageLoggers(log logr.Logger) {
   140  	ctrl.SetLogger(log)
   141  
   142  	// inject log adapters for ggcr so errors and diagnostic logs it produces are
   143  	// written the same as other controller logs. by default, this package discards
   144  	// all log messages
   145  	gcr := "go-containerregistry"
   146  	ggcrlog.Debug.SetOutput(fog.InfoWriter(log, gcr))
   147  	ggcrlog.Progress.SetOutput(fog.InfoWriter(log, gcr))
   148  	ggcrlog.Warn.SetOutput(fog.ErrorWriter(log, gcr))
   149  }
   150  

View as plain text