...
1 package topic
2
3 import (
4 "errors"
5 "net/http"
6 "reflect"
7 "sync"
8
9 "github.com/gin-gonic/gin"
10 "github.com/gin-gonic/gin/binding"
11 "github.com/go-playground/validator/v10"
12
13 "edge-infra.dev/pkg/lib/fog"
14 "edge-infra.dev/pkg/sds/interlock/internal/config"
15 errs "edge-infra.dev/pkg/sds/interlock/internal/errors"
16 "edge-infra.dev/pkg/sds/interlock/websocket"
17 )
18
19
20
21 type UpdateFunc func(interface{}) error
22
23
24 type Topic struct {
25 name string
26 state State
27 *config.Config
28 mutex *sync.RWMutex
29 eventSender eventSender
30 }
31
32 type eventSender interface {
33 Send(event websocket.Event)
34 }
35
36
37
38 func NewTopic(name string, state interface{}, cfg *config.Config, eventSender eventSender) Topic {
39 return Topic{
40 name: name,
41 state: State{
42 data: state,
43 },
44 Config: cfg,
45 mutex: &sync.RWMutex{},
46 eventSender: eventSender,
47 }
48 }
49
50
51
52 func (t *Topic) State() interface{} {
53 t.mutex.RLock()
54 defer t.mutex.RUnlock()
55 return t.state.DeepCopyData()
56 }
57
58
59
60 func (t *Topic) UpdateState(fn UpdateFunc) error {
61 t.mutex.Lock()
62 defer t.mutex.Unlock()
63
64
65 before := t.state.DeepCopyData()
66 if err := fn(t.state.data); err != nil {
67 return err
68 }
69
70 if !reflect.DeepEqual(before, t.state.data) {
71 t.SendEvent()
72 }
73 return nil
74 }
75
76
77
78
79 func (t *Topic) DefaultGet(c *gin.Context) {
80 c.JSON(http.StatusOK, t.State())
81 }
82
83
84
85
86
87 func (t *Topic) DefaultPatch(c *gin.Context) {
88 err := t.UpdateState(
89 func(state interface{}) error {
90 if err := c.ShouldBindBodyWith(&state, binding.JSON); err != nil {
91 return err
92 }
93 c.JSON(http.StatusAccepted, state)
94 return nil
95 },
96 )
97 if err != nil {
98 HandleBindingError(c, err)
99 return
100 }
101 }
102
103
104
105
106
107 func HandleBindingError(c *gin.Context, err error) {
108 log := fog.FromContext(c.Request.Context())
109 var ve validator.ValidationErrors
110 if errors.As(err, &ve) {
111 out := make([]*errs.Error, len(ve))
112 for i, fe := range ve {
113 out[i] = &errs.Error{}
114 out[i].FromFieldError(fe)
115 out[i] = &errs.Error{}
116 out[i].FromFieldError(fe)
117 }
118 c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...))
119 return
120 }
121
122 var sve binding.SliceValidationError
123 if errors.As(err, &sve) {
124 out := make([]*errs.Error, len(sve))
125 for i, e := range sve {
126 out[i] = errs.NewError(e.Error())
127 }
128 c.JSON(http.StatusBadRequest, errs.NewErrorResponse(out...))
129 return
130 }
131 log.Error(err, "failed to bind JSON")
132 c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError))))
133 }
134
135
136 func (t *Topic) SendEvent() {
137 t.eventSender.Send(websocket.Event{
138 Topic: t.name,
139 Data: t.state.data,
140 })
141 }
142
143
144 type State struct {
145 data interface{}
146 }
147
148
149 func (s *State) DeepCopyData() interface{} {
150
151 in := reflect.ValueOf(s.data)
152
153 out := reflect.New(in.Type()).Elem()
154 deepCopy(out, in)
155
156
157 return out.Interface()
158 }
159
160
161
162 func deepCopy(out, in reflect.Value) {
163 switch in.Kind() {
164
165
166
167 case reflect.Ptr:
168
169 if in.IsNil() {
170 return
171 }
172 inValue := in.Elem()
173
174 out.Set(reflect.New(inValue.Type()))
175 deepCopy(out.Elem(), inValue)
176
177
178
179
180 case reflect.Interface:
181
182 if in.IsNil() {
183 return
184 }
185 inValue := in.Elem()
186
187 outValue := reflect.New(inValue.Type()).Elem()
188 deepCopy(outValue, inValue)
189 out.Set(outValue)
190
191
192 case reflect.Struct:
193 for i := 0; i < in.NumField(); i++ {
194 deepCopy(out.Field(i), in.Field(i))
195 }
196
197
198
199 case reflect.Slice:
200
201 if in.IsNil() {
202 return
203 }
204
205 out.Set(reflect.MakeSlice(in.Type(), in.Len(), in.Cap()))
206 for i := 0; i < in.Len(); i++ {
207 deepCopy(out.Index(i), in.Index(i))
208 }
209
210
211
212 case reflect.Map:
213
214 if in.IsNil() {
215 return
216 }
217 out.Set(reflect.MakeMap(in.Type()))
218 for _, key := range in.MapKeys() {
219 inValue := in.MapIndex(key)
220
221 outValue := reflect.New(inValue.Type()).Elem()
222 deepCopy(outValue, inValue)
223 out.SetMapIndex(key, outValue)
224 }
225
226
227
228 default:
229 out.Set(in)
230 }
231 }
232
View as plain text