...

Source file src/edge-infra.dev/pkg/sds/interlock/topic/cluster/cluster.go

Documentation: edge-infra.dev/pkg/sds/interlock/topic/cluster

     1  package cluster
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/gin-gonic/gin"
     8  	corev1 "k8s.io/api/core/v1"
     9  	toolscache "k8s.io/client-go/tools/cache"
    10  
    11  	"edge-infra.dev/pkg/edge/info"
    12  	"edge-infra.dev/pkg/lib/logging"
    13  	"edge-infra.dev/pkg/sds/interlock/internal/config"
    14  	"edge-infra.dev/pkg/sds/interlock/topic"
    15  	"edge-infra.dev/pkg/sds/interlock/topic/cluster/internal/middleware"
    16  	"edge-infra.dev/pkg/sds/interlock/websocket"
    17  )
    18  
    19  var (
    20  	// Cluster represents the D-SDS cluster.
    21  	TopicName = "cluster"
    22  
    23  	Path = "/v1/cluster"
    24  )
    25  
// Cluster represents the D-SDS cluster topic. It wraps the underlying
// topic.Topic that is created with the cluster State in New and whose default
// GET/PATCH handlers are exposed by RegisterEndpoints.
type Cluster struct {
	// topic is the generic topic backing the cluster; this package uses its
	// UpdateState, DefaultGet and DefaultPatch members.
	topic topic.Topic
}
    30  
    31  // New returns a new Cluster and configures the topic with the name, initialized
    32  // state and websocket manager. It will also setup informers to keep
    33  // Interlock-managed read-only fields up to date
    34  func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Cluster, error) {
    35  	state, err := newState(ctx, cfg)
    36  	if err != nil {
    37  		return nil, err
    38  	}
    39  	topic := topic.NewTopic(
    40  		TopicName,
    41  		state,
    42  		cfg,
    43  		wm,
    44  	)
    45  	cluster := &Cluster{
    46  		topic: topic,
    47  	}
    48  	err = cluster.SetupAPIInformers(ctx, cfg)
    49  	return cluster, err
    50  }
    51  
    52  // SetupAPIInformers adds the NameEventHandler to the ConfigMap informer
    53  // present in the config that is used in the Interlock API server.
    54  // This allows the Interlock's cluster topic to respond to ConfigMap events appropriately
    55  func (c *Cluster) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
    56  	// Get ConfigMap informer
    57  	informer, err := cfg.Cache.GetInformer(ctx, &corev1.ConfigMap{})
    58  	if err != nil {
    59  		return fmt.Errorf("failed to get configmap resource informer: %w", err)
    60  	}
    61  	// Create event handler for the cluster name
    62  	nameFilteredEventHandler := toolscache.FilteringResourceEventHandler{
    63  		FilterFunc: IsEdgeInfoCM,
    64  		Handler:    &NameEventHandler{c.topic.UpdateState},
    65  	}
    66  	if _, err := informer.AddEventHandler(nameFilteredEventHandler); err != nil {
    67  		return fmt.Errorf("failed to add name event handler: %w", err)
    68  	}
    69  	return nil
    70  }
    71  
    72  // IsEdgeInfoCM returns true if the provided object is the edge-info ConfigMap,
    73  // otherwise false
    74  func IsEdgeInfoCM(obj interface{}) bool {
    75  	cm, ok := obj.(*corev1.ConfigMap)
    76  	if !ok {
    77  		return false
    78  	}
    79  	if cm.Name != info.EdgeConfigMapName || cm.Namespace != info.EdgeConfigMapNS {
    80  		return false
    81  	}
    82  	return true
    83  }
    84  
// NameEventHandler handles keeping the cluster name up to date by watching
// Kubernetes events. Its OnAdd and OnUpdate methods derive the name from the
// received ConfigMap; OnDelete is a no-op.
type NameEventHandler struct {
	// UpdateFunc applies a mutation to the cluster state. SetupAPIInformers
	// sets it to the topic's UpdateState method.
	UpdateFunc func(topic.UpdateFunc) error
}
    90  
    91  // OnAdd updates the cluster name on ConfigMap create events
    92  func (neh *NameEventHandler) OnAdd(obj interface{}, _ bool) {
    93  	cm := obj.(*corev1.ConfigMap)
    94  	updateName(neh.UpdateFunc, cm)
    95  }
    96  
    97  // OnUpdate updates the cluster name on ConfigMap update events
    98  func (neh *NameEventHandler) OnUpdate(_, newObj interface{}) {
    99  	cm := newObj.(*corev1.ConfigMap)
   100  	updateName(neh.UpdateFunc, cm)
   101  }
   102  
