1 package instances
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net/http"
8 "regexp"
9
10 "github.com/gin-gonic/gin"
11 corev1 "k8s.io/api/core/v1"
12 toolscache "k8s.io/client-go/tools/cache"
13 "sigs.k8s.io/controller-runtime/pkg/client"
14
15 "edge-infra.dev/pkg/lib/fog"
16 "edge-infra.dev/pkg/lib/logging"
17 "edge-infra.dev/pkg/sds/interlock/internal/config"
18 errs "edge-infra.dev/pkg/sds/interlock/internal/errors"
19 "edge-infra.dev/pkg/sds/interlock/topic"
20 "edge-infra.dev/pkg/sds/interlock/websocket"
21 )
22
23 var (
24
25
26 TopicName = "instances"
27
28 Path = "/v1/instances"
29 )
30
31
32
33 type Instances struct {
34 topic topic.Topic
35 }
36
37
38
39
40 func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Instances, error) {
41 state, err := newState(ctx, cfg)
42 if err != nil {
43 return nil, err
44 }
45
46 topic := topic.NewTopic(
47 TopicName,
48 state,
49 cfg,
50 wm,
51 )
52 instances := &Instances{
53 topic: topic,
54 }
55
56 err = instances.SetupAPIInformers(ctx, cfg)
57 return instances, err
58 }
59
60
61
62
63 func (i *Instances) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
64
65 informer, err := cfg.Cache.GetInformer(ctx, &corev1.Pod{})
66 if err != nil {
67 return fmt.Errorf("failed to get pod resource informer: %w", err)
68 }
69
70 podFilteredEventHandler := toolscache.FilteringResourceEventHandler{
71 FilterFunc: IsInterlockPod,
72 Handler: &PodEventHandler{i.topic.UpdateState},
73 }
74
75 if _, err := informer.AddEventHandler(podFilteredEventHandler); err != nil {
76 return fmt.Errorf("failed to add Interlock pod event handler: %w", err)
77 }
78
79 return nil
80 }
81
82
83
84 func IsInterlockPod(obj interface{}) bool {
85 pod, ok := obj.(*corev1.Pod)
86 if !ok {
87 return false
88 }
89
90 if pod.Namespace != "interlock" {
91 return false
92 }
93
94 match, err := regexp.MatchString("^interlock-\\w{5}$", pod.Name)
95 if err != nil {
96 return false
97 }
98
99 return match
100 }
101
102
103
104 type PodEventHandler struct {
105 UpdateFunc func(topic.UpdateFunc) error
106 }
107
108
109 func (p *PodEventHandler) OnAdd(obj interface{}, _ bool) {
110 pod := obj.(*corev1.Pod)
111 updateInstance(p.UpdateFunc, pod, false)
112 }
113
114
115 func (p *PodEventHandler) OnUpdate(_, newObj interface{}) {
116 pod := newObj.(*corev1.Pod)
117 updateInstance(p.UpdateFunc, pod, false)
118 }
119
120
121 func (p *PodEventHandler) OnDelete(obj interface{}) {
122 pod := obj.(*corev1.Pod)
123 updateInstance(p.UpdateFunc, pod, true)
124 }
125
126
127 func updateInstance(updateFunc func(topic.UpdateFunc) error, pod *corev1.Pod, deleted bool) {
128
129 if pod.Spec.NodeName == "" {
130 return
131 }
132
133 if deleted {
134 err := updateFunc(func(obj interface{}) error {
135 state, ok := obj.(*State)
136 if !ok {
137 return errors.New("state within discover topic is not of type *State")
138 }
139
140 delete(state.Instances, pod.Spec.NodeName)
141
142 return nil
143 })
144 if err != nil {
145 logging.NewLogger().Error(err, "failed to delete Interlock URL entry in informer hook")
146 }
147
148 return
149 }
150
151 if pod.Status.PodIP == "" {
152 return
153 }
154
155 err := updateFunc(func(obj interface{}) error {
156 state, ok := obj.(*State)
157 if !ok {
158 return errors.New("state within discover topic is not of type *State")
159 }
160
161 state.Instances[pod.Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pod.Status.PodIP, "80")}
162
163 return nil
164 })
165 if err != nil {
166 logging.NewLogger().Error(err, "failed to update cluster name in informer hook")
167 }
168 }
169
170
171
172 func (i *Instances) RegisterEndpoints(r *gin.Engine) {
173 v1 := r.Group(Path)
174 v1.GET("", i.topic.DefaultGet)
175 v1.GET("/:hostname", i.infoFromHostname)
176 }
177
178
179 func (i *Instances) infoFromHostname(c *gin.Context) {
180 log := fog.FromContext(c.Request.Context())
181 target := c.Param("hostname")
182
183 state, ok := i.topic.State().(*State)
184 if !ok {
185 log.Error(fmt.Errorf("state within discover topic is not of type *State"), "failed to cast state to State")
186 c.JSON(http.StatusInternalServerError, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusInternalServerError))))
187 }
188
189
190
191 for hostname, info := range state.Instances {
192 if hostname == target {
193 c.JSON(http.StatusOK, info)
194 return
195 }
196 }
197
198 log.Info("hostname not found", "hostname", target)
199
200 c.IndentedJSON(http.StatusNotFound, errs.NewErrorResponse(errs.NewError(http.StatusText(http.StatusNotFound))))
201 }
202
203
204
205
206
207 type State struct {
208
209
210
211
212
213
214 Instances map[string]Instance `json:"instances"`
215 }
216
217
218
219
220 type Instance struct {
221
222
223
224
225 URL string `json:"url"`
226 }
227
228
229 func newState(ctx context.Context, cfg *config.Config) (*State, error) {
230 pods := &corev1.PodList{}
231 if err := cfg.KubeRetryClient.SafeList(ctx, pods, client.InNamespace("interlock")); err != nil {
232 return nil, fmt.Errorf("failed to list pods: %w", err)
233 }
234
235 state := &State{
236 Instances: map[string]Instance{},
237 }
238
239 for i := range pods.Items {
240 if IsInterlockPod(&pods.Items[i]) {
241 state.Instances[pods.Items[i].Spec.NodeName] = Instance{URL: fmt.Sprintf("http://%s:%s", pods.Items[i].Status.PodIP, "80")}
242 }
243 }
244
245 return state, nil
246 }
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270 type InstanceParams struct {
271
272
273
274
275 Hostname string `json:"hostname"`
276 }
277
278
279
280
281 type StateResponseWrapper struct {
282
283
284
285 InstancesState State `json:"state"`
286 }
287
288
289
290
291 type InstanceWrapper struct {
292
293
294
295 Instance Instance `json:"url-response"`
296 }
297
View as plain text