package clusterctl import ( "context" "errors" "os" "runtime/pprof" "strings" "sync" "time" containerAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/container/v1beta1" iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/iam/v1beta1" kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1" sourceApi "github.com/fluxcd/source-controller/api/v1" "github.com/go-logr/logr" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" ctrl "sigs.k8s.io/controller-runtime" "edge-infra.dev/pkg/edge/api/services" "edge-infra.dev/pkg/edge/api/services/channels" "edge-infra.dev/pkg/edge/api/services/edgenode" "edge-infra.dev/pkg/edge/api/types" clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1" gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1" syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/activationcode" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clusternetworkservice" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clustersecrets" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/dnsconfig" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/emissarycert" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache/providers/memory" infoconfigmaps "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/info-configmaps" loglevels "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-levels" logreplay "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-replay" pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/multikustomization" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/spegelconfig" "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/vpnconfig" "edge-infra.dev/pkg/edge/controllers/dbmetrics" "edge-infra.dev/pkg/edge/controllers/util/edgedb" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2" kccAPI "edge-infra.dev/pkg/k8s/konfigkonnector/apis/configconnector/v1beta1" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" "edge-infra.dev/pkg/k8s/runtime/events" secretMgrApi "edge-infra.dev/pkg/lib/gcp/secretmanager" "edge-infra.dev/pkg/lib/logging" v1vpnconfig "edge-infra.dev/pkg/sds/remoteaccess/k8s/apis/vpnconfigs/v1" ) const ( Name = "cluster-controller" reconcileTimeLimit = 10 * time.Minute stateMonitoringInterval = 2 * time.Minute ) // Used to check if clusterctl is still reconciling resources. // // TODO(om185040): Remove all reconcile metadata stuff once the following issue is // closed, https://github.com/ncr-swt-retail/edge-roadmap/issues/9759. var reconcileMetadata = struct { sync.Mutex // Contains all of the Clusters currently being reconciled // (map[clusterEdgeID]reconcileStartTime) clusterResources map[string]time.Time // Contains all of the GKEClusters currently being reconciled // (map[clusterEdgeID]reconcileStartTime) gkeClusterResources map[string]time.Time // Timestamp that marks the beginning of the latest reconciliation lastReconcileTime time.Time }{ clusterResources: make(map[string]time.Time), gkeClusterResources: make(map[string]time.Time), lastReconcileTime: time.Now(), } // Run creates the manager, sets up the controller, and then starts // everything. It returns the created manager for testing purposes func Run(config Config, o ...controller.Option) error { registerPlugins(config) mgr, log, err := Create(config, o...) if err != nil { return err } go func() { log = log.WithName("clusterctl-state-monitor").WithValues("reconcileTimeLimit", reconcileTimeLimit.String(), "interval", stateMonitoringInterval.String()) log.Info("starting goroutine to monitor clusterctl's state") for isClusterctlFunctioning(log) { } // Clusterctl has stopped reconciling resources, attempt to log as much relevant // info as possible using the 'goroutine' and 'block' profiles before exiting. sb := &strings.Builder{} err := pprof.Lookup("goroutine").WriteTo(sb, 1) if err != nil { log.Error(err, "failed to write pprof snapshot of the goroutine profile") } log.Info("logging stack traces of all current goroutines", "stacktrace", sb.String()) sb.Reset() err = pprof.Lookup("block").WriteTo(sb, 1) if err != nil { log.Error(err, "failed to write pprof snapshot of the block profile") } log.Info("logging stack traces that led to blocking on synchronization primitives", "stacktrace", sb.String()) os.Exit(1) }() log.Info("starting manager") if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { log.Error(err, "problem running manager") return err } return nil } // Create wires up the reconciler(s) with a created manager and returns the // manager + setup logger func Create(config Config, o ...controller.Option) (ctrl.Manager, logr.Logger, error) { ctrl.SetLogger(logging.NewLogger().Logger) log := ctrl.Log.WithName("setup") cfg, opts := controller.ProcessOptions(o...) opts.LeaderElectionID = "clusterctl.edge.ncr.com" opts.Scheme = createScheme() mgr, err := ctrl.NewManager(cfg, opts) if err != nil { log.Error(err, "failed to create manager") return nil, logr.Logger{}, err } c, err := memory.New(config.HelmCacheLimit) if err != nil { log.Error(err, "failed to initialize helm cache") return nil, logr.Logger{}, err } eventRecorder := events.NewRecorder(mgr, ctrl.Log, Name) dbm := dbmetrics.New("clusterctl") collectors := append(dbm.Collectors(), pluginmetrics.PluginExecutionTimeMetric, pluginmetrics.PluginFinalizerTimeMetric, pluginmetrics.PluginExecutionErrorCountMetric, pluginmetrics.PluginFinalizerErrorCountMetric, pluginmetrics.RegisteredPluginsCountMetric) var clusterReconciler = &ClusterReconciler{ manager: mgr, Client: mgr.GetClient(), EventRecorder: eventRecorder, Log: ctrl.Log.WithName("cluster-reconciler"), Metrics: metrics.New(mgr, "clusterctl", metrics.WithCollectors(collectors...)), Config: &config, DefaultRequeue: config.DefaultRequeue, WaitForSetTimeout: config.WaitForSetTimeout, Name: Name, Conditions: clusterConditions, EdgeDB: &edgedb.EdgeDB{DB: config.DB}, Recorder: dbm, Concurrency: config.ClusterReconcilerConcurrency, WaitForSetMap: &sync.Map{}, c: make(chan int, config.BackgroundWaitForSetConcurrency), HelmCache: c, } if err := clusterReconciler.setResourceManager(); err != nil { log.Error(err, "failed to set resource manager", "controller", "cluster-reconciler") return nil, logr.Logger{}, err } if err := clusterReconciler.SetupWithManager(mgr); err != nil { log.Error(err, "failed to create controller and set up with manager", "controller", "cluster-reconciler") return nil, logr.Logger{}, err } var gkeclusterReconciler = &GKEClusterReconciler{ manager: mgr, Client: mgr.GetClient(), EventRecorder: eventRecorder, Log: ctrl.Log.WithName("gkecluster-reconciler"), Metrics: metrics.New(mgr, "gke_clusterctl"), Scheme: mgr.GetScheme(), WaitForSetTimeout: config.WaitForSetTimeout, EdgeAPI: config.EdgeAPI, CreateClient: config.CreateClient, IPRangerClient: config.IPRangerClient, DefaultRequeue: config.DefaultRequeue, TopLevelProject: config.TopLevelProjectID, TopLevelCNRMSA: config.TopLevelCNRMSA, TotpSecret: config.TotpSecret, Name: "gke-cluster-controller", Conditions: gkeClusterConditions, Concurrency: config.GKEClusterReconcilerConcurrency, } if err := gkeclusterReconciler.setResourceManager(); err != nil { log.Error(err, "unable to set GKEClusterReconciler resource manager", "controller", "gkecluster-reconciler") return nil, logr.Logger{}, err } if err := gkeclusterReconciler.SetupWithManager(mgr); err != nil { log.Error(err, "unable to set up GKEClusterReconciler", "controller", "gkecluster-reconciler") return nil, logr.Logger{}, err } return mgr, log, nil } func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(clusterApi.AddToScheme(scheme)) utilruntime.Must(containerAPI.AddToScheme(scheme)) utilruntime.Must(syncedobjectApi.AddToScheme(scheme)) utilruntime.Must(gkeClusterApi.AddToScheme(scheme)) utilruntime.Must(iamv1beta1.AddToScheme(scheme)) utilruntime.Must(sourceApi.AddToScheme(scheme)) utilruntime.Must(kustomizeApi.AddToScheme(scheme)) utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) utilruntime.Must(kccAPI.AddToScheme(scheme)) utilruntime.Must(whv1.AddToScheme(scheme)) utilruntime.Must(v1vpnconfig.AddToScheme(scheme)) return scheme } // registerPlugins add new plugins here to be automatically executed by controller. func registerPlugins(config Config) { // Initialize API services needed for plugins. artifactRegistrySvc := services.NewArtifactRegistryService(config.DB) bannerSvc := services.NewBannerService(nil, nil, nil, config.TopLevelProjectID, config.DB, &types.Config{}) bootstrapSvc := services.NewBootstrapService("", nil, config.DB) bslSiteSvc := services.NewBSLSiteService(config.BSLClient, config.DB) clusterConfigSvc := services.NewClusterConfigService(config.DB) helmSvc := services.NewHelmService(nil, nil, config.GCPService, config.DB, nil, nil) labelSvc := services.NewLabelService(config.ArtifactsService, config.DB) logReplaySvc := services.NewLogReplayService(config.DB) storeClusterSvc := services.NewStoreClusterService(nil, nil, config.DB, nil, nil, nil) channelSvc := channels.NewChannelService(config.DB, config.TopLevelProjectID, nil) terminalSvc := services.NewTerminalService(config.DB, labelSvc) activationCodeSvc := edgenode.NewActivationCodeService(config.DB, terminalSvc, storeClusterSvc, clusterConfigSvc, nil, nil, bannerSvc) secretManagerProvider := func(ctx context.Context, projectID string) (types.SecretManagerService, error) { return secretMgrApi.NewWithOptions(ctx, projectID) } // Register the Cluster Secrets Plugin. plugins.Register(clustersecrets.Plugin{ SecretManagerProvider: secretManagerProvider, TopLevelProjectID: config.TopLevelProjectID, }) // Register the MultiKustomization Plugin. plugins.Register(multikustomization.NewPlugin(storeClusterSvc)) // Register the OktaClient Plugin. plugins.Register(plugins.OktaClientPlugin{ SecretManagerProvider: secretManagerProvider, TopLevelProjectID: config.TopLevelProjectID, DB: config.DB, }) // Register the emissary certs Plugin. plugins.Register(emissarycert.EmissaryCertsPlugin{ SecretManagerProvider: secretManagerProvider, }) // Register the LogLevels Plugin. plugins.Register(loglevels.LogLevelsPlugin{ DB: config.DB, }) // Register the LogReplay Plugin. plugins.Register(logreplay.Plugin{ LogReplayService: logReplaySvc, }) // Register the RemoteAccessIP Plugin. plugins.Register(plugins.RemoteAccessIPPlugin{ DB: config.DB, }) // Register the BootstrapTokens Plugin. plugins.Register(plugins.BootstrapTokensPlugin{ BootstrapService: bootstrapSvc, }) // Register the HelmReleases Plugin. plugins.Register(helmreleases.NewPlugin(config.DB, helmSvc, labelSvc, config.GCPService, storeClusterSvc, channelSvc)) // Register the SpegelConfig Plugin. plugins.Register(spegelconfig.Plugin{ ArtifactRegistryService: artifactRegistrySvc, }) // Register the InfoConfigmaps Plugin. plugins.Register(infoconfigmaps.Plugin{ BannerService: bannerSvc, BSLSiteService: bslSiteSvc, StoreClusterService: storeClusterSvc, BSLEndpoint: config.BSLConfig.Endpoint, EdgeAPI: config.EdgeAPI, TopLevelProjectID: config.TopLevelProjectID, }) // Register the VpnConfig Plugin. plugins.Register(vpnconfig.NewPlugin(clusterConfigSvc)) // Register the Cluster Network Services Plugin. plugins.Register(clusternetworkservice.NewPlugin(storeClusterSvc)) // Register the Store DNS Config Plugin. plugins.Register(dnsconfig.NewPlugin(storeClusterSvc)) // Register the ActivationCode Plugin. plugins.Register(activationcode.NewPlugin(activationCodeSvc, clusterConfigSvc, terminalSvc)) } // isClusterctlFunctioning determines if clusterctl is still reconciling resources. func isClusterctlFunctioning(log logr.Logger) bool { // Sleep here in order to only perform this check every n minutes. time.Sleep(stateMonitoringInterval) now := time.Now() reconcileMetadata.Lock() defer reconcileMetadata.Unlock() // Log any resources that have a reconcile start time difference greater // than the set time limit. var diff time.Duration for name, start := range reconcileMetadata.clusterResources { diff = now.Sub(start) if diff > reconcileTimeLimit { log.Info("stalled resource found", "cluster_name", name, "start_time", start, "duration", diff.String()) } } diff = now.Sub(reconcileMetadata.lastReconcileTime) // Check if the lastest reconcile start time difference from now is over the time // limit. If so, exit since clusterctl has stopped reconciling resources. if diff > reconcileTimeLimit { err := errors.New("clusterctl has stopped reconciling resources") log.Error(err, "exiting to force a restart", "lastReconcileTime", reconcileMetadata.lastReconcileTime, "clusters", reconcileMetadata.clusterResources, "gkeClusters", reconcileMetadata.gkeClusterResources, ) return false } return true } // updateReconcileMetadata updates the reconcileMetadata global var with information // from the latest reconciliation. func updateReconcileMetadata(ctx context.Context, obj conditions.Getter, reconcileStart time.Time) { log := ctrl.LoggerFrom(ctx).WithName("update-reconcile-metadata").WithValues("lastReconcileTime", reconcileStart) kind := obj.GetObjectKind().GroupVersionKind().Kind reconcileMetadata.Lock() defer reconcileMetadata.Unlock() switch kind { case clusterApi.Kind: reconcileMetadata.clusterResources[obj.GetName()] = reconcileStart case gkeClusterApi.Kind: reconcileMetadata.gkeClusterResources[obj.GetName()] = reconcileStart } reconcileMetadata.lastReconcileTime = reconcileStart log.Info("reconcile metadata updated", "clusters", reconcileMetadata.clusterResources, "gkeClusters", reconcileMetadata.gkeClusterResources) } // deleteResourceEntry removes an entry from the appropriate map in 'reconcileMetadata' // using the 'obj' that is passed in. func deleteResourceEntry(obj conditions.Getter) { kind := obj.GetObjectKind().GroupVersionKind().Kind reconcileMetadata.Lock() defer reconcileMetadata.Unlock() switch kind { case clusterApi.Kind: delete(reconcileMetadata.clusterResources, obj.GetName()) case gkeClusterApi.Kind: delete(reconcileMetadata.gkeClusterResources, obj.GetName()) } }