package example import ( "context" "strconv" "github.com/fsnotify/fsnotify" "github.com/gin-gonic/gin" "github.com/spf13/afero" corev1 "k8s.io/api/core/v1" toolscache "k8s.io/client-go/tools/cache" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/lib/logging" "edge-infra.dev/pkg/sds/interlock/internal/config" "edge-infra.dev/pkg/sds/interlock/topic" "edge-infra.dev/pkg/sds/interlock/websocket" ) // TopicName to be used for subscriptions var TopicName = "example" // Example represents an example topic type Example struct { topic topic.Topic } // New returns a new Example and configures the topic with the name, initialized // state and websocket manager func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Example, error) { topic := topic.NewTopic( TopicName, newState(), cfg, wm, ) example := &Example{ topic: topic, } // optionally setup Kubernetes API Server informers if err := example.SetupAPIInformers(ctx, cfg); err != nil { return nil, err } // optionally setup filesystem watchers if err := example.SetupFSWatchers(ctx, cfg); err != nil { return nil, err } return example, nil } // SetupAPIInformers shows how to register new event handlers on the Interlock // informer. You will also need to add your object to the reader cache in // config.go, otherwise events won't be captured func (e *Example) SetupAPIInformers(ctx context.Context, cfg *config.Config) error { // get ConfigMap informer informer, err := cfg.Cache.GetInformer(ctx, &corev1.ConfigMap{}) if err != nil { return err } // create event handler for the IntField field filteredEventHandler := toolscache.FilteringResourceEventHandler{ FilterFunc: FilterConfigMap, Handler: &EventHandler{e.topic.UpdateState}, } if _, err := informer.AddEventHandler(filteredEventHandler); err != nil { return err } return nil } // FilterConfigMap will filter the incoming objects to make sure that they are // the ConfigMap that we want to watch func FilterConfigMap(obj interface{}) bool { cm, ok := obj.(*corev1.ConfigMap) if !ok { return false } if cm.Name != "example-name" || cm.Namespace != "example-namespace" { return false } return true } // EventHandler handles keeping the IntField up to date based on the value in // the ConfigMap type EventHandler struct { UpdateFunc func(topic.UpdateFunc) error } // OnAdd updates the IntField field on ConfigMap create events func (h *EventHandler) OnAdd(obj interface{}, _ bool) { cm := obj.(*corev1.ConfigMap) updateIntField(h.UpdateFunc, cm) } // OnUpdate updates the IntField field on ConfigMap update events func (h *EventHandler) OnUpdate(_, newObj interface{}) { cm := newObj.(*corev1.ConfigMap) updateIntField(h.UpdateFunc, cm) } // OnDelete ignores delete events. You could optionally reset values or add any // other behaviour here func (h *EventHandler) OnDelete(_ interface{}) {} // updateIntField retrieves the example field from the example ConfigMap // and assigns it to the IntField field func updateIntField(updateFunc func(topic.UpdateFunc) error, cm *corev1.ConfigMap) { log := logging.NewLogger() stringField := cm.Data["example"] intField, err := strconv.Atoi(stringField) if err != nil { log.Error(err, "failed to convert str to int in API informer") return } // use the topic update function passed in to update topic state in a thread // safe way err = updateFunc(func(obj interface{}) error { state := obj.(*State) state.IntField = intField return nil }) if err != nil { log.Error(err, "failed to update IntField in API informer") } } // SetupFSWatchers shows how to register new filesystem watchers. You will also // need to mount your target file/directory into Interlock otherwise it won't be // seen func (e *Example) SetupFSWatchers(ctx context.Context, cfg *config.Config) error { // create new watcher watcher, err := fsnotify.NewWatcher() if err != nil { return err } defer watcher.Close() // start listening for events go updateStringField(ctx, watcher, cfg.Fs, e.topic.UpdateState) // add the filepath to the watcher return watcher.Add("example-file.txt") } // updateStringField watches the events on the provided fsnotify.Watcher and // and updates the StringField field when the example file is written to func updateStringField(ctx context.Context, watcher *fsnotify.Watcher, fs afero.Fs, update func(topic.UpdateFunc) error) { log := fog.FromContext(ctx) for { select { case event, ok := <-watcher.Events: if !ok { return } // if a write event was observed, update the StringField field // using the provided topic update function which allows you to // update state in a thread safe way if event.Has(fsnotify.Write) { err := update(func(obj interface{}) error { content, err := afero.ReadFile(fs, "example-file.txt") if err != nil { return err } state := obj.(*State) state.StringField = string(content) return nil }) if err != nil { log.Error(err, "failed to read example file in filesystem watcher") } } case err, ok := <-watcher.Errors: if !ok { return } log.Error(err, "failed to update IntField in filesystem watcher hook") } } } // RegistersEndpoints registers the Example topic endpoints on the provided router func (e *Example) RegisterEndpoints(r *gin.Engine) { v1 := r.Group("/v1/example") v1.GET("", e.topic.DefaultGet) v1.PATCH("", e.topic.DefaultPatch) } // State contains the topic state/data. The swagger:model must be set so that // the model is included in the swagger documentation // // swagger:model ExampleState type State struct { // StringField value. This field is required and this is defined both in // the comments below to notify swagger that it is required, as well as in // the binding tags for the actual validation in the API // // required: true // example: mystring StringField string `json:"example-string" binding:"required"` // IntField value. This field is required and this is defined both in // the comments below to notify swagger that it is required, as well as in // the binding tags for the actual validation in the API // // min: 3 // example: 4 IntField int `json:"example-int" binding:"min=3"` // NestedFields struct for nested topic data NestedFields NestedFields `json:"example-nested"` } // NestedFields can also be defined as a model for extra observability in the swagger // documentation displayed on Edge Docs // // swagger:model type NestedFields struct { // BoolField value. This field has no validation requirements BoolField bool `json:"example-bool"` } // newState returns a new State for the topic with initialized values func newState() *State { return &State{ StringField: "", IntField: 0, NestedFields: NestedFields{ BoolField: true, }, } } // Everything below this point is used for documetation purposes and exists for // the purpose of generating the swagger spec only. // swagger:route GET /v1/example example GetExampleState // Retrieve the current example state // responses: // 200: ExampleStateResponse // 404: ErrorResponse // 405: ErrorResponse // 500: ErrorResponse // swagger:route PATCH /v1/example example PatchExampleState // Patch the example state // responses: // 202: ExampleStateResponse // 404: ErrorResponse // 405: ErrorResponse // 500: ErrorResponse // The PatchExampleState parameters // // swagger:parameters PatchExampleState type PatchParametersWrapper struct { // The example state that the client wants to patch // // in:body State State `json:"state"` } // The request was successful // // swagger:response ExampleStateResponse type StateResponseWrapper struct { // The current example state // // in:body State State `json:"state"` }