...

Source file src/edge-infra.dev/pkg/edge/controllers/clusterctl/controller.go

Documentation: edge-infra.dev/pkg/edge/controllers/clusterctl

     1  package clusterctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"os"
     7  	"runtime/pprof"
     8  	"strings"
     9  	"sync"
    10  	"time"
    11  
    12  	containerAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/container/v1beta1"
    13  	iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/iam/v1beta1"
    14  	kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
    15  	sourceApi "github.com/fluxcd/source-controller/api/v1"
    16  	"github.com/go-logr/logr"
    17  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    18  	"k8s.io/apimachinery/pkg/runtime"
    19  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    20  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    21  	ctrl "sigs.k8s.io/controller-runtime"
    22  
    23  	"edge-infra.dev/pkg/edge/api/services"
    24  	"edge-infra.dev/pkg/edge/api/services/channels"
    25  	"edge-infra.dev/pkg/edge/api/services/edgenode"
    26  	"edge-infra.dev/pkg/edge/api/types"
    27  	clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
    28  	gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1"
    29  	syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
    30  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins"
    31  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/activationcode"
    32  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clusternetworkservice"
    33  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clustersecrets"
    34  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/dnsconfig"
    35  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/emissarycert"
    36  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases"
    37  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache/providers/memory"
    38  	infoconfigmaps "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/info-configmaps"
    39  	loglevels "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-levels"
    40  	logreplay "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-replay"
    41  	pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics"
    42  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/multikustomization"
    43  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/spegelconfig"
    44  	"edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/vpnconfig"
    45  	"edge-infra.dev/pkg/edge/controllers/dbmetrics"
    46  	"edge-infra.dev/pkg/edge/controllers/util/edgedb"
    47  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    48  	kccAPI "edge-infra.dev/pkg/k8s/konfigkonnector/apis/configconnector/v1beta1"
    49  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    50  	"edge-infra.dev/pkg/k8s/runtime/controller"
    51  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    52  	"edge-infra.dev/pkg/k8s/runtime/events"
    53  	secretMgrApi "edge-infra.dev/pkg/lib/gcp/secretmanager"
    54  	"edge-infra.dev/pkg/lib/logging"
    55  	v1vpnconfig "edge-infra.dev/pkg/sds/remoteaccess/k8s/apis/vpnconfigs/v1"
    56  )
    57  
const (
	// Name is the controller name given to the ClusterReconciler and to the
	// event recorder built in Create.
	Name                    = "cluster-controller"
	// reconcileTimeLimit is the threshold used by isClusterctlFunctioning: a
	// tracked resource whose reconcile started longer ago than this is logged
	// as stalled, and the process is considered stuck when the most recent
	// reconcile start is older than this.
	reconcileTimeLimit      = 10 * time.Minute
	// stateMonitoringInterval is how long isClusterctlFunctioning sleeps before
	// each check, i.e. the polling period of the state monitor.
	stateMonitoringInterval = 2 * time.Minute
)
    63  
