...

Source file src/edge-infra.dev/pkg/sds/interlock/topic/example/example.go

Documentation: edge-infra.dev/pkg/sds/interlock/topic/example

     1  package example
     2  
     3  import (
     4  	"context"
     5  	"strconv"
     6  
     7  	"github.com/fsnotify/fsnotify"
     8  	"github.com/gin-gonic/gin"
     9  	"github.com/spf13/afero"
    10  	corev1 "k8s.io/api/core/v1"
    11  	toolscache "k8s.io/client-go/tools/cache"
    12  
    13  	"edge-infra.dev/pkg/lib/fog"
    14  	"edge-infra.dev/pkg/lib/logging"
    15  	"edge-infra.dev/pkg/sds/interlock/internal/config"
    16  	"edge-infra.dev/pkg/sds/interlock/topic"
    17  	"edge-infra.dev/pkg/sds/interlock/websocket"
    18  )
    19  
// TopicName is the name of the example topic, to be used for subscriptions.
// New passes it to topic.NewTopic as the name of the topic it creates
var TopicName = "example"
    22  
// Example represents an example topic. It holds the topic.Topic created in
// New, whose UpdateState method is handed to the informer and filesystem
// watcher hooks and whose default handlers back the HTTP endpoints
type Example struct {
	// topic is the underlying topic created by topic.NewTopic in New
	topic topic.Topic
}
    27  
    28  // New returns a new Example and configures the topic with the name, initialized
    29  // state and websocket manager
    30  func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Example, error) {
    31  	topic := topic.NewTopic(
    32  		TopicName,
    33  		newState(),
    34  		cfg,
    35  		wm,
    36  	)
    37  	example := &Example{
    38  		topic: topic,
    39  	}
    40  	// optionally setup Kubernetes API Server informers
    41  	if err := example.SetupAPIInformers(ctx, cfg); err != nil {
    42  		return nil, err
    43  	}
    44  	// optionally setup filesystem watchers
    45  	if err := example.SetupFSWatchers(ctx, cfg); err != nil {
    46  		return nil, err
    47  	}
    48  	return example, nil
    49  }
    50  
    51  // SetupAPIInformers shows how to register new event handlers on the Interlock
    52  // informer. You will also need to add your object to the reader cache in
    53  // config.go, otherwise events won't be captured
    54  func (e *Example) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
    55  	// get ConfigMap informer
    56  	informer, err := cfg.Cache.GetInformer(ctx, &corev1.ConfigMap{})
    57  	if err != nil {
    58  		return err
    59  	}
    60  	// create event handler for the IntField field
    61  	filteredEventHandler := toolscache.FilteringResourceEventHandler{
    62  		FilterFunc: FilterConfigMap,
    63  		Handler:    &EventHandler{e.topic.UpdateState},
    64  	}
    65  	if _, err := informer.AddEventHandler(filteredEventHandler); err != nil {
    66  		return err
    67  	}
    68  	return nil
    69  }
    70  
    71  // FilterConfigMap will filter the incoming objects to make sure that they are
    72  // the ConfigMap that we want to watch
    73  func FilterConfigMap(obj interface{}) bool {
    74  	cm, ok := obj.(*corev1.ConfigMap)
    75  	if !ok {
    76  		return false
    77  	}
    78  	if cm.Name != "example-name" || cm.Namespace != "example-namespace" {
    79  		return false
    80  	}
    81  	return true
    82  }
    83  
// EventHandler handles keeping the IntField up to date based on the value in
// the ConfigMap
type EventHandler struct {
	// UpdateFunc applies a topic.UpdateFunc to the topic state. In
	// SetupAPIInformers it is set to the topic's UpdateState method
	UpdateFunc func(topic.UpdateFunc) error
}
    89  
    90  // OnAdd updates the IntField field on ConfigMap create events
    91  func (h *EventHandler) OnAdd(obj interface{}, _ bool) {
    92  	cm := obj.(*corev1.ConfigMap)
    93  	updateIntField(h.UpdateFunc, cm)
    94  }
    95  
    96  // OnUpdate updates the IntField field on ConfigMap update events
    97  func (h *EventHandler) OnUpdate(_, newObj interface{}) {
    98  	cm := newObj.(*corev1.ConfigMap)
    99  	updateIntField(h.UpdateFunc, cm)
   100  }
   101  
