...

Source file src/edge-infra.dev/pkg/sds/interlock/topic/topic.go

Documentation: edge-infra.dev/pkg/sds/interlock/topic

     1  package topic
     2  
     3  import (
     4  	"errors"
     5  	"net/http"
     6  	"reflect"
     7  	"sync"
     8  
     9  	"github.com/gin-gonic/gin"
    10  	"github.com/gin-gonic/gin/binding"
    11  	"github.com/go-playground/validator/v10"
    12  
    13  	"edge-infra.dev/pkg/lib/fog"
    14  	"edge-infra.dev/pkg/sds/interlock/internal/config"
    15  	errs "edge-infra.dev/pkg/sds/interlock/internal/errors"
    16  	"edge-infra.dev/pkg/sds/interlock/websocket"
    17  )
    18  
    19  // UpdateFunc represents a function that can be used to make direct changes to
    20  // state
    21  type UpdateFunc func(interface{}) error
    22  
// Topic is a base structure for Interlock topics. It bundles a topic's name
// and state with the mutex that guards the state and the sender used to
// publish events about it
type Topic struct {
	// name is placed in the Topic field of every event sent by SendEvent
	name  string
	// state wraps the topic's state data
	state State
	// Config is the configuration passed to NewTopic; it is embedded, so its
	// exported fields and methods are promoted onto Topic
	*config.Config
	// mutex guards state: State takes the read lock, UpdateState the write lock
	mutex       *sync.RWMutex
	// eventSender receives the events built by SendEvent
	eventSender eventSender
}
    31  
// eventSender is the only capability a Topic requires from whatever delivers
// its events: accepting a websocket.Event
type eventSender interface {
	// Send hands the event over for delivery; it returns nothing, so a Topic
	// cannot observe delivery failures through this interface
	Send(event websocket.Event)
}
    35  
    36  // NewTopic returns a new topic with the provided name, state and websocket
    37  // manager, while also initializing the read-write mutex
    38  func NewTopic(name string, state interface{}, cfg *config.Config, eventSender eventSender) Topic {
    39  	return Topic{
    40  		name: name,
    41  		state: State{
    42  			data: state,
    43  		},
    44  		Config:      cfg,
    45  		mutex:       &sync.RWMutex{},
    46  		eventSender: eventSender,
    47  	}
    48  }
    49  
    50  // State returns a deep copy of the state data with reads protected by a
    51  // read-write mutex
    52  func (t *Topic) State() interface{} {
    53  	t.mutex.RLock()
    54  	defer t.mutex.RUnlock()
    55  	return t.state.DeepCopyData()
    56  }
    57  
    58  // UpdateState is a decorator that is used to append thread-safe locking and
    59  // event sending functionality to state updates
    60  func (t *Topic) UpdateState(fn UpdateFunc) error {
    61  	t.mutex.Lock()
    62  	defer t.mutex.Unlock()
    63  
    64  	// create state reference to help determine if state has changed
    65  	before := t.state.DeepCopyData()
    66  	if err := fn(t.state.data); err != nil {
    67  		return err
    68  	}
    69  	// if state has changed, send an event
    70  	if !reflect.DeepEqual(before, t.state.data) {
    71  		t.SendEvent()
    72  	}
    73  	return nil
    74  }
    75  
    76  // DefaultGet returns a StatusOK (200) response alongside the current topic
    77  // state data. The reading of the state is thread-safe and uses the read-write
    78  // mutex
    79  func (t *Topic) DefaultGet(c *gin.Context) {
    80  	c.JSON(http.StatusOK, t.State())
    81  }
    82  
    83  // DefaultPatch attempts to bind the request data to the current topic state. If
    84  // this is successful, then it returns a StatusAccepted (202) response alongside
    85  // the new topic state data. The updating of the state is thread-safe and uses
    86  // the read-write mutex
    87  func (t *Topic) DefaultPatch(c *gin.Context) {
    88  	err := t.UpdateState(
    89  		func(state interface{}) error {
    90  			if err := c.ShouldBindBodyWith(&state, binding.JSON); err != nil {
    91  				return err
    92  			}
    93  			c.JSON(http.StatusAccepted, state)
    94  			return nil
    95  		},
    96  	)
    97  	if err != nil {
    98  		HandleBindingError(c, err)
    99  		return
   100  	}
   101  }
   102  
   103  // HandleBindingError will check to see if the provided error is a validation
   104  // error. If it is not, the error is returned as is. If it is a validation
   105  // error, then the error is parsed to provide a more human readable error
   106  // format
   107  func HandleBindingError(c *gin.Context, err error) {
   108  	log := fog.FromContext(c.Request.Context())
   109  	var ve validator.ValidationErrors
   110  	if errors.As(err, &ve) {
   111  		out := make([]*errs.Error, len(ve))
   112  		for i, fe := range ve {
   113  			out[i] = &errs.Error{}
   114  			out[i].FromFieldError(fe)
   115  			out[i] = &errs.Error{}
   116  			out[i].FromFieldError(fe)
   117  		}
   118  		c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...))
   119  		return
   120  	}
   121  	// A validation error will be of this type if the payload is a slice
   122  	var sve binding.SliceValidationError
   123  	if errors.As(err, &sve) {
   124  		out := make([]*errs.Error, len(sve))
   125  		for i, e := range sve {
   126  			out[i] = errs.NewError(e.Error())
   127  		}
   128  		c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...))
   129  		return
   130  	}
   131  	log.Error(err, "failed to bind JSON")
   132  	c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError))))
   133  }
   134  
   135  // SendEvent to the websocket manager channels
   136  func (t *Topic) SendEvent() {
   137  	t.eventSender.Send(websocket.Event{
   138  		Topic: t.name,
   139  		Data:  t.state.data,
   140  	})
   141  }
   142  
