// Package cluster implements the Interlock "cluster" topic: it holds the
// cluster State, serves it under Path, and keeps the read-only cluster name
// in sync with the edge-info ConfigMap through an informer event handler.
package cluster

import (
	"context"
	"fmt"

	"github.com/gin-gonic/gin"
	corev1 "k8s.io/api/core/v1"
	toolscache "k8s.io/client-go/tools/cache"

	"edge-infra.dev/pkg/edge/info"
	"edge-infra.dev/pkg/lib/logging"
	"edge-infra.dev/pkg/sds/interlock/internal/config"
	"edge-infra.dev/pkg/sds/interlock/topic"
	"edge-infra.dev/pkg/sds/interlock/topic/cluster/internal/middleware"
	"edge-infra.dev/pkg/sds/interlock/websocket"
)

var (
	// TopicName is the name the cluster topic is created with.
	TopicName = "cluster"
	// Path is the route group under which the cluster topic endpoints are
	// registered.
	Path = "/v1/cluster"
)

// Cluster represents the D-SDS cluster.
type Cluster struct {
	topic topic.Topic
}

// New returns a new Cluster and configures the topic with the name, initialized
// state and websocket manager. It also sets up informers to keep
// Interlock-managed read-only fields up to date.
//
// On any failure a nil Cluster is returned together with the error, so callers
// never receive a Cluster whose informer hooks were not installed.
func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Cluster, error) {
	state, err := newState(ctx, cfg)
	if err != nil {
		return nil, err
	}

	// Named t rather than topic so the imported topic package is not shadowed.
	t := topic.NewTopic(
		TopicName,
		state,
		cfg,
		wm,
	)

	cluster := &Cluster{
		topic: t,
	}

	if err := cluster.SetupAPIInformers(ctx, cfg); err != nil {
		return nil, err
	}
	return cluster, nil
}

// SetupAPIInformers adds the NameEventHandler to the ConfigMap informer
// present in the config that is used in the Interlock API server.
// This allows the Interlock's cluster topic to respond to ConfigMap events
// appropriately.
//
// It returns an error if the informer cannot be obtained from cfg.Cache or if
// the event handler cannot be registered.
func (c *Cluster) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
	// Get ConfigMap informer
	informer, err := cfg.Cache.GetInformer(ctx, &corev1.ConfigMap{})
	if err != nil {
		return fmt.Errorf("failed to get configmap resource informer: %w", err)
	}

	// Create event handler for the cluster name. The filter ensures the handler
	// only ever sees the edge-info ConfigMap.
	nameFilteredEventHandler := toolscache.FilteringResourceEventHandler{
		FilterFunc: IsEdgeInfoCM,
		Handler:    &NameEventHandler{UpdateFunc: c.topic.UpdateState},
	}

	if _, err := informer.AddEventHandler(nameFilteredEventHandler); err != nil {
		return fmt.Errorf("failed to add name event handler: %w", err)
	}
	return nil
}

// IsEdgeInfoCM returns true if the provided object is the edge-info ConfigMap,
// otherwise false. Objects that are not a *corev1.ConfigMap are rejected.
func IsEdgeInfoCM(obj interface{}) bool {
	cm, ok := obj.(*corev1.ConfigMap)
	if !ok {
		return false
	}
	return cm.Name == info.EdgeConfigMapName && cm.Namespace == info.EdgeConfigMapNS
}

// NameEventHandler handles keeping the cluster name up to date by watching
// Kubernetes events.
type NameEventHandler struct {
	// UpdateFunc applies a state mutation; SetupAPIInformers wires it to the
	// topic's UpdateState.
	UpdateFunc func(topic.UpdateFunc) error
}

// OnAdd updates the cluster name on ConfigMap create events. Objects that are
// not a *corev1.ConfigMap are ignored instead of panicking, so the handler is
// safe even when registered without the IsEdgeInfoCM filter.
func (neh *NameEventHandler) OnAdd(obj interface{}, _ bool) {
	cm, ok := obj.(*corev1.ConfigMap)
	if !ok {
		return
	}
	updateName(neh.UpdateFunc, cm)
}

// OnUpdate updates the cluster name on ConfigMap update events. Only the new
// object is consulted; objects that are not a *corev1.ConfigMap are ignored.
func (neh *NameEventHandler) OnUpdate(_, newObj interface{}) {
	cm, ok := newObj.(*corev1.ConfigMap)
	if !ok {
		return
	}
	updateName(neh.UpdateFunc, cm)
}

// OnDelete ignores delete events.
func (neh *NameEventHandler) OnDelete(_ interface{}) {}

// updateName retrieves the store name from the edge-info ConfigMap and assigns
// it to the cluster name field. Failures are logged rather than returned
// because the informer callbacks that call it have no error return.
func updateName(updateFunc func(topic.UpdateFunc) error, cm *corev1.ConfigMap) {
	edgeinfo := info.EdgeInfo{}
	clusterName := edgeinfo.FromConfigMap(cm).Store

	err := updateFunc(func(obj interface{}) error {
		// Report an unexpected state type as an error instead of panicking
		// inside the update callback.
		state, ok := obj.(*State)
		if !ok {
			return fmt.Errorf("unexpected cluster state type %T", obj)
		}
		state.Name = clusterName
		return nil
	})
	if err != nil {
		logging.NewLogger().Error(err, "failed to update cluster name in informer hook")
	}
}

// RegisterEndpoints registers the Cluster topic endpoints on the provided
// router: GET and PATCH on Path, with PATCH guarded by the read-only field
// validation middleware.
func (c *Cluster) RegisterEndpoints(r *gin.Engine) {
	v1 := r.Group(Path)
	v1.GET("", c.topic.DefaultGet)
	v1.PATCH("", middleware.ReadOnlyFieldValidation(), c.topic.DefaultPatch)
}

// State contains information about the D-SDS cluster
//
// swagger:model ClusterState
type State struct {
	// Name of the cluster. This field is read only and will be kept up to date internally by Interlock
	//
	// readOnly: true
	// example: store-123
	Name string `json:"name" binding:"required"`
}

// newState returns a new cluster State with initialized values. The name is
// taken from the Store field of the edge info fetched through the config's
// Kubernetes client.
func newState(ctx context.Context, cfg *config.Config) (*State, error) {
	edgeinfo, err := info.FromClient(ctx, cfg.KubeRetryClient.Client())
	if err != nil {
		return nil, fmt.Errorf("failed to get edgeinfo configmap: %w", err)
	}

	state := &State{
		Name: edgeinfo.Store,
	}
	return state, nil
}

// Everything below this point is used for documentation purposes and exists for
// the purpose of generating the swagger spec only.

// swagger:route GET /v1/cluster cluster GetClusterState
// Retrieve the current cluster state
// responses:
//   200: ClusterStateResponse
//   404: ErrorResponse
//   405: ErrorResponse
//   500: ErrorResponse

// swagger:route PATCH /v1/cluster cluster PatchClusterState
// Patch the cluster state
// responses:
//   202: ClusterStateResponse
//   404: ErrorResponse
//   405: ErrorResponse
//   500: ErrorResponse

// The PatchClusterState parameters
//
// swagger:parameters PatchClusterState
type PatchParametersWrapper struct {
	// The cluster state that the client wants to patch
	//
	// in:body
	ClusterState State `json:"state"`
}

// The request was successful
//
// swagger:response ClusterStateResponse
type StateResponseWrapper struct {
	// The current cluster state
	//
	// in:body
	ClusterState State `json:"state"`
}