...

Source file src/edge-infra.dev/pkg/sds/k8s/iplookup/iplookup.go

Documentation: edge-infra.dev/pkg/sds/k8s/iplookup

     1  package iplookup
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  
     8  	"edge-infra.dev/pkg/lib/fog"
     9  
    10  	v1 "k8s.io/api/core/v1"
    11  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    12  	"k8s.io/client-go/kubernetes"
    13  	"k8s.io/client-go/rest"
    14  )
    15  
    16  const (
    17  	KIND string = "DaemonSet"
    18  )
    19  
    20  var (
    21  	ErrInvalidQuery = fmt.Errorf("query provided is invalid")
    22  	ErrNoReturn     = fmt.Errorf("nothing to return")
    23  	ErrInvalidIP    = fmt.Errorf("returned pod has invalid ip")
    24  	ErrNotInCluster = rest.ErrNotInCluster
    25  )
    26  
    27  // Struct containing details of the daemonset to be found
    28  type Query struct {
    29  	Namespace string
    30  	Daemonset string
    31  }
    32  
    33  type IPLookup struct {
    34  	clientset kubernetes.Interface
    35  }
    36  
    37  // Returns a new IPLookup struct which can be used to find the IP of kubernetes
    38  // resources
    39  // Must be run witin a kubernetes cluster
    40  func New() (*IPLookup, error) {
    41  	cfg, err := rest.InClusterConfig()
    42  	if err != nil {
    43  		return nil, err
    44  	}
    45  
    46  	cl, err := kubernetes.NewForConfig(cfg)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  
    51  	return &IPLookup{
    52  		clientset: cl,
    53  	}, nil
    54  }
    55  
    56  // Returns the IP address of the pod on the given nodeID belonging to the
    57  // daemonset in query
    58  // Returns [ErrNoReturn] when no pods were found that matched the given node and
    59  // query
    60  func (ip *IPLookup) GetDaemonsetPodIP(ctx context.Context, nodeID string, query Query) (net.IP, error) {
    61  	if query.Daemonset == "" || query.Namespace == "" {
    62  		return nil, ErrInvalidQuery
    63  	}
    64  
    65  	return ip.daemonsetPodIPLookup(ctx, nodeID, query)
    66  }
    67  
    68  func (ip *IPLookup) daemonsetPodIPLookup(ctx context.Context, nodeID string, query Query) (net.IP, error) {
    69  	log := fog.FromContext(ctx)
    70  
    71  	log.Info("Querying kubernetes client",
    72  		"Node Name", nodeID,
    73  		"Namespace", query.Namespace,
    74  		"Daemonset", query.Daemonset,
    75  	)
    76  
    77  	// The clientset appears to do rate limiting before checking the context is done
    78  	// Manually check context done to exit sooner in the case of an expired context
    79  	select {
    80  	case <-ctx.Done():
    81  		return nil, fmt.Errorf("context done: %w", ctx.Err())
    82  	default:
    83  	}
    84  
    85  	pods, err := ip.clientset.CoreV1().Pods(query.Namespace).List(ctx, metav1.ListOptions{})
    86  	if err != nil {
    87  		return nil, fmt.Errorf("failed to query kubernetes client: %w", err)
    88  	}
    89  	log.Info("Query discovered pods.", "noPods", len(pods.Items))
    90  
    91  	for _, pod := range pods.Items {
    92  		podip, err := checkPod(pod, nodeID, query)
    93  		if err == nil {
    94  			log.Info("pod found satisfying query", "podName", pod.Name)
    95  			return podip, nil
    96  		} else if err == ErrInvalidIP {
    97  			return nil, err
    98  		}
    99  	}
   100  	return nil, ErrNoReturn
   101  }
   102  
   103  func checkPod(pod v1.Pod, nodeID string, query Query) (ip net.IP, err error) {
   104  	if pod.Spec.NodeName == nodeID && pod.ObjectMeta.Namespace == query.Namespace {
   105  		for _, own := range pod.OwnerReferences {
   106  			if own.Kind == KIND && own.Name == query.Daemonset {
   107  				ip = net.ParseIP(pod.Status.PodIP)
   108  				if ip != nil {
   109  					return ip, nil
   110  				}
   111  				return nil, ErrInvalidIP
   112  			}
   113  		}
   114  	}
   115  	return nil, ErrNoReturn
   116  }
   117  

View as plain text