// reconcileMetadata is package-level bookkeeping used to check if clusterctl is
// still reconciling resources. The embedded mutex guards every field; the
// functions in this file that read or write it (isClusterctlFunctioning,
// updateReconcileMetadata, deleteResourceEntry) all take the lock first.
//
// TODO(om185040): Remove all reconcile metadata stuff once the following issue is
// closed, https://github.com/ncr-swt-retail/edge-roadmap/issues/9759.
var reconcileMetadata = struct {
	sync.Mutex

	// Contains all of the Clusters currently being reconciled
	// (map[clusterEdgeID]reconcileStartTime).
	// NOTE(review): entries are keyed by obj.GetName() in
	// updateReconcileMetadata; presumably the resource name is the cluster
	// edge ID — confirm.
	clusterResources map[string]time.Time

	// Contains all of the GKEClusters currently being reconciled
	// (map[clusterEdgeID]reconcileStartTime). Keyed the same way as
	// clusterResources.
	gkeClusterResources map[string]time.Time

	// Timestamp that marks the beginning of the latest reconciliation. It is
	// seeded with the time at which this variable is initialized, so the
	// monitor's time limit initially counts from package initialization.
	lastReconcileTime time.Time
}{
	clusterResources:    make(map[string]time.Time),
	gkeClusterResources: make(map[string]time.Time),
	lastReconcileTime:   time.Now(),
}
    86  
    87  // Run creates the manager, sets up the controller, and then starts
    88  // everything.  It returns the created manager for testing purposes
    89  func Run(config Config, o ...controller.Option) error {
    90  	registerPlugins(config)
    91  	mgr, log, err := Create(config, o...)
    92  	if err != nil {
    93  		return err
    94  	}
    95  
    96  	go func() {
    97  		log = log.WithName("clusterctl-state-monitor").WithValues("reconcileTimeLimit", reconcileTimeLimit.String(), "interval", stateMonitoringInterval.String())
    98  		log.Info("starting goroutine to monitor clusterctl's state")
    99  
   100  		for isClusterctlFunctioning(log) {
   101  		}
   102  
   103  		// Clusterctl has stopped reconciling resources, attempt to log as much relevant
   104  		// info as possible using the 'goroutine' and 'block' profiles before exiting.
   105  		sb := &strings.Builder{}
   106  		err := pprof.Lookup("goroutine").WriteTo(sb, 1)
   107  		if err != nil {
   108  			log.Error(err, "failed to write pprof snapshot of the goroutine profile")
   109  		}
   110  		log.Info("logging stack traces of all current goroutines", "stacktrace", sb.String())
   111  
   112  		sb.Reset()
   113  		err = pprof.Lookup("block").WriteTo(sb, 1)
   114  		if err != nil {
   115  			log.Error(err, "failed to write pprof snapshot of the block profile")
   116  		}
   117  		log.Info("logging stack traces that led to blocking on synchronization primitives", "stacktrace", sb.String())
   118  
   119  		os.Exit(1)
   120  	}()
   121  
   122  	log.Info("starting manager")
   123  	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
   124  		log.Error(err, "problem running manager")
   125  		return err
   126  	}
   127  
   128  	return nil
   129  }
   130  
   131  // Create wires up the reconciler(s) with a created manager and returns the
   132  // manager + setup logger
   133  func Create(config Config, o ...controller.Option) (ctrl.Manager, logr.Logger, error) {
   134  	ctrl.SetLogger(logging.NewLogger().Logger)
   135  	log := ctrl.Log.WithName("setup")
   136  
   137  	cfg, opts := controller.ProcessOptions(o...)
   138  	opts.LeaderElectionID = "clusterctl.edge.ncr.com"
   139  	opts.Scheme = createScheme()
   140  
   141  	mgr, err := ctrl.NewManager(cfg, opts)
   142  	if err != nil {
   143  		log.Error(err, "failed to create manager")
   144  		return nil, logr.Logger{}, err
   145  	}
   146  
   147  	c, err := memory.New(config.HelmCacheLimit)
   148  	if err != nil {
   149  		log.Error(err, "failed to initialize helm cache")
   150  		return nil, logr.Logger{}, err
   151  	}
   152  
   153  	eventRecorder := events.NewRecorder(mgr, ctrl.Log, Name)
   154  	dbm := dbmetrics.New("clusterctl")
   155  	collectors := append(dbm.Collectors(),
   156  		pluginmetrics.PluginExecutionTimeMetric,
   157  		pluginmetrics.PluginFinalizerTimeMetric,
   158  		pluginmetrics.PluginExecutionErrorCountMetric,
   159  		pluginmetrics.PluginFinalizerErrorCountMetric,
   160  		pluginmetrics.RegisteredPluginsCountMetric)
   161  
   162  	var clusterReconciler = &ClusterReconciler{
   163  		manager:           mgr,
   164  		Client:            mgr.GetClient(),
   165  		EventRecorder:     eventRecorder,
   166  		Log:               ctrl.Log.WithName("cluster-reconciler"),
   167  		Metrics:           metrics.New(mgr, "clusterctl", metrics.WithCollectors(collectors...)),
   168  		Config:            &config,
   169  		DefaultRequeue:    config.DefaultRequeue,
   170  		WaitForSetTimeout: config.WaitForSetTimeout,
   171  		Name:              Name,
   172  		Conditions:        clusterConditions,
   173  		EdgeDB:            &edgedb.EdgeDB{DB: config.DB},
   174  		Recorder:          dbm,
   175  		Concurrency:       config.ClusterReconcilerConcurrency,
   176  		WaitForSetMap:     &sync.Map{},
   177  		c:                 make(chan int, config.BackgroundWaitForSetConcurrency),
   178  		HelmCache:         c,
   179  	}
   180  	if err := clusterReconciler.setResourceManager(); err != nil {
   181  		log.Error(err, "failed to set resource manager", "controller", "cluster-reconciler")
   182  		return nil, logr.Logger{}, err
   183  	}
   184  	if err := clusterReconciler.SetupWithManager(mgr); err != nil {
   185  		log.Error(err, "failed to create controller and set up with manager", "controller", "cluster-reconciler")
   186  		return nil, logr.Logger{}, err
   187  	}
   188  
   189  	var gkeclusterReconciler = &GKEClusterReconciler{
   190  		manager:           mgr,
   191  		Client:            mgr.GetClient(),
   192  		EventRecorder:     eventRecorder,
   193  		Log:               ctrl.Log.WithName("gkecluster-reconciler"),
   194  		Metrics:           metrics.New(mgr, "gke_clusterctl"),
   195  		Scheme:            mgr.GetScheme(),
   196  		WaitForSetTimeout: config.WaitForSetTimeout,
   197  		EdgeAPI:           config.EdgeAPI,
   198  		CreateClient:      config.CreateClient,
   199  		IPRangerClient:    config.IPRangerClient,
   200  		DefaultRequeue:    config.DefaultRequeue,
   201  		TopLevelProject:   config.TopLevelProjectID,
   202  		TopLevelCNRMSA:    config.TopLevelCNRMSA,
   203  		TotpSecret:        config.TotpSecret,
   204  		Name:              "gke-cluster-controller",
   205  		Conditions:        gkeClusterConditions,
   206  		Concurrency:       config.GKEClusterReconcilerConcurrency,
   207  	}
   208  	if err := gkeclusterReconciler.setResourceManager(); err != nil {
   209  		log.Error(err, "unable to set GKEClusterReconciler resource manager", "controller", "gkecluster-reconciler")
   210  		return nil, logr.Logger{}, err
   211  	}
   212  	if err := gkeclusterReconciler.SetupWithManager(mgr); err != nil {
   213  		log.Error(err, "unable to set up GKEClusterReconciler", "controller", "gkecluster-reconciler")
   214  		return nil, logr.Logger{}, err
   215  	}
   216  
   217  	return mgr, log, nil
   218  }
   219  
   220  func createScheme() *runtime.Scheme {
   221  	scheme := runtime.NewScheme()
   222  
   223  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   224  	utilruntime.Must(clusterApi.AddToScheme(scheme))
   225  	utilruntime.Must(containerAPI.AddToScheme(scheme))
   226  	utilruntime.Must(syncedobjectApi.AddToScheme(scheme))
   227  	utilruntime.Must(gkeClusterApi.AddToScheme(scheme))
   228  	utilruntime.Must(iamv1beta1.AddToScheme(scheme))
   229  	utilruntime.Must(sourceApi.AddToScheme(scheme))
   230  	utilruntime.Must(kustomizeApi.AddToScheme(scheme))
   231  	utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
   232  	utilruntime.Must(kccAPI.AddToScheme(scheme))
   233  	utilruntime.Must(whv1.AddToScheme(scheme))
   234  	utilruntime.Must(v1vpnconfig.AddToScheme(scheme))
   235  	return scheme
   236  }
   237  