// State contains the state for the topic in the data field
type State struct {
	// data holds the state value of any type; DeepCopyData produces copies
	// of it via reflection
	data interface{}
}
   147  
   148  // DeepCopyData returns a deep copy of the state data
   149  func (s *State) DeepCopyData() interface{} {
   150  	// Wrap the original in a reflect.Value
   151  	in := reflect.ValueOf(s.data)
   152  
   153  	out := reflect.New(in.Type()).Elem()
   154  	deepCopy(out, in)
   155  
   156  	// Remove the reflection wrapper
   157  	return out.Interface()
   158  }
   159  
   160  // deepCopy is a recursive function that deep copies the original value to the
   161  // copy value
   162  func deepCopy(out, in reflect.Value) {
   163  	switch in.Kind() {
   164  	// if in is a pointer we need to retrieve the value that is being
   165  	// pointed at and set the value for the copied field with a recursive
   166  	// deep copy on that value
   167  	case reflect.Ptr:
   168  		// check if the pointer is nil
   169  		if in.IsNil() {
   170  			return
   171  		}
   172  		inValue := in.Elem()
   173  		// allocate a new pointer object and copy the in value to it
   174  		out.Set(reflect.New(inValue.Type()))
   175  		deepCopy(out.Elem(), inValue)
   176  
   177  	// if in is an interface we need to retrieve the underlying struct and
   178  	// set the value for the copied field with a recursive deep copy on that
   179  	// value
   180  	case reflect.Interface:
   181  		// no-op for nil interface
   182  		if in.IsNil() {
   183  			return
   184  		}
   185  		inValue := in.Elem()
   186  		// get the value of the struct and set the field on the out
   187  		outValue := reflect.New(inValue.Type()).Elem()
   188  		deepCopy(outValue, inValue)
   189  		out.Set(outValue)
   190  
   191  	// if in is a struct deep copy each field
   192  	case reflect.Struct:
   193  		for i := 0; i < in.NumField(); i++ {
   194  			deepCopy(out.Field(i), in.Field(i))
   195  		}
   196  
   197  	// if in is a slice then create a new slice and deep copy each
   198  	// element
   199  	case reflect.Slice:
   200  		// no-op if input slice value is uninitialized
   201  		if in.IsNil() {
   202  			return
   203  		}
   204  		// create a zero-initialized slice value
   205  		out.Set(reflect.MakeSlice(in.Type(), in.Len(), in.Cap()))
   206  		for i := 0; i < in.Len(); i++ {
   207  			deepCopy(out.Index(i), in.Index(i))
   208  		}
   209  
   210  	// if in is a map we create a new map and deep copy each key-value
   211  	// pair
   212  	case reflect.Map:
   213  		// no-op if input map value is uninitialized
   214  		if in.IsNil() {
   215  			return
   216  		}
   217  		out.Set(reflect.MakeMap(in.Type()))
   218  		for _, key := range in.MapKeys() {
   219  			inValue := in.MapIndex(key)
   220  			// New gives us a pointer, but again we want the value
   221  			outValue := reflect.New(inValue.Type()).Elem()
   222  			deepCopy(outValue, inValue)
   223  			out.SetMapIndex(key, outValue)
   224  		}
   225  
   226  	// for fields that are not nested (slices, maps, structs), pointers or
   227  	// interfaces, we set the in field on the out
   228  	default:
   229  		out.Set(in)
   230  	}
   231  }
   232  

View as plain text