// OnDelete ignores delete events: the method body is intentionally empty, so
// the cluster name is left untouched when the ConfigMap is deleted.
func (neh *NameEventHandler) OnDelete(_ interface{}) {}
   105  
   106  // updateName retrieves the store name from the edge-info ConfigMap and assigns
   107  // it to the cluster name field
   108  func updateName(updateFunc func(topic.UpdateFunc) error, cm *corev1.ConfigMap) {
   109  	edgeinfo := info.EdgeInfo{}
   110  	clusterName := edgeinfo.FromConfigMap(cm).Store
   111  	err := updateFunc(func(obj interface{}) error {
   112  		state := obj.(*State)
   113  		state.Name = clusterName
   114  		return nil
   115  	})
   116  	if err != nil {
   117  		logging.NewLogger().Error(err, "failed to update cluster name in informer hook")
   118  	}
   119  }
   120  
   121  // RegistersEndpoints registers the Cluster topic endpoints on the provided
   122  // router
   123  func (c *Cluster) RegisterEndpoints(r *gin.Engine) {
   124  	v1 := r.Group(Path)
   125  	v1.GET("", c.topic.DefaultGet)
   126  	v1.PATCH("", middleware.ReadOnlyFieldValidation(), c.topic.DefaultPatch)
   127  }
   128  
// NOTE: the doc comments on State and its fields carry swagger annotations
// (swagger:model, readOnly, example) that are read when generating the swagger
// spec, so edits to them change the generated API description.

// State contains information about the D-SDS cluster.
//
// swagger:model ClusterState
type State struct {
	// Name of the cluster. This field is read only and will be kept up to date internally by Interlock.
	//
	// readOnly: true
	// example: store-123
	Name string `json:"name" binding:"required"`
}
   139  
   140  // newState returns a new cluster State with initialized values
   141  func newState(ctx context.Context, cfg *config.Config) (*State, error) {
   142  	edgeinfo, err := info.FromClient(ctx, cfg.KubeRetryClient.Client())
   143  	if err != nil {
   144  		return nil, fmt.Errorf("failed to get edgeinfo configmap: %w", err)
   145  	}
   146  	state := &State{
   147  		Name: edgeinfo.Store,
   148  	}
   149  	return state, nil
   150  }
   151  
// Everything below this point exists only to generate the swagger spec; none
// of it is used by the running service.
   154  
   155  // swagger:route GET /v1/cluster cluster GetClusterState
   156  // Retrieve the current cluster state
   157  // responses:
   158  //   200: ClusterStateResponse
   159  //   404: ErrorResponse
   160  //   405: ErrorResponse
   161  //   500: ErrorResponse
   162  
   163  // swagger:route PATCH /v1/cluster cluster PatchClusterState
   164  // Patch the cluster state
   165  // responses:
   166  //   202: ClusterStateResponse
   167  //   404: ErrorResponse
   168  //   405: ErrorResponse
   169  //   500: ErrorResponse
   170  
// NOTE: PatchParametersWrapper is a documentation-only type. It exists so the
// swagger generator can describe the request body of the PatchClusterState
// operation; the comments below feed the generated spec.

// The PatchClusterState parameters
//
// swagger:parameters PatchClusterState
type PatchParametersWrapper struct {
	// The cluster state that the client wants to patch
	//
	// in:body
	ClusterState State `json:"state"`
}
   180  
// NOTE: StateResponseWrapper is a documentation-only type. It exists so the
// swagger generator can describe the ClusterStateResponse body referenced by
// the cluster routes; the comments below feed the generated spec.

// The request was successful
//
// swagger:response ClusterStateResponse
type StateResponseWrapper struct {
	// The current cluster state
	//
	// in:body
	ClusterState State `json:"state"`
}
   190  

View as plain text