// registerPlugins builds the API services that the clusterctl plugins depend
// on and then registers every plugin through plugins.Register. Add new plugins
// here to have them registered with the controller.
//
// NOTE(review): the doc previously said registered plugins are "automatically
// executed by controller"; how and in which order they run is decided by the
// plugins package, which is not visible here — presumably registration order
// matters, so keep new registrations appended unless verified otherwise.
func registerPlugins(config Config) {
	// Initialize API services needed for plugins.
	// NOTE(review): several constructors below are handed nil/empty
	// dependencies; presumably the plugins only exercise the parts of these
	// services backed by config.DB and the other non-nil arguments — confirm
	// before calling additional service methods from a plugin.
	artifactRegistrySvc := services.NewArtifactRegistryService(config.DB)
	bannerSvc := services.NewBannerService(nil, nil, nil, config.TopLevelProjectID, config.DB, &types.Config{})
	bootstrapSvc := services.NewBootstrapService("", nil, config.DB)
	bslSiteSvc := services.NewBSLSiteService(config.BSLClient, config.DB)
	clusterConfigSvc := services.NewClusterConfigService(config.DB)
	helmSvc := services.NewHelmService(nil, nil, config.GCPService, config.DB, nil, nil)
	labelSvc := services.NewLabelService(config.ArtifactsService, config.DB)
	logReplaySvc := services.NewLogReplayService(config.DB)
	storeClusterSvc := services.NewStoreClusterService(nil, nil, config.DB, nil, nil, nil)
	channelSvc := channels.NewChannelService(config.DB, config.TopLevelProjectID, nil)
	terminalSvc := services.NewTerminalService(config.DB, labelSvc)
	activationCodeSvc := edgenode.NewActivationCodeService(config.DB, terminalSvc, storeClusterSvc, clusterConfigSvc, nil, nil, bannerSvc)
	// secretManagerProvider creates a Secret Manager client for the given
	// project on demand; it is shared by every plugin that needs secrets.
	secretManagerProvider := func(ctx context.Context, projectID string) (types.SecretManagerService, error) {
		return secretMgrApi.NewWithOptions(ctx, projectID)
	}

	// Register the Cluster Secrets Plugin.
	plugins.Register(clustersecrets.Plugin{
		SecretManagerProvider: secretManagerProvider,
		TopLevelProjectID:     config.TopLevelProjectID,
	})
	// Register the MultiKustomization Plugin.
	plugins.Register(multikustomization.NewPlugin(storeClusterSvc))
	// Register the OktaClient Plugin.
	plugins.Register(plugins.OktaClientPlugin{
		SecretManagerProvider: secretManagerProvider,
		TopLevelProjectID:     config.TopLevelProjectID,
		DB:                    config.DB,
	})
	// Register the emissary certs Plugin.
	plugins.Register(emissarycert.EmissaryCertsPlugin{
		SecretManagerProvider: secretManagerProvider,
	})
	// Register the LogLevels Plugin.
	plugins.Register(loglevels.LogLevelsPlugin{
		DB: config.DB,
	})
	// Register the LogReplay Plugin.
	plugins.Register(logreplay.Plugin{
		LogReplayService: logReplaySvc,
	})
	// Register the RemoteAccessIP Plugin.
	plugins.Register(plugins.RemoteAccessIPPlugin{
		DB: config.DB,
	})
	// Register the BootstrapTokens Plugin.
	plugins.Register(plugins.BootstrapTokensPlugin{
		BootstrapService: bootstrapSvc,
	})
	// Register the HelmReleases Plugin.
	plugins.Register(helmreleases.NewPlugin(config.DB, helmSvc, labelSvc, config.GCPService, storeClusterSvc, channelSvc))
	// Register the SpegelConfig Plugin.
	plugins.Register(spegelconfig.Plugin{
		ArtifactRegistryService: artifactRegistrySvc,
	})
	// Register the InfoConfigmaps Plugin.
	plugins.Register(infoconfigmaps.Plugin{
		BannerService:       bannerSvc,
		BSLSiteService:      bslSiteSvc,
		StoreClusterService: storeClusterSvc,
		BSLEndpoint:         config.BSLConfig.Endpoint,
		EdgeAPI:             config.EdgeAPI,
		TopLevelProjectID:   config.TopLevelProjectID,
	})
	// Register the VpnConfig Plugin.
	plugins.Register(vpnconfig.NewPlugin(clusterConfigSvc))
	// Register the Cluster Network Services Plugin.
	plugins.Register(clusternetworkservice.NewPlugin(storeClusterSvc))
	// Register the Store DNS Config Plugin.
	plugins.Register(dnsconfig.NewPlugin(storeClusterSvc))
	// Register the ActivationCode Plugin.
	plugins.Register(activationcode.NewPlugin(activationCodeSvc, clusterConfigSvc, terminalSvc))
}
   314  
   315  // isClusterctlFunctioning determines if clusterctl is still reconciling resources.
   316  func isClusterctlFunctioning(log logr.Logger) bool {
   317  	// Sleep here in order to only perform this check every n minutes.
   318  	time.Sleep(stateMonitoringInterval)
   319  	now := time.Now()
   320  
   321  	reconcileMetadata.Lock()
   322  	defer reconcileMetadata.Unlock()
   323  
   324  	// Log any resources that have a reconcile start time difference greater
   325  	// than the set time limit.
   326  	var diff time.Duration
   327  	for name, start := range reconcileMetadata.clusterResources {
   328  		diff = now.Sub(start)
   329  		if diff > reconcileTimeLimit {
   330  			log.Info("stalled resource found", "cluster_name", name, "start_time", start, "duration", diff.String())
   331  		}
   332  	}
   333  	diff = now.Sub(reconcileMetadata.lastReconcileTime)
   334  
   335  	// Check if the lastest reconcile start time difference from now is over the time
   336  	// limit. If so, exit since clusterctl has stopped reconciling resources.
   337  	if diff > reconcileTimeLimit {
   338  		err := errors.New("clusterctl has stopped reconciling resources")
   339  		log.Error(err, "exiting to force a restart",
   340  			"lastReconcileTime", reconcileMetadata.lastReconcileTime,
   341  			"clusters", reconcileMetadata.clusterResources,
   342  			"gkeClusters", reconcileMetadata.gkeClusterResources,
   343  		)
   344  		return false
   345  	}
   346  
   347  	return true
   348  }
   349  
   350  // updateReconcileMetadata updates the reconcileMetadata global var with information
   351  // from the latest reconciliation.
   352  func updateReconcileMetadata(ctx context.Context, obj conditions.Getter, reconcileStart time.Time) {
   353  	log := ctrl.LoggerFrom(ctx).WithName("update-reconcile-metadata").WithValues("lastReconcileTime", reconcileStart)
   354  	kind := obj.GetObjectKind().GroupVersionKind().Kind
   355  	reconcileMetadata.Lock()
   356  	defer reconcileMetadata.Unlock()
   357  
   358  	switch kind {
   359  	case clusterApi.Kind:
   360  		reconcileMetadata.clusterResources[obj.GetName()] = reconcileStart
   361  	case gkeClusterApi.Kind:
   362  		reconcileMetadata.gkeClusterResources[obj.GetName()] = reconcileStart
   363  	}
   364  	reconcileMetadata.lastReconcileTime = reconcileStart
   365  
   366  	log.Info("reconcile metadata updated", "clusters", reconcileMetadata.clusterResources, "gkeClusters", reconcileMetadata.gkeClusterResources)
   367  }
   368  
   369  // deleteResourceEntry removes an entry from the appropriate map in 'reconcileMetadata'
   370  // using the 'obj' that is passed in.
   371  func deleteResourceEntry(obj conditions.Getter) {
   372  	kind := obj.GetObjectKind().GroupVersionKind().Kind
   373  	reconcileMetadata.Lock()
   374  	defer reconcileMetadata.Unlock()
   375  
   376  	switch kind {
   377  	case clusterApi.Kind:
   378  		delete(reconcileMetadata.clusterResources, obj.GetName())
   379  	case gkeClusterApi.Kind:
   380  		delete(reconcileMetadata.gkeClusterResources, obj.GetName())
   381  	}
   382  }
   383  

View as plain text