1 package clusterctl
2
3 import (
4 "context"
5 "errors"
6 "os"
7 "runtime/pprof"
8 "strings"
9 "sync"
10 "time"
11
12 containerAPI "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/container/v1beta1"
13 iamv1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/clients/generated/apis/iam/v1beta1"
14 kustomizeApi "github.com/fluxcd/kustomize-controller/api/v1"
15 sourceApi "github.com/fluxcd/source-controller/api/v1"
16 "github.com/go-logr/logr"
17 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
18 "k8s.io/apimachinery/pkg/runtime"
19 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
20 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
21 ctrl "sigs.k8s.io/controller-runtime"
22
23 "edge-infra.dev/pkg/edge/api/services"
24 "edge-infra.dev/pkg/edge/api/services/channels"
25 "edge-infra.dev/pkg/edge/api/services/edgenode"
26 "edge-infra.dev/pkg/edge/api/types"
27 clusterApi "edge-infra.dev/pkg/edge/apis/cluster/v1alpha1"
28 gkeClusterApi "edge-infra.dev/pkg/edge/apis/gkecluster/v1alpha1"
29 syncedobjectApi "edge-infra.dev/pkg/edge/apis/syncedobject/apis/v1alpha1"
30 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins"
31 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/activationcode"
32 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clusternetworkservice"
33 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/clustersecrets"
34 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/dnsconfig"
35 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/emissarycert"
36 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases"
37 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/helmreleases/cache/providers/memory"
38 infoconfigmaps "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/info-configmaps"
39 loglevels "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-levels"
40 logreplay "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/log-replay"
41 pluginmetrics "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/metrics"
42 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/multikustomization"
43 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/spegelconfig"
44 "edge-infra.dev/pkg/edge/controllers/clusterctl/pkg/plugins/vpnconfig"
45 "edge-infra.dev/pkg/edge/controllers/dbmetrics"
46 "edge-infra.dev/pkg/edge/controllers/util/edgedb"
47 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
48 kccAPI "edge-infra.dev/pkg/k8s/konfigkonnector/apis/configconnector/v1beta1"
49 "edge-infra.dev/pkg/k8s/runtime/conditions"
50 "edge-infra.dev/pkg/k8s/runtime/controller"
51 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
52 "edge-infra.dev/pkg/k8s/runtime/events"
53 secretMgrApi "edge-infra.dev/pkg/lib/gcp/secretmanager"
54 "edge-infra.dev/pkg/lib/logging"
55 v1vpnconfig "edge-infra.dev/pkg/sds/remoteaccess/k8s/apis/vpnconfigs/v1"
56 )
57
// Name is the controller name handed to the event recorder and to the
// ClusterReconciler built in Create.
const Name = "cluster-controller"