// OnDelete ignores delete events, so the last IntField value is left as it is
// when the ConfigMap is removed. You could optionally reset values or add any
// other behaviour here
func (h *EventHandler) OnDelete(_ interface{}) {}
   105  
   106  // updateIntField retrieves the example field from the example ConfigMap
   107  // and assigns it to the IntField field
   108  func updateIntField(updateFunc func(topic.UpdateFunc) error, cm *corev1.ConfigMap) {
   109  	log := logging.NewLogger()
   110  	stringField := cm.Data["example"]
   111  	intField, err := strconv.Atoi(stringField)
   112  	if err != nil {
   113  		log.Error(err, "failed to convert str to int in API informer")
   114  		return
   115  	}
   116  	// use the topic update function passed in to update topic state in a thread
   117  	// safe way
   118  	err = updateFunc(func(obj interface{}) error {
   119  		state := obj.(*State)
   120  		state.IntField = intField
   121  		return nil
   122  	})
   123  	if err != nil {
   124  		log.Error(err, "failed to update IntField in API informer")
   125  	}
   126  }
   127  
   128  // SetupFSWatchers shows how to register new filesystem watchers. You will also
   129  // need to mount your target file/directory into Interlock otherwise it won't be
   130  // seen
   131  func (e *Example) SetupFSWatchers(ctx context.Context, cfg *config.Config) error {
   132  	// create new watcher
   133  	watcher, err := fsnotify.NewWatcher()
   134  	if err != nil {
   135  		return err
   136  	}
   137  	defer watcher.Close()
   138  
   139  	// start listening for events
   140  	go updateStringField(ctx, watcher, cfg.Fs, e.topic.UpdateState)
   141  
   142  	// add the filepath to the watcher
   143  	return watcher.Add("example-file.txt")
   144  }
   145  
   146  // updateStringField watches the events on the provided fsnotify.Watcher and
   147  // and updates the StringField field when the example file is written to
   148  func updateStringField(ctx context.Context, watcher *fsnotify.Watcher, fs afero.Fs, update func(topic.UpdateFunc) error) {
   149  	log := fog.FromContext(ctx)
   150  	for {
   151  		select {
   152  		case event, ok := <-watcher.Events:
   153  			if !ok {
   154  				return
   155  			}
   156  			// if a write event was observed, update the StringField field
   157  			// using the provided topic update function which allows you to
   158  			// update state in a thread safe way
   159  			if event.Has(fsnotify.Write) {
   160  				err := update(func(obj interface{}) error {
   161  					content, err := afero.ReadFile(fs, "example-file.txt")
   162  					if err != nil {
   163  						return err
   164  					}
   165  					state := obj.(*State)
   166  					state.StringField = string(content)
   167  					return nil
   168  				})
   169  				if err != nil {
   170  					log.Error(err, "failed to read example file in filesystem watcher")
   171  				}
   172  			}
   173  		case err, ok := <-watcher.Errors:
   174  			if !ok {
   175  				return
   176  			}
   177  			log.Error(err, "failed to update IntField in filesystem watcher hook")
   178  		}
   179  	}
   180  }
   181  
   182  // RegistersEndpoints registers the Example topic endpoints on the provided router
   183  func (e *Example) RegisterEndpoints(r *gin.Engine) {
   184  	v1 := r.Group("/v1/example")
   185  	v1.GET("", e.topic.DefaultGet)
   186  	v1.PATCH("", e.topic.DefaultPatch)
   187  }
   188  
// State contains the topic state/data. The swagger:model must be set so that
// the model is included in the swagger documentation
//
// swagger:model ExampleState
type State struct {
	// StringField value. This field is required and this is defined both in
	// the comments below to notify swagger that it is required, as well as in
	// the binding tags for the actual validation in the API
	//
	// required: true
	// example: mystring
	StringField string `json:"example-string" binding:"required"`

	// IntField value. This field has a minimum value of 3 and this is defined
	// both in the comments below to notify swagger of the minimum, as well as
	// in the binding tags for the actual validation in the API
	//
	// min: 3
	// example: 4
	IntField int `json:"example-int" binding:"min=3"`

	// NestedFields struct for nested topic data
	NestedFields NestedFields `json:"example-nested"`
}
   213  
// NestedFields can also be defined as a model for extra observability in the swagger
// documentation displayed on Edge Docs. It is embedded in State under the
// "example-nested" JSON key
//
// swagger:model
type NestedFields struct {
	// BoolField value. This field has no validation requirements
	BoolField bool `json:"example-bool"`
}
   222  
   223  // newState returns a new State for the topic with initialized values
   224  func newState() *State {
   225  	return &State{
   226  		StringField: "",
   227  		IntField:    0,
   228  		NestedFields: NestedFields{
   229  			BoolField: true,
   230  		},
   231  	}
   232  }
   233  
   234  // Everything below this point is used for documentation purposes and exists for
   235  // the purpose of generating the swagger spec only.
   236  
   237  // swagger:route GET /v1/example example GetExampleState
   238  // Retrieve the current example state
   239  // responses:
   240  //   200: ExampleStateResponse
   241  //   404: ErrorResponse
   242  //   405: ErrorResponse
   243  //   500: ErrorResponse
   244  
   245  // swagger:route PATCH /v1/example example PatchExampleState
   246  // Patch the example state
   247  // responses:
   248  //   202: ExampleStateResponse
   249  //   404: ErrorResponse
   250  //   405: ErrorResponse
   251  //   500: ErrorResponse
   252  
// The PatchExampleState parameters. This wrapper only describes the request
// body for the swagger spec; the endpoints registered in this file do not
// reference it
//
// swagger:parameters PatchExampleState
type PatchParametersWrapper struct {
	// The example state that the client wants to patch
	//
	// in:body
	State State `json:"state"`
}
   262  
// The request was successful. This wrapper only describes the response body
// for the swagger spec; the endpoints registered in this file do not
// reference it
//
// swagger:response ExampleStateResponse
type StateResponseWrapper struct {
	// The current example state
	//
	// in:body
	State State `json:"state"`
}
   272  

View as plain text