package envctl import ( "context" "fmt" "strings" "time" unstructuredutil "edge-infra.dev/pkg/k8s/unstructured" "github.com/fluxcd/pkg/ssa" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" k8errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/cli-utils/pkg/kstatus/polling" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( CMNamespaceAnnotation = "injector.edge.ncr.com/configmap" CMReplicatedAnnotation = "injector.edge.ncr.com/configmap-replication" ) var ( configMapMapping = map[string]types.NamespacedName{ "bsl-info": {Namespace: "kube-public", Name: "bsl-info"}, "edge-info": {Namespace: "kube-public", Name: "edge-info"}, } ) type ConfigMapReplicationReconciler struct { client.Client Name string RequeueTime time.Duration ResourceManager *ssa.ResourceManager } func (r *ConfigMapReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error { r.ResourceManager = ssa.NewResourceManager( r.Client, polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: r.Name}, ) return ctrl.NewControllerManagedBy(mgr). For(&corev1.Namespace{}, namespacePredicates()). Watches( &corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.namespacesToEnqueue), builder.WithPredicates(isConfigMapCopyable())). Complete(r) } func namespacePredicates() builder.Predicates { return builder.WithPredicates( predicate.AnnotationChangedPredicate{}, predicate.NewPredicateFuncs(func(obj client.Object) bool { return obj.GetAnnotations()[CMNamespaceAnnotation] != "" })) } func isConfigMapCopyable() predicate.Funcs { return predicate.NewPredicateFuncs(func(obj client.Object) bool { for _, item := range configMapMapping { if item.Namespace == obj.GetNamespace() && item.Name == obj.GetName() { return true } } return false }) } func (r *ConfigMapReplicationReconciler) namespacesToEnqueue(_ context.Context, _ client.Object) []reconcile.Request { nss := &corev1.NamespaceList{} if err := r.Client.List(context.Background(), nss, &client.ListOptions{}); err != nil { return nil } var validNS []corev1.Namespace for _, ns := range nss.Items { if ns.GetAnnotations()[CMNamespaceAnnotation] != "" { validNS = append(validNS, ns) } } var reqs []reconcile.Request for _, ns := range validNS { reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: ns.Name}}) } return reqs } func (r *ConfigMapReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx).WithName(r.Name).WithValues("req", req) ns := &corev1.Namespace{} if err := r.Client.Get(ctx, req.NamespacedName, ns); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } log.Info("configmap replication started") ctx = logr.NewContext(ctx, log) if res, err := r.reconcile(ctx, ns); err != nil { log.Error(err, "fail to replicate configmaps") return res, err } log.Info("successfully replicated configmaps") return ctrl.Result{}, nil } func (r *ConfigMapReplicationReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (ctrl.Result, error) { //nolint:unparam log := ctrl.LoggerFrom(ctx) nns, err := parseAnnotation(ns) if err != nil { log.Error(err, "invalid annotation, will not retry") return ctrl.Result{Requeue: false}, nil } var uns []*unstructured.Unstructured for _, nn := range nns { cm := &corev1.ConfigMap{} err = r.Client.Get(ctx, nn, cm) if err != nil { if k8errors.IsNotFound(err) { log.Info("configmap not found, it will be ignored", "cm", nn) continue } log.Error(err, "cannot get configmap to replicate", "cm", nn) return ctrl.Result{Requeue: true, RequeueAfter: r.RequeueTime}, nil } un, err := r.copyCM(cm, types.NamespacedName{Name: cm.Name, Namespace: ns.Name}, nn) if err != nil { log.Error(err, "interval error: cannot convert to unstructured", "key", nn) return ctrl.Result{Requeue: false}, nil } uns = append(uns, un) } _, err = r.ResourceManager.ApplyAll(ctx, uns, ssa.ApplyOptions{}) if err != nil { log.Error(err, "applying configmaps failed") return ctrl.Result{Requeue: true, RequeueAfter: r.RequeueTime}, nil } return ctrl.Result{}, nil } func (r *ConfigMapReplicationReconciler) copyCM(existingCM *corev1.ConfigMap, nn, from types.NamespacedName) (*unstructured.Unstructured, error) { cm := &corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{ APIVersion: "v1", Kind: "ConfigMap", }, ObjectMeta: metav1.ObjectMeta{ Name: nn.Name, Namespace: nn.Namespace, Annotations: map[string]string{ CMReplicatedAnnotation: fmt.Sprintf("%s/%s", from.Namespace, from.Name), }, }, Data: existingCM.Data, } return unstructuredutil.ToUnstructured(cm) } func parseAnnotation(ns *corev1.Namespace) (map[string]types.NamespacedName, error) { anno := ns.Annotations[CMNamespaceAnnotation] if anno == "" { return nil, fmt.Errorf("no annotation found for configmap replication") } values := strings.Split(anno, ",") result := make(map[string]types.NamespacedName) for _, val := range values { nn, ok := configMapMapping[val] if !ok { return nil, fmt.Errorf("invalid annotation found for comfigmap replication") } result[val] = nn } return result, nil }