package couchctl import ( "bytes" "context" "encoding/json" "errors" "fmt" "net/http" "strings" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/go-logr/logr" "github.com/google/uuid" ) type NodeResourcePredicate interface { ShouldReconcile(*Config, client.Object) bool } type NodeResourcePredicateFunc func(*Config, client.Object) bool func (f NodeResourcePredicateFunc) ShouldReconcile(cfg *Config, obj client.Object) bool { return f(cfg, obj) } // couchServer for a given couchdb server return a couchdb client func couchServer(ctx context.Context, cl client.Client, serverRef dsapi.ServerRef) (*dsapi.CouchDBServer, error) { server := &dsapi.CouchDBServer{} nn := serverRef.ServerRef() err := cl.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server) if err != nil { return nil, err } return server, nil } // couchDBClient for a given couchdb server return a couchdb client func couchDBClient(ctx context.Context, cl client.Client, config *Config, serverRef dsapi.ServerRef) (*couchdb.CouchDB, error) { server, err := couchServer(ctx, cl, serverRef) if err != nil { return nil, err } return couchDBServerClient(ctx, cl, config, server) } // couchDBServerClient get a couchdb client from the server resource func couchDBServerClient(ctx context.Context, cl client.Client, config *Config, server *dsapi.CouchDBServer) (*couchdb.CouchDB, error) { // get / check if the admin creds exist adminCreds := &couchdb.AdminCredentials{} creds := server.AdminCredentials() adminNN := types.NamespacedName{Name: creds.Name, Namespace: creds.Namespace} _, err := adminCreds.FromSecret(ctx, cl, adminNN) if err != nil { return nil, err } // create a couchdb client cc := &couchdb.CouchDB{} err = cc.New(couchdb.Driver, string(adminCreds.Username), string(adminCreds.Password), server.Spec.URI, config.CouchDBPort) if err != nil { return nil, err } return cc, err } func couchDBLeader(ctx context.Context, cl client.Client, config *Config) (*dsapi.CouchDBServer, error) { log := logr.FromContextOrDiscard(ctx) servers := &dsapi.CouchDBServerList{} listOption := client.MatchingLabels{couchdb.NodeLeaderLabel: couchdb.LabelValueTrue} if err := cl.List(ctx, servers, client.InNamespace(config.CouchNamespace), listOption); err != nil { log.Error(err, "failed to list couchdb servers") return nil, err } if len(servers.Items) != 1 { err := fmt.Errorf("invalid couchdb server") log.Error(err, "failed to list couchdb servers", "count", len(servers.Items)) return nil, err } server := servers.Items[0] return &server, nil } func couchDBLeaderClient(ctx context.Context, cl client.Client, config *Config) (*couchdb.CouchDB, error) { server, err := couchDBLeader(ctx, cl, config) if err != nil { return nil, err } return couchDBServerClient(ctx, cl, config, server) } func serverNotFound(err error) bool { return kerrors.IsNotFound(err) || couchdb.IsNotFound(err) } // couchDBNotReadyOrNotFound avoid unnecessary logging for couchdb when a node is down func couchDBNotReadyOrNotFound(err error) bool { e := err var re recerr.Error if errors.As(err, &re) { e = re.Unwrap() } return errors.Is(e, ErrPodsNotReady) || errors.Is(e, ErrServerNotReady) || couchdb.IsBadGateway(e) || serverNotFound(e) } func logError(ctx context.Context, err error, msg string, keysAndValues ...interface{}) { if couchDBNotReadyOrNotFound(err) { return } logr.FromContextOrDiscard(ctx).Error(err, msg, keysAndValues...) } // checkIfServerIsReady get the associated couchdb server for a couchdb resource func checkIfServerIsReady(ctx context.Context, client client.Client, serverRef dsapi.ServerRef) (bool, *dsapi.CouchDBServer, error) { server := &dsapi.CouchDBServer{} nn := serverRef.ServerRef() err := client.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server) if err != nil { return false, nil, err } return conditions.IsReady(server), server, nil } func labelOrEmpty(obj client.Object, key string) string { labels := obj.GetLabels() if len(labels) == 0 { return "" } return labels[key] } func CheckPod(ctx context.Context, cl client.Client, ns string, name string) (bool, error) { pod := &corev1.Pod{} err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: ns}, pod) if err != nil { return false, err } return IsCondition(pod.Status.Conditions, "Ready", "True"), nil } // IsCondition determines if a K8s resource has a status matching the input type and status func IsCondition(conditions []corev1.PodCondition, cType string, cStatus corev1.ConditionStatus) bool { for _, condition := range conditions { if string(condition.Type) == cType { return condition.Status == cStatus } } return false } func checkFinishStatus(url string) (bool, error) { // get the status resp := &ServerSetupResponse{} err := httpRequest("GET", url, Payload{}, &resp) if err != nil { return false, err } // if the response has an error return it if resp.Error != "" { return false, fmt.Errorf("error getting finished status: %s - %s", resp.Error, resp.Reason) } // if the response state isnt cluster_finished then return false if resp.State != "cluster_finished" { return false, nil } // otherwise theres no error and the server was finished return true, nil } func httpRequest(verb, url string, data Payload, t interface{}) error { payloadBytes, err := json.Marshal(data) if err != nil { return err } body := bytes.NewReader(payloadBytes) req, err := http.NewRequest(verb, url, body) if err != nil { return err } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() return json.NewDecoder(resp.Body).Decode(t) } func getServerURL(config *Config, server *dsapi.CouchDBServer) string { if server.IsCloud() { return config.CloudURL() } return fmt.Sprintf("http://%s:%s", server.Spec.URI, config.CouchDBPort) } // getCouchDBUserURL returns the server URL to be consumed by workload team // TODO update BSL SHIMS to take the full URL, so we can use getServerURL instead func getCouchDBUserURL(config *Config, server *dsapi.CouchDBServer) string { if server.IsCloud() { return config.CloudURL() } serviceName := couchdb.Name // add support for generic cluster! if config.IsDSDS() { serviceName = couchdb.ServiceName } return fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, server.Namespace) } // oldPVCsSuffixes handles backward compatibility support for PVCs, return mapping of node to pvc func oldPVCsSuffixes(ctx context.Context, cl client.Client) (map[string]map[string]string, error) { nsSuffixes := map[string]map[string]string{} for ns, name := range oldPVCs { pvcs := &corev1.PersistentVolumeClaimList{} if err := cl.List(ctx, pvcs, client.InNamespace(ns)); client.IgnoreNotFound(err) != nil { return nil, err } if len(pvcs.Items) == 0 { continue } suffixes := map[string]string{} nsSuffixes[ns] = suffixes pvcPrefix := fmt.Sprintf("%s-%s-", name, ns) pvcPrefixLen := len(pvcPrefix) for _, pvc := range pvcs.Items { pvcName := pvc.Name // only old pvcs are selected if !strings.HasPrefix(pvcName, pvcPrefix) || len(pvcName) <= pvcPrefixLen { continue } pvName := pvc.Spec.VolumeName pv := &corev1.PersistentVolume{} err := cl.Get(ctx, types.NamespacedName{Name: pvName}, pv) if err != nil && kerrors.IsNotFound(err) { continue } else if err != nil { return nil, fmt.Errorf("fail to get pv for pvc %s: %w", pvName, err) } nodeName := pvNodeName(pv) if nodeName == "" { continue } suffix := pvcName[pvcPrefixLen:] if strings.HasSuffix(suffix, "-0") && len(suffix) > 2 { suffix = suffix[:len(suffix)-2] } suffixes[nodeName] = suffix } } return nsSuffixes, nil } func pvNodeName(pv *corev1.PersistentVolume) string { if pv.Spec.NodeAffinity == nil { return "" } if pv.Spec.NodeAffinity.Required == nil { return "" } var nodeName string outer: for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms { for _, ex := range term.MatchExpressions { if ex.Key == "kubernetes.io/hostname" && ex.Operator == corev1.NodeSelectorOpIn && len(ex.Values) > 0 { nodeName = ex.Values[0] break outer } } } return nodeName } func sameNode(cfg *Config, obj client.Object) bool { uid, err := nodeUID(obj) if err != nil { return false } return cfg.NodeUID == uid } func nodeUID(obj client.Object) (string, error) { uid, err := nodeUIDFromLabel(obj) if err == nil { return uid, nil } return nodeUIDFromServerRef(obj.(dsapi.ServerRef)) } func nodeUIDFromLabel(obj client.Object) (string, error) { if obj == nil { return "", fmt.Errorf("couch resource is invalid: nil") } labels := obj.GetLabels() if labels == nil || labels[couchdb.NodeUIDLabel] == "" { return "", fmt.Errorf("node uid label not found") } return labels[couchdb.NodeUIDLabel], nil } func nodeUIDFromServerRef(ref dsapi.ServerRef) (string, error) { if ref == nil { return "", fmt.Errorf("couch server ref is invalid: nil") } nn := ref.ServerRef() uid := strings.TrimPrefix(nn.Name, "couchdb-") if _, err := uuid.Parse(uid); err != nil { return "", fmt.Errorf("invalid server name %s", nn.Name) } return uid, nil }