package topic import ( "errors" "net/http" "reflect" "sync" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/sds/interlock/internal/config" errs "edge-infra.dev/pkg/sds/interlock/internal/errors" "edge-infra.dev/pkg/sds/interlock/websocket" ) // UpdateFunc represents a function that can be used to make direct changes to // state type UpdateFunc func(interface{}) error // Topic is a base structure for Interlock topics type Topic struct { name string state State *config.Config mutex *sync.RWMutex eventSender eventSender } type eventSender interface { Send(event websocket.Event) } // NewTopic returns a new topic with the provided name, state and websocket // manager, while also initializing the read-write mutex func NewTopic(name string, state interface{}, cfg *config.Config, eventSender eventSender) Topic { return Topic{ name: name, state: State{ data: state, }, Config: cfg, mutex: &sync.RWMutex{}, eventSender: eventSender, } } // State returns a deep copy of the state data with reads protected by a // read-write mutex func (t *Topic) State() interface{} { t.mutex.RLock() defer t.mutex.RUnlock() return t.state.DeepCopyData() } // UpdateState is a decorator that is used to append thread-safe locking and // event sending functionality to state updates func (t *Topic) UpdateState(fn UpdateFunc) error { t.mutex.Lock() defer t.mutex.Unlock() // create state reference to help determine if state has changed before := t.state.DeepCopyData() if err := fn(t.state.data); err != nil { return err } // if state has changed, send an event if !reflect.DeepEqual(before, t.state.data) { t.SendEvent() } return nil } // DefaultGet returns a StatusOK (200) response alongside the current topic // state data. 
The reading of the state is thread-safe and uses the read-write // mutex func (t *Topic) DefaultGet(c *gin.Context) { c.JSON(http.StatusOK, t.State()) } // DefaultPatch attempts to bind the request data to the current topic state. If // this is successful, then it returns a StatusAccepted (202) response alongside // the new topic state data. The updating of the state is thread-safe and uses // the read-write mutex func (t *Topic) DefaultPatch(c *gin.Context) { err := t.UpdateState( func(state interface{}) error { if err := c.ShouldBindBodyWith(&state, binding.JSON); err != nil { return err } c.JSON(http.StatusAccepted, state) return nil }, ) if err != nil { HandleBindingError(c, err) return } } // HandleBindingError will check to see if the provided error is a validation // error. If it is not, the error is returned as is. If it is a validation // error, then the error is parsed to provide a more human readable error // format func HandleBindingError(c *gin.Context, err error) { log := fog.FromContext(c.Request.Context()) var ve validator.ValidationErrors if errors.As(err, &ve) { out := make([]*errs.Error, len(ve)) for i, fe := range ve { out[i] = &errs.Error{} out[i].FromFieldError(fe) out[i] = &errs.Error{} out[i].FromFieldError(fe) } c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...)) return } // A validation error will be of this type if the payload is a slice var sve binding.SliceValidationError if errors.As(err, &sve) { out := make([]*errs.Error, len(sve)) for i, e := range sve { out[i] = errs.NewError(e.Error()) } c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...)) return } log.Error(err, "failed to bind JSON") c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError)))) } // SendEvent to the websocket manager channels func (t *Topic) SendEvent() { t.eventSender.Send(websocket.Event{ Topic: t.name, Data: t.state.data, }) } // State contains the state for the topic in the data 
// field. The data is stored as an interface{} value and is copied reflectively
// by DeepCopyData.
type State struct {
	data interface{}
}

// DeepCopyData returns a deep copy of the state data. The copy has the same
// dynamic type as the original. If the state holds no data (nil), nil is
// returned.
func (s *State) DeepCopyData() interface{} {
	// reflect.ValueOf(nil) yields the zero Value, and calling Type on it
	// panics, so there is nothing to copy in that case
	if s.data == nil {
		return nil
	}

	// Wrap the original in a reflect.Value
	in := reflect.ValueOf(s.data)
	out := reflect.New(in.Type()).Elem()
	deepCopy(out, in)

	// Remove the reflection wrapper
	return out.Interface()
}

// deepCopy is a recursive function that deep copies the original value to the
// copy value. out must be settable and of the same type as in. Pointers,
// interfaces, structs, arrays, slices and maps are copied recursively (map keys
// are reused as is); every other kind is assigned directly. Unexported struct
// fields cannot be set through reflection, so they are carried over by a
// shallow copy of the enclosing struct. Cyclic data is not detected.
func deepCopy(out, in reflect.Value) {
	switch in.Kind() {
	// if in is a pointer we need to retrieve the value that is being
	// pointed at and set the value for the copied field with a recursive
	// deep copy on that value
	case reflect.Ptr:
		// check if the pointer is nil
		if in.IsNil() {
			return
		}
		inValue := in.Elem()
		// allocate a new pointer object and copy the in value to it
		out.Set(reflect.New(inValue.Type()))
		deepCopy(out.Elem(), inValue)

	// if in is an interface we need to retrieve the underlying value and
	// set the value for the copied field with a recursive deep copy on that
	// value
	case reflect.Interface:
		// no-op for nil interface
		if in.IsNil() {
			return
		}
		inValue := in.Elem()
		// New gives us a pointer, but we want the value it points at
		outValue := reflect.New(inValue.Type()).Elem()
		deepCopy(outValue, inValue)
		out.Set(outValue)

	// if in is a struct deep copy each field
	case reflect.Struct:
		// assign the whole struct first: this is the only way unexported
		// fields can be carried over, since setting them individually
		// through reflection panics
		out.Set(in)
		for i := 0; i < in.NumField(); i++ {
			if field := out.Field(i); field.CanSet() {
				deepCopy(field, in.Field(i))
			}
		}

	// if in is an array deep copy each element in place so that elements
	// holding pointers, slices or maps are not shared with the original
	case reflect.Array:
		for i := 0; i < in.Len(); i++ {
			deepCopy(out.Index(i), in.Index(i))
		}

	// if in is a slice then create a new slice and deep copy each
	// element
	case reflect.Slice:
		// no-op if input slice value is uninitialized
		if in.IsNil() {
			return
		}
		// create a zero-initialized slice value
		out.Set(reflect.MakeSlice(in.Type(), in.Len(), in.Cap()))
		for i := 0; i < in.Len(); i++ {
			deepCopy(out.Index(i), in.Index(i))
		}

	// if in is a map we create a new map and deep copy each key-value
	// pair
	case reflect.Map:
		// no-op if input map value is uninitialized
		if in.IsNil() {
			return
		}
		out.Set(reflect.MakeMap(in.Type()))
		for _, key := range in.MapKeys() {
			inValue := in.MapIndex(key)
			// New gives us a pointer, but again we want the value
			outValue := reflect.New(inValue.Type()).Elem()
			deepCopy(outValue, inValue)
			out.SetMapIndex(key, outValue)
		}

	// for values that are not nested (structs, arrays, slices, maps),
	// pointers or interfaces, we set the in value on the out
	default:
		out.Set(in)
	}
}