package instances import ( "context" "errors" "fmt" "net/http" "regexp" "github.com/gin-gonic/gin" corev1 "k8s.io/api/core/v1" toolscache "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/lib/logging" "edge-infra.dev/pkg/sds/interlock/internal/config" errs "edge-infra.dev/pkg/sds/interlock/internal/errors" "edge-infra.dev/pkg/sds/interlock/topic" "edge-infra.dev/pkg/sds/interlock/websocket" ) var ( // Instances exposes information about the interlock services running on the // cluster TopicName = "instances" Path = "/v1/instances" ) // Instances exposes information about the Interlock services running on the // cluster type Instances struct { topic topic.Topic } // New returns a new Instances topic and configures the topic with the name, // initializes state and websocket manager. It will also setup informers to keep // Interlock-managed read-only fields up to date func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Instances, error) { state, err := newState(ctx, cfg) if err != nil { return nil, err } topic := topic.NewTopic( TopicName, state, cfg, wm, ) instances := &Instances{ topic: topic, } err = instances.SetupAPIInformers(ctx, cfg) return instances, err } // SetupAPIInformers adds the PodEventHandler to the Pod informer present in the // config that is used in the Interlock API server. 
This allows the Interlock's // instances topic to respond to Pod events appropriately func (i *Instances) SetupAPIInformers(ctx context.Context, cfg *config.Config) error { // Get ConfigMap informer informer, err := cfg.Cache.GetInformer(ctx, &corev1.Pod{}) if err != nil { return fmt.Errorf("failed to get pod resource informer: %w", err) } // Create event handler for the cluster name podFilteredEventHandler := toolscache.FilteringResourceEventHandler{ FilterFunc: IsInterlockPod, Handler: &PodEventHandler{i.topic.UpdateState}, } if _, err := informer.AddEventHandler(podFilteredEventHandler); err != nil { return fmt.Errorf("failed to add Interlock pod event handler: %w", err) } return nil } // IsInterlockPod returns true if the provided object is an Interlock pod, // otherwise returns false func IsInterlockPod(obj interface{}) bool { pod, ok := obj.(*corev1.Pod) if !ok { return false } if pod.Namespace != "interlock" { return false } match, err := regexp.MatchString("^interlock-\\w{5}$", pod.Name) if err != nil { return false } return match } // PodEventHandler handles keeping the Interlock pod info up to date by watching // Kubernetes pod events type PodEventHandler struct { UpdateFunc func(topic.UpdateFunc) error } // OnAdd updates the instance info on Pod create events func (p *PodEventHandler) OnAdd(obj interface{}, _ bool) { pod := obj.(*corev1.Pod) updateInstance(p.UpdateFunc, pod, false) } // OnUpdate updates the instance info on Pod update events func (p *PodEventHandler) OnUpdate(_, newObj interface{}) { pod := newObj.(*corev1.Pod) updateInstance(p.UpdateFunc, pod, false) } // OnDelete deletes the instance info on Pod delete events func (p *PodEventHandler) OnDelete(obj interface{}) { pod := obj.(*corev1.Pod) updateInstance(p.UpdateFunc, pod, true) } // updateInstance updates the state information for the provided Interlock pod func updateInstance(updateFunc func(topic.UpdateFunc) error, pod *corev1.Pod, deleted bool) { // if the pod is not yet scheduled to 
a node then ignore the event if pod.Spec.NodeName == "" { return } // if the pod has been deleted if deleted { err := updateFunc(func(obj interface{}) error { state, ok := obj.(*State) if !ok { return errors.New("state within discover topic is not of type *State") } // delete the URL key for the node that the pod was scheduled to delete(state.Instances, pod.Spec.NodeName) return nil }) if err != nil { logging.NewLogger().Error(err, "failed to delete Interlock URL entry in informer hook") } return } // if the pod has not been assigned an IP address yet then ignore the event if pod.Status.PodIP == "" { return } err := updateFunc(func(obj interface{}) error { state, ok := obj.(*State) if !ok { return errors.New("state within discover topic is not of type *State") } // update the URL key for the node that the pod is scheduled to state.Instances[pod.Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pod.Status.PodIP, "80")} return nil }) if err != nil { logging.NewLogger().Error(err, "failed to update cluster name in informer hook") } } // RegistersEndpoints registers the Instances topic endpoints on the provided // router func (i *Instances) RegisterEndpoints(r *gin.Engine) { v1 := r.Group(Path) v1.GET("", i.topic.DefaultGet) v1.GET("/:hostname", i.infoFromHostname) } // infoFromHostname returns the info for the provided hosts Interlock service func (i *Instances) infoFromHostname(c *gin.Context) { log := fog.FromContext(c.Request.Context()) target := c.Param("hostname") state, ok := i.topic.State().(*State) if !ok { log.Error(fmt.Errorf("state within discover topic is not of type *State"), "failed to cast state to State") c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError)))) } // Loop through the list of interlock instances and return the URL for the // hostname that matches the target for hostname, info := range state.Instances { if hostname == target { c.JSON(http.StatusOK, info) return 
} } log.Info("hostname not found", "hostname", target) c.IndentedJSON(http.StatusNotFound, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusNotFound)))) } // State contains information about the Interlock services running on the // cluster // // swagger:model InstancesState type State struct { // Information about the different Interlock pods running on the cluster. // This field is read only and will be kept up to date internally by // Interlock // // readOnly: true // example: {"node-name":{"url":"http://1.2.3.4:80"}} Instances map[string]Instance `json:"instances"` } // Instance contains information for a specific Interlock pod // // swagger:model Instance type Instance struct { // URL of the Interlock pod // // readOnly: true // example: http://1.2.3.4:80 URL string `json:"url"` } // newState returns a new instances State with the URLs of the Interlock pods func newState(ctx context.Context, cfg *config.Config) (*State, error) { pods := &corev1.PodList{} if err := cfg.KubeRetryClient.SafeList(ctx, pods, client.InNamespace("interlock")); err != nil { return nil, fmt.Errorf("failed to list pods: %w", err) } state := &State{ Instances: map[string]Instance{}, } for i := range pods.Items { if IsInterlockPod(&pods.Items[i]) { state.Instances[pods.Items[i].Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pods.Items[i].Status.PodIP, "80")} } } return state, nil } // Everything below this point is used for documentation purposes and exists for // the purpose of generating the swagger spec only. 
// swagger:route GET /v1/instances instances GetInstancesState
// Retrieve the current instances state
// responses:
//   200: InstancesStateResponse
//   404: ErrorResponse
//   405: ErrorResponse
//   500: ErrorResponse

// swagger:route GET /v1/instances/{hostname} instances GetInstanceState
// Retrieve the URL for the Interlock pod on the target host node
// responses:
//   200: InstanceResponse
//   404: ErrorResponse
//   405: ErrorResponse
//   500: ErrorResponse

// InstanceParams describes the path parameters of the GetInstanceState
// operation.
//
// swagger:parameters GetInstanceState
type InstanceParams struct {
	// Hostname of the target interlock instance
	//
	// in: path
	// required: true
	Hostname string `json:"hostname"`
}

// The request was successful
//
// swagger:response InstancesStateResponse
type StateResponseWrapper struct {
	// The current interlock state
	//
	// in:body
	InstancesState State `json:"state"`
}

// The request was successful
//
// swagger:response InstanceResponse
type InstanceWrapper struct {
	// Information about a specific Interlock pod
	//
	// in:body
	Instance Instance `json:"url-response"`
}