...

Source file src/edge-infra.dev/pkg/sds/interlock/topic/instances/instances.go

Documentation: edge-infra.dev/pkg/sds/interlock/topic/instances

     1  package instances
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net/http"
     8  	"regexp"
     9  
    10  	"github.com/gin-gonic/gin"
    11  	corev1 "k8s.io/api/core/v1"
    12  	toolscache "k8s.io/client-go/tools/cache"
    13  	"sigs.k8s.io/controller-runtime/pkg/client"
    14  
    15  	"edge-infra.dev/pkg/lib/fog"
    16  	"edge-infra.dev/pkg/lib/logging"
    17  	"edge-infra.dev/pkg/sds/interlock/internal/config"
    18  	errs "edge-infra.dev/pkg/sds/interlock/internal/errors"
    19  	"edge-infra.dev/pkg/sds/interlock/topic"
    20  	"edge-infra.dev/pkg/sds/interlock/websocket"
    21  )
    22  
    23  var (
    24  	// Instances exposes information about the interlock services running on the
    25  	// cluster
    26  	TopicName = "instances"
    27  
    28  	Path = "/v1/instances"
    29  )
    30  
    31  // Instances exposes information about the Interlock services running on the
    32  // cluster
    33  type Instances struct {
    34  	topic topic.Topic
    35  }
    36  
    37  // New returns a new Instances topic and configures the topic with the name,
    38  // initializes state and websocket manager. It will also setup informers to keep
    39  // Interlock-managed read-only fields up to date
    40  func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Instances, error) {
    41  	state, err := newState(ctx, cfg)
    42  	if err != nil {
    43  		return nil, err
    44  	}
    45  
    46  	topic := topic.NewTopic(
    47  		TopicName,
    48  		state,
    49  		cfg,
    50  		wm,
    51  	)
    52  	instances := &Instances{
    53  		topic: topic,
    54  	}
    55  
    56  	err = instances.SetupAPIInformers(ctx, cfg)
    57  	return instances, err
    58  }
    59  
    60  // SetupAPIInformers adds the PodEventHandler to the Pod informer present in the
    61  // config that is used in the Interlock API server. This allows the Interlock's
    62  // instances topic to respond to Pod events appropriately
    63  func (i *Instances) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
    64  	// Get ConfigMap informer
    65  	informer, err := cfg.Cache.GetInformer(ctx, &corev1.Pod{})
    66  	if err != nil {
    67  		return fmt.Errorf("failed to get pod resource informer: %w", err)
    68  	}
    69  	// Create event handler for the cluster name
    70  	podFilteredEventHandler := toolscache.FilteringResourceEventHandler{
    71  		FilterFunc: IsInterlockPod,
    72  		Handler:    &PodEventHandler{i.topic.UpdateState},
    73  	}
    74  
    75  	if _, err := informer.AddEventHandler(podFilteredEventHandler); err != nil {
    76  		return fmt.Errorf("failed to add Interlock pod event handler: %w", err)
    77  	}
    78  
    79  	return nil
    80  }
    81  
    82  // IsInterlockPod returns true if the provided object is an Interlock pod,
    83  // otherwise returns false
    84  func IsInterlockPod(obj interface{}) bool {
    85  	pod, ok := obj.(*corev1.Pod)
    86  	if !ok {
    87  		return false
    88  	}
    89  
    90  	if pod.Namespace != "interlock" {
    91  		return false
    92  	}
    93  
    94  	match, err := regexp.MatchString("^interlock-\\w{5}$", pod.Name)
    95  	if err != nil {
    96  		return false
    97  	}
    98  
    99  	return match
   100  }
   101  
   102  // PodEventHandler handles keeping the Interlock pod info up to date by watching
   103  // Kubernetes pod events
   104  type PodEventHandler struct {
   105  	UpdateFunc func(topic.UpdateFunc) error
   106  }
   107  
   108  // OnAdd updates the instance info on Pod create events
   109  func (p *PodEventHandler) OnAdd(obj interface{}, _ bool) {
   110  	pod := obj.(*corev1.Pod)
   111  	updateInstance(p.UpdateFunc, pod, false)
   112  }
   113  
   114  // OnUpdate updates the instance info on Pod update events
   115  func (p *PodEventHandler) OnUpdate(_, newObj interface{}) {
   116  	pod := newObj.(*corev1.Pod)
   117  	updateInstance(p.UpdateFunc, pod, false)
   118  }
   119  
   120  // OnDelete deletes the instance info on Pod delete events
   121  func (p *PodEventHandler) OnDelete(obj interface{}) {
   122  	pod := obj.(*corev1.Pod)
   123  	updateInstance(p.UpdateFunc, pod, true)
   124  }
   125  
   126  // updateInstance updates the state information for the provided Interlock pod
   127  func updateInstance(updateFunc func(topic.UpdateFunc) error, pod *corev1.Pod, deleted bool) {
   128  	// if the pod is not yet scheduled to a node then ignore the event
   129  	if pod.Spec.NodeName == "" {
   130  		return
   131  	}
   132  	// if the pod has been deleted
   133  	if deleted {
   134  		err := updateFunc(func(obj interface{}) error {
   135  			state, ok := obj.(*State)
   136  			if !ok {
   137  				return errors.New("state within discover topic is not of type *State")
   138  			}
   139  			// delete the URL key for the node that the pod was scheduled to
   140  			delete(state.Instances, pod.Spec.NodeName)
   141  
   142  			return nil
   143  		})
   144  		if err != nil {
   145  			logging.NewLogger().Error(err, "failed to delete Interlock URL entry in informer hook")
   146  		}
   147  
   148  		return
   149  	}
   150  	// if the pod has not been assigned an IP address yet then ignore the event
   151  	if pod.Status.PodIP == "" {
   152  		return
   153  	}
   154  
   155  	err := updateFunc(func(obj interface{}) error {
   156  		state, ok := obj.(*State)
   157  		if !ok {
   158  			return errors.New("state within discover topic is not of type *State")
   159  		}
   160  		// update the URL key for the node that the pod is scheduled to
   161  		state.Instances[pod.Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pod.Status.PodIP, "80")}
   162  
   163  		return nil
   164  	})
   165  	if err != nil {
   166  		logging.NewLogger().Error(err, "failed to update cluster name in informer hook")
   167  	}
   168  }
   169  
   170  // RegistersEndpoints registers the Instances topic endpoints on the provided
   171  // router
   172  func (i *Instances) RegisterEndpoints(r *gin.Engine) {
   173  	v1 := r.Group(Path)
   174  	v1.GET("", i.topic.DefaultGet)
   175  	v1.GET("/:hostname", i.infoFromHostname)
   176  }
   177  
   178  // infoFromHostname returns the info for the provided hosts Interlock service
   179  func (i *Instances) infoFromHostname(c *gin.Context) {
   180  	log := fog.FromContext(c.Request.Context())
   181  	target := c.Param("hostname")
   182  
   183  	state, ok := i.topic.State().(*State)
   184  	if !ok {
   185  		log.Error(fmt.Errorf("state within discover topic is not of type *State"), "failed to cast state to State")
   186  		c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError))))
   187  	}
   188  
   189  	// Loop through the list of interlock instances and return the URL for the
   190  	// hostname that matches the target
   191  	for hostname, info := range state.Instances {
   192  		if hostname == target {
   193  			c.JSON(http.StatusOK, info)
   194  			return
   195  		}
   196  	}
   197  
   198  	log.Info("hostname not found", "hostname", target)
   199  
   200  	c.IndentedJSON(http.StatusNotFound, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusNotFound))))
   201  }
   202  
   203  // State contains information about the Interlock services running on the
   204  // cluster
   205  //
   206  // swagger:model InstancesState
   207  type State struct {
   208  	// Information about the different Interlock pods running on the cluster.
   209  	// This field is read only and will be kept up to date internally by
   210  	// Interlock
   211  	//
   212  	// readOnly: true
   213  	// example: {"node-name":{"url":"http://1.2.3.4:80"}}
   214  	Instances map[string]Instance `json:"instances"`
   215  }
   216  
   217  // Instance contains information for a specific Interlock pod
   218  //
   219  // swagger:model Instance
   220  type Instance struct {
   221  	// URL of the Interlock pod
   222  	//
   223  	// readOnly: true
   224  	// example: http://1.2.3.4:80
   225  	URL string `json:"url"`
   226  }
   227  
   228  // newState returns a new instances State with the URLs of the Interlock pods
   229  func newState(ctx context.Context, cfg *config.Config) (*State, error) {
   230  	pods := &corev1.PodList{}
   231  	if err := cfg.KubeRetryClient.SafeList(ctx, pods, client.InNamespace("interlock")); err != nil {
   232  		return nil, fmt.Errorf("failed to list pods: %w", err)
   233  	}
   234  
   235  	state := &State{
   236  		Instances: map[string]Instance{},
   237  	}
   238  
   239  	for i := range pods.Items {
   240  		if IsInterlockPod(&pods.Items[i]) {
   241  			state.Instances[pods.Items[i].Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pods.Items[i].Status.PodIP, "80")}
   242  		}
   243  	}
   244  
   245  	return state, nil
   246  }
   247  
   248  // Everything below this point is used for documentation purposes and exists for
   249  // the purpose of generating the swagger spec only.
   250  
   251  // swagger:route GET /v1/instances instances GetInstancesState
   252  // Retrieve the current instances state
   253  // responses:
   254  //   200: InstancesStateResponse
   255  //   404: ErrorResponse
   256  //   405: ErrorResponse
   257  //   500: ErrorResponse
   258  
   259  // swagger:route GET /v1/instances/{hostname} instances GetInstanceState
   260  // Retrieve the URL for the Interlock pod on the target host node
   261  // responses:
   262  //   200: InstanceResponse
   263  //   404: ErrorResponse
   264  //   405: ErrorResponse
   265  //   500: ErrorResponse
   266  
   267  // HELLO MY NAME IS JEFF
   268  //
   269  // swagger:parameters GetInstanceState
   270  type InstanceParams struct {
   271  	// Hostname of the target interlock instance
   272  	//
   273  	// in: path
   274  	// required: true
   275  	Hostname string `json:"hostname"`
   276  }
   277  
   278  // The request was successful
   279  //
   280  // swagger:response InstancesStateResponse
   281  type StateResponseWrapper struct {
   282  	// The current interlock state
   283  	//
   284  	// in:body
   285  	InstancesState State `json:"state"`
   286  }
   287  
   288  // The request was successful
   289  //
   290  // swagger:response InstanceResponse
   291  type InstanceWrapper struct {
   292  	// Information about a specific Interlock pod
   293  	//
   294  	// in:body
   295  	Instance Instance `json:"url-response"`
   296  }
   297  

View as plain text