const (
	// reconcileTimeLimit is the threshold isClusterctlFunctioning compares
	// elapsed durations against: a tracked reconcile start time, or the last
	// recorded reconcile, older than this is treated as stalled.
	reconcileTimeLimit = 10 * time.Minute

	// stateMonitoringInterval is how long isClusterctlFunctioning sleeps
	// before each check of the reconcile metadata.
	stateMonitoringInterval = 2 * time.Minute
)
63
64
65
66
67
// reconcileMetadata is the package-level bookkeeping used to detect a
// controller that has stopped reconciling. Every function in this file that
// reads or writes its fields takes the embedded mutex first.
var reconcileMetadata = struct {
	sync.Mutex

	// clusterResources maps a Cluster resource name to the reconcile start
	// time most recently recorded for it.
	clusterResources map[string]time.Time

	// gkeClusterResources maps a GKECluster resource name to the reconcile
	// start time most recently recorded for it.
	gkeClusterResources map[string]time.Time

	// lastReconcileTime is the start time of the most recently recorded
	// reconcile of any kind. It is seeded with the package initialization
	// time.
	lastReconcileTime time.Time
}{
	clusterResources:    map[string]time.Time{},
	gkeClusterResources: map[string]time.Time{},
	lastReconcileTime:   time.Now(),
}
86
87
88
89 func Run(config Config, o ...controller.Option) error {
90 registerPlugins(config)
91 mgr, log, err := Create(config, o...)
92 if err != nil {
93 return err
94 }
95
96 go func() {
97 log = log.WithName("clusterctl-state-monitor").WithValues("reconcileTimeLimit", reconcileTimeLimit.String(), "interval", stateMonitoringInterval.String())
98 log.Info("starting goroutine to monitor clusterctl's state")
99
100 for isClusterctlFunctioning(log) {
101 }
102
103
104
105 sb := &strings.Builder{}
106 err := pprof.Lookup("goroutine").WriteTo(sb, 1)
107 if err != nil {
108 log.Error(err, "failed to write pprof snapshot of the goroutine profile")
109 }
110 log.Info("logging stack traces of all current goroutines", "stacktrace", sb.String())
111
112 sb.Reset()
113 err = pprof.Lookup("block").WriteTo(sb, 1)
114 if err != nil {
115 log.Error(err, "failed to write pprof snapshot of the block profile")
116 }
117 log.Info("logging stack traces that led to blocking on synchronization primitives", "stacktrace", sb.String())
118
119 os.Exit(1)
120 }()
121
122 log.Info("starting manager")
123 if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
124 log.Error(err, "problem running manager")
125 return err
126 }
127
128 return nil
129 }
130
131
132
133 func Create(config Config, o ...controller.Option) (ctrl.Manager, logr.Logger, error) {
134 ctrl.SetLogger(logging.NewLogger().Logger)
135 log := ctrl.Log.WithName("setup")
136
137 cfg, opts := controller.ProcessOptions(o...)
138 opts.LeaderElectionID = "clusterctl.edge.ncr.com"
139 opts.Scheme = createScheme()
140
141 mgr, err := ctrl.NewManager(cfg, opts)
142 if err != nil {
143 log.Error(err, "failed to create manager")
144 return nil, logr.Logger{}, err
145 }
146
147 c, err := memory.New(config.HelmCacheLimit)
148 if err != nil {
149 log.Error(err, "failed to initialize helm cache")
150 return nil, logr.Logger{}, err
151 }
152
153 eventRecorder := events.NewRecorder(mgr, ctrl.Log, Name)
154 dbm := dbmetrics.New("clusterctl")
155 collectors := append(dbm.Collectors(),
156 pluginmetrics.PluginExecutionTimeMetric,
157 pluginmetrics.PluginFinalizerTimeMetric,
158 pluginmetrics.PluginExecutionErrorCountMetric,
159 pluginmetrics.PluginFinalizerErrorCountMetric,
160 pluginmetrics.RegisteredPluginsCountMetric)
161
162 var clusterReconciler = &ClusterReconciler{
163 manager: mgr,
164 Client: mgr.GetClient(),
165 EventRecorder: eventRecorder,
166 Log: ctrl.Log.WithName("cluster-reconciler"),
167 Metrics: metrics.New(mgr, "clusterctl", metrics.WithCollectors(collectors...)),
168 Config: &config,
169 DefaultRequeue: config.DefaultRequeue,
170 WaitForSetTimeout: config.WaitForSetTimeout,
171 Name: Name,
172 Conditions: clusterConditions,
173 EdgeDB: &edgedb.EdgeDB{DB: config.DB},
174 Recorder: dbm,
175 Concurrency: config.ClusterReconcilerConcurrency,
176 WaitForSetMap: &sync.Map{},
177 c: make(chan int, config.BackgroundWaitForSetConcurrency),
178 HelmCache: c,
179 }
180 if err := clusterReconciler.setResourceManager(); err != nil {
181 log.Error(err, "failed to set resource manager", "controller", "cluster-reconciler")
182 return nil, logr.Logger{}, err
183 }
184 if err := clusterReconciler.SetupWithManager(mgr); err != nil {
185 log.Error(err, "failed to create controller and set up with manager", "controller", "cluster-reconciler")
186 return nil, logr.Logger{}, err
187 }
188
189 var gkeclusterReconciler = &GKEClusterReconciler{
190 manager: mgr,
191 Client: mgr.GetClient(),
192 EventRecorder: eventRecorder,
193 Log: ctrl.Log.WithName("gkecluster-reconciler"),
194 Metrics: metrics.New(mgr, "gke_clusterctl"),
195 Scheme: mgr.GetScheme(),
196 WaitForSetTimeout: config.WaitForSetTimeout,
197 EdgeAPI: config.EdgeAPI,
198 CreateClient: config.CreateClient,
199 IPRangerClient: config.IPRangerClient,
200 DefaultRequeue: config.DefaultRequeue,
201 TopLevelProject: config.TopLevelProjectID,
202 TopLevelCNRMSA: config.TopLevelCNRMSA,
203 TotpSecret: config.TotpSecret,
204 Name: "gke-cluster-controller",
205 Conditions: gkeClusterConditions,
206 Concurrency: config.GKEClusterReconcilerConcurrency,
207 }
208 if err := gkeclusterReconciler.setResourceManager(); err != nil {
209 log.Error(err, "unable to set GKEClusterReconciler resource manager", "controller", "gkecluster-reconciler")
210 return nil, logr.Logger{}, err
211 }
212 if err := gkeclusterReconciler.SetupWithManager(mgr); err != nil {
213 log.Error(err, "unable to set up GKEClusterReconciler", "controller", "gkecluster-reconciler")
214 return nil, logr.Logger{}, err
215 }
216
217 return mgr, log, nil
218 }
219
220 func createScheme() *runtime.Scheme {
221 scheme := runtime.NewScheme()
222
223 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
224 utilruntime.Must(clusterApi.AddToScheme(scheme))
225 utilruntime.Must(containerAPI.AddToScheme(scheme))
226 utilruntime.Must(syncedobjectApi.AddToScheme(scheme))
227 utilruntime.Must(gkeClusterApi.AddToScheme(scheme))
228 utilruntime.Must(iamv1beta1.AddToScheme(scheme))
229 utilruntime.Must(sourceApi.AddToScheme(scheme))
230 utilruntime.Must(kustomizeApi.AddToScheme(scheme))
231 utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
232 utilruntime.Must(kccAPI.AddToScheme(scheme))
233 utilruntime.Must(whv1.AddToScheme(scheme))
234 utilruntime.Must(v1vpnconfig.AddToScheme(scheme))
235 return scheme
236 }
237
238
239 func registerPlugins(config Config) {
240
241 artifactRegistrySvc := services.NewArtifactRegistryService(config.DB)
242 bannerSvc := services.NewBannerService(nil, nil, nil, config.TopLevelProjectID, config.DB, &types.Config{})
243 bootstrapSvc := services.NewBootstrapService("", nil, config.DB)
244 bslSiteSvc := services.NewBSLSiteService(config.BSLClient, config.DB)
245 clusterConfigSvc := services.NewClusterConfigService(config.DB)
246 helmSvc := services.NewHelmService(nil, nil, config.GCPService, config.DB, nil, nil)
247 labelSvc := services.NewLabelService(config.ArtifactsService, config.DB)
248 logReplaySvc := services.NewLogReplayService(config.DB)
249 storeClusterSvc := services.NewStoreClusterService(nil, nil, config.DB, nil, nil, nil)
250 channelSvc := channels.NewChannelService(config.DB, config.TopLevelProjectID, nil)
251 terminalSvc := services.NewTerminalService(config.DB, labelSvc)
252 activationCodeSvc := edgenode.NewActivationCodeService(config.DB, terminalSvc, storeClusterSvc, clusterConfigSvc, nil, nil, bannerSvc)
253 secretManagerProvider := func(ctx context.Context, projectID string) (types.SecretManagerService, error) {
254 return secretMgrApi.NewWithOptions(ctx, projectID)
255 }
256
257
258 plugins.Register(clustersecrets.Plugin{
259 SecretManagerProvider: secretManagerProvider,
260 TopLevelProjectID: config.TopLevelProjectID,
261 })
262
263 plugins.Register(multikustomization.NewPlugin(storeClusterSvc))
264
265 plugins.Register(plugins.OktaClientPlugin{
266 SecretManagerProvider: secretManagerProvider,
267 TopLevelProjectID: config.TopLevelProjectID,
268 DB: config.DB,
269 })
270
271 plugins.Register(emissarycert.EmissaryCertsPlugin{
272 SecretManagerProvider: secretManagerProvider,
273 })
274
275 plugins.Register(loglevels.LogLevelsPlugin{
276 DB: config.DB,
277 })
278
279 plugins.Register(logreplay.Plugin{
280 LogReplayService: logReplaySvc,
281 })
282
283 plugins.Register(plugins.RemoteAccessIPPlugin{
284 DB: config.DB,
285 })
286
287 plugins.Register(plugins.BootstrapTokensPlugin{
288 BootstrapService: bootstrapSvc,
289 })
290
291 plugins.Register(helmreleases.NewPlugin(config.DB, helmSvc, labelSvc, config.GCPService, storeClusterSvc, channelSvc))
292
293 plugins.Register(spegelconfig.Plugin{
294 ArtifactRegistryService: artifactRegistrySvc,
295 })
296
297 plugins.Register(infoconfigmaps.Plugin{
298 BannerService: bannerSvc,
299 BSLSiteService: bslSiteSvc,
300 StoreClusterService: storeClusterSvc,
301 BSLEndpoint: config.BSLConfig.Endpoint,
302 EdgeAPI: config.EdgeAPI,
303 TopLevelProjectID: config.TopLevelProjectID,
304 })
305
306 plugins.Register(vpnconfig.NewPlugin(clusterConfigSvc))
307
308 plugins.Register(clusternetworkservice.NewPlugin(storeClusterSvc))
309
310 plugins.Register(dnsconfig.NewPlugin(storeClusterSvc))
311
312 plugins.Register(activationcode.NewPlugin(activationCodeSvc, clusterConfigSvc, terminalSvc))
313 }
314
315
316 func isClusterctlFunctioning(log logr.Logger) bool {
317
318 time.Sleep(stateMonitoringInterval)
319 now := time.Now()
320
321 reconcileMetadata.Lock()
322 defer reconcileMetadata.Unlock()
323
324
325
326 var diff time.Duration
327 for name, start := range reconcileMetadata.clusterResources {
328 diff = now.Sub(start)
329 if diff > reconcileTimeLimit {
330 log.Info("stalled resource found", "cluster_name", name, "start_time", start, "duration", diff.String())
331 }
332 }
333 diff = now.Sub(reconcileMetadata.lastReconcileTime)
334
335
336
337 if diff > reconcileTimeLimit {
338 err := errors.New("clusterctl has stopped reconciling resources")
339 log.Error(err, "exiting to force a restart",
340 "lastReconcileTime", reconcileMetadata.lastReconcileTime,
341 "clusters", reconcileMetadata.clusterResources,
342 "gkeClusters", reconcileMetadata.gkeClusterResources,
343 )
344 return false
345 }
346
347 return true
348 }
349
350
351
352 func updateReconcileMetadata(ctx context.Context, obj conditions.Getter, reconcileStart time.Time) {
353 log := ctrl.LoggerFrom(ctx).WithName("update-reconcile-metadata").WithValues("lastReconcileTime", reconcileStart)
354 kind := obj.GetObjectKind().GroupVersionKind().Kind
355 reconcileMetadata.Lock()
356 defer reconcileMetadata.Unlock()
357
358 switch kind {
359 case clusterApi.Kind:
360 reconcileMetadata.clusterResources[obj.GetName()] = reconcileStart
361 case gkeClusterApi.Kind:
362 reconcileMetadata.gkeClusterResources[obj.GetName()] = reconcileStart
363 }
364 reconcileMetadata.lastReconcileTime = reconcileStart
365
366 log.Info("reconcile metadata updated", "clusters", reconcileMetadata.clusterResources, "gkeClusters", reconcileMetadata.gkeClusterResources)
367 }
368
369
370
371 func deleteResourceEntry(obj conditions.Getter) {
372 kind := obj.GetObjectKind().GroupVersionKind().Kind
373 reconcileMetadata.Lock()
374 defer reconcileMetadata.Unlock()
375
376 switch kind {
377 case clusterApi.Kind:
378 delete(reconcileMetadata.clusterResources, obj.GetName())
379 case gkeClusterApi.Kind:
380 delete(reconcileMetadata.gkeClusterResources, obj.GetName())
381 }
382 }
383
View as plain text