...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/couch_helper.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"net/http"
    10  	"strings"
    11  
    12  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    13  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    14  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    15  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    16  
    17  	corev1 "k8s.io/api/core/v1"
    18  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    19  	"k8s.io/apimachinery/pkg/types"
    20  	"sigs.k8s.io/controller-runtime/pkg/client"
    21  
    22  	"github.com/go-logr/logr"
    23  	"github.com/google/uuid"
    24  )
    25  
    26  type NodeResourcePredicate interface {
    27  	ShouldReconcile(*Config, client.Object) bool
    28  }
    29  
    30  type NodeResourcePredicateFunc func(*Config, client.Object) bool
    31  
    32  func (f NodeResourcePredicateFunc) ShouldReconcile(cfg *Config, obj client.Object) bool {
    33  	return f(cfg, obj)
    34  }
    35  
    36  // couchServer for a given couchdb server return a couchdb client
    37  func couchServer(ctx context.Context, cl client.Client, serverRef dsapi.ServerRef) (*dsapi.CouchDBServer, error) {
    38  	server := &dsapi.CouchDBServer{}
    39  	nn := serverRef.ServerRef()
    40  	err := cl.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server)
    41  	if err != nil {
    42  		return nil, err
    43  	}
    44  	return server, nil
    45  }
    46  
    47  // couchDBClient for a given couchdb server return a couchdb client
    48  func couchDBClient(ctx context.Context, cl client.Client, config *Config, serverRef dsapi.ServerRef) (*couchdb.CouchDB, error) {
    49  	server, err := couchServer(ctx, cl, serverRef)
    50  	if err != nil {
    51  		return nil, err
    52  	}
    53  	return couchDBServerClient(ctx, cl, config, server)
    54  }
    55  
    56  // couchDBServerClient get a couchdb client from the server resource
    57  func couchDBServerClient(ctx context.Context, cl client.Client, config *Config, server *dsapi.CouchDBServer) (*couchdb.CouchDB, error) {
    58  	// get / check if the admin creds exist
    59  	adminCreds := &couchdb.AdminCredentials{}
    60  	creds := server.AdminCredentials()
    61  	adminNN := types.NamespacedName{Name: creds.Name, Namespace: creds.Namespace}
    62  	_, err := adminCreds.FromSecret(ctx, cl, adminNN)
    63  	if err != nil {
    64  		return nil, err
    65  	}
    66  
    67  	// create a couchdb client
    68  	cc := &couchdb.CouchDB{}
    69  	err = cc.New(couchdb.Driver, string(adminCreds.Username), string(adminCreds.Password), server.Spec.URI, config.CouchDBPort)
    70  	if err != nil {
    71  		return nil, err
    72  	}
    73  	return cc, err
    74  }
    75  
    76  func couchDBLeader(ctx context.Context, cl client.Client, config *Config) (*dsapi.CouchDBServer, error) {
    77  	log := logr.FromContextOrDiscard(ctx)
    78  	servers := &dsapi.CouchDBServerList{}
    79  	listOption := client.MatchingLabels{couchdb.NodeLeaderLabel: couchdb.LabelValueTrue}
    80  	if err := cl.List(ctx, servers, client.InNamespace(config.CouchNamespace), listOption); err != nil {
    81  		log.Error(err, "failed to list couchdb servers")
    82  		return nil, err
    83  	}
    84  	if len(servers.Items) != 1 {
    85  		err := fmt.Errorf("invalid couchdb server")
    86  		log.Error(err, "failed to list couchdb servers", "count", len(servers.Items))
    87  		return nil, err
    88  	}
    89  	server := servers.Items[0]
    90  	return &server, nil
    91  }
    92  
    93  func couchDBLeaderClient(ctx context.Context, cl client.Client, config *Config) (*couchdb.CouchDB, error) {
    94  	server, err := couchDBLeader(ctx, cl, config)
    95  	if err != nil {
    96  		return nil, err
    97  	}
    98  	return couchDBServerClient(ctx, cl, config, server)
    99  }
   100  
   101  func serverNotFound(err error) bool {
   102  	return kerrors.IsNotFound(err) || couchdb.IsNotFound(err)
   103  }
   104  
   105  // couchDBNotReadyOrNotFound avoid unnecessary logging for couchdb when a node is down
   106  func couchDBNotReadyOrNotFound(err error) bool {
   107  	e := err
   108  	var re recerr.Error
   109  	if errors.As(err, &re) {
   110  		e = re.Unwrap()
   111  	}
   112  	return errors.Is(e, ErrPodsNotReady) || errors.Is(e, ErrServerNotReady) || couchdb.IsBadGateway(e) || serverNotFound(e)
   113  }
   114  
   115  func logError(ctx context.Context, err error, msg string, keysAndValues ...interface{}) {
   116  	if couchDBNotReadyOrNotFound(err) {
   117  		return
   118  	}
   119  	logr.FromContextOrDiscard(ctx).Error(err, msg, keysAndValues...)
   120  }
   121  
   122  // checkIfServerIsReady get the associated couchdb server for a couchdb resource
   123  func checkIfServerIsReady(ctx context.Context, client client.Client, serverRef dsapi.ServerRef) (bool, *dsapi.CouchDBServer, error) {
   124  	server := &dsapi.CouchDBServer{}
   125  	nn := serverRef.ServerRef()
   126  	err := client.Get(ctx, types.NamespacedName{Namespace: nn.Namespace, Name: nn.Name}, server)
   127  	if err != nil {
   128  		return false, nil, err
   129  	}
   130  	return conditions.IsReady(server), server, nil
   131  }
   132  
   133  func labelOrEmpty(obj client.Object, key string) string {
   134  	labels := obj.GetLabels()
   135  	if len(labels) == 0 {
   136  		return ""
   137  	}
   138  	return labels[key]
   139  }
   140  
   141  func CheckPod(ctx context.Context, cl client.Client, ns string, name string) (bool, error) {
   142  	pod := &corev1.Pod{}
   143  	err := cl.Get(ctx, types.NamespacedName{Name: name, Namespace: ns}, pod)
   144  	if err != nil {
   145  		return false, err
   146  	}
   147  	return IsCondition(pod.Status.Conditions, "Ready", "True"), nil
   148  }
   149  
   150  // IsCondition determines if a K8s resource has a status matching the input type and status
   151  func IsCondition(conditions []corev1.PodCondition, cType string, cStatus corev1.ConditionStatus) bool {
   152  	for _, condition := range conditions {
   153  		if string(condition.Type) == cType {
   154  			return condition.Status == cStatus
   155  		}
   156  	}
   157  	return false
   158  }
   159  
   160  func checkFinishStatus(url string) (bool, error) {
   161  	// get the status
   162  	resp := &ServerSetupResponse{}
   163  	err := httpRequest("GET", url, Payload{}, &resp)
   164  	if err != nil {
   165  		return false, err
   166  	}
   167  
   168  	// if the response has an error return it
   169  	if resp.Error != "" {
   170  		return false, fmt.Errorf("error getting finished status: %s - %s", resp.Error, resp.Reason)
   171  	}
   172  
   173  	// if the response state isnt cluster_finished then return false
   174  	if resp.State != "cluster_finished" {
   175  		return false, nil
   176  	}
   177  
   178  	// otherwise theres no error and the server was finished
   179  	return true, nil
   180  }
   181  
   182  func httpRequest(verb, url string, data Payload, t interface{}) error {
   183  	payloadBytes, err := json.Marshal(data)
   184  	if err != nil {
   185  		return err
   186  	}
   187  	body := bytes.NewReader(payloadBytes)
   188  
   189  	req, err := http.NewRequest(verb, url, body)
   190  	if err != nil {
   191  		return err
   192  	}
   193  	req.Header.Set("Content-Type", "application/json")
   194  
   195  	resp, err := http.DefaultClient.Do(req)
   196  	if err != nil {
   197  		return err
   198  	}
   199  	defer resp.Body.Close()
   200  
   201  	return json.NewDecoder(resp.Body).Decode(t)
   202  }
   203  
   204  func getServerURL(config *Config, server *dsapi.CouchDBServer) string {
   205  	if server.IsCloud() {
   206  		return config.CloudURL()
   207  	}
   208  	return fmt.Sprintf("http://%s:%s", server.Spec.URI, config.CouchDBPort)
   209  }
   210  
   211  // getCouchDBUserURL returns the server URL to be consumed by workload team
   212  // TODO update BSL SHIMS to take the full URL, so we can use getServerURL instead
   213  func getCouchDBUserURL(config *Config, server *dsapi.CouchDBServer) string {
   214  	if server.IsCloud() {
   215  		return config.CloudURL()
   216  	}
   217  	serviceName := couchdb.Name // add support for generic cluster!
   218  	if config.IsDSDS() {
   219  		serviceName = couchdb.ServiceName
   220  	}
   221  	return fmt.Sprintf("%s.%s.svc.cluster.local", serviceName, server.Namespace)
   222  }
   223  
   224  // oldPVCsSuffixes handles backward compatibility support for PVCs, return mapping of node to pvc
   225  func oldPVCsSuffixes(ctx context.Context, cl client.Client) (map[string]map[string]string, error) {
   226  	nsSuffixes := map[string]map[string]string{}
   227  	for ns, name := range oldPVCs {
   228  		pvcs := &corev1.PersistentVolumeClaimList{}
   229  		if err := cl.List(ctx, pvcs, client.InNamespace(ns)); client.IgnoreNotFound(err) != nil {
   230  			return nil, err
   231  		}
   232  
   233  		if len(pvcs.Items) == 0 {
   234  			continue
   235  		}
   236  
   237  		suffixes := map[string]string{}
   238  		nsSuffixes[ns] = suffixes
   239  
   240  		pvcPrefix := fmt.Sprintf("%s-%s-", name, ns)
   241  		pvcPrefixLen := len(pvcPrefix)
   242  
   243  		for _, pvc := range pvcs.Items {
   244  			pvcName := pvc.Name
   245  			// only old pvcs are selected
   246  			if !strings.HasPrefix(pvcName, pvcPrefix) || len(pvcName) <= pvcPrefixLen {
   247  				continue
   248  			}
   249  
   250  			pvName := pvc.Spec.VolumeName
   251  			pv := &corev1.PersistentVolume{}
   252  			err := cl.Get(ctx, types.NamespacedName{Name: pvName}, pv)
   253  			if err != nil && kerrors.IsNotFound(err) {
   254  				continue
   255  			} else if err != nil {
   256  				return nil, fmt.Errorf("fail to get pv for pvc %s: %w", pvName, err)
   257  			}
   258  
   259  			nodeName := pvNodeName(pv)
   260  			if nodeName == "" {
   261  				continue
   262  			}
   263  
   264  			suffix := pvcName[pvcPrefixLen:]
   265  			if strings.HasSuffix(suffix, "-0") && len(suffix) > 2 {
   266  				suffix = suffix[:len(suffix)-2]
   267  			}
   268  			suffixes[nodeName] = suffix
   269  		}
   270  	}
   271  	return nsSuffixes, nil
   272  }
   273  
   274  func pvNodeName(pv *corev1.PersistentVolume) string {
   275  	if pv.Spec.NodeAffinity == nil {
   276  		return ""
   277  	}
   278  	if pv.Spec.NodeAffinity.Required == nil {
   279  		return ""
   280  	}
   281  	var nodeName string
   282  outer:
   283  	for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
   284  		for _, ex := range term.MatchExpressions {
   285  			if ex.Key == "kubernetes.io/hostname" &&
   286  				ex.Operator == corev1.NodeSelectorOpIn &&
   287  				len(ex.Values) > 0 {
   288  				nodeName = ex.Values[0]
   289  				break outer
   290  			}
   291  		}
   292  	}
   293  	return nodeName
   294  }
   295  
   296  func sameNode(cfg *Config, obj client.Object) bool {
   297  	uid, err := nodeUID(obj)
   298  	if err != nil {
   299  		return false
   300  	}
   301  	return cfg.NodeUID == uid
   302  }
   303  
   304  func nodeUID(obj client.Object) (string, error) {
   305  	uid, err := nodeUIDFromLabel(obj)
   306  	if err == nil {
   307  		return uid, nil
   308  	}
   309  	return nodeUIDFromServerRef(obj.(dsapi.ServerRef))
   310  }
   311  
   312  func nodeUIDFromLabel(obj client.Object) (string, error) {
   313  	if obj == nil {
   314  		return "", fmt.Errorf("couch resource is invalid: nil")
   315  	}
   316  	labels := obj.GetLabels()
   317  	if labels == nil || labels[couchdb.NodeUIDLabel] == "" {
   318  		return "", fmt.Errorf("node uid label not found")
   319  	}
   320  	return labels[couchdb.NodeUIDLabel], nil
   321  }
   322  
   323  func nodeUIDFromServerRef(ref dsapi.ServerRef) (string, error) {
   324  	if ref == nil {
   325  		return "", fmt.Errorf("couch server ref is invalid: nil")
   326  	}
   327  	nn := ref.ServerRef()
   328  	uid := strings.TrimPrefix(nn.Name, "couchdb-")
   329  	if _, err := uuid.Parse(uid); err != nil {
   330  		return "", fmt.Errorf("invalid server name %s", nn.Name)
   331  	}
   332  	return uid, nil
   333  }
   334  

View as plain text