...
1 package cluster
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/gin-gonic/gin"
8 corev1 "k8s.io/api/core/v1"
9 toolscache "k8s.io/client-go/tools/cache"
10
11 "edge-infra.dev/pkg/edge/info"
12 "edge-infra.dev/pkg/lib/logging"
13 "edge-infra.dev/pkg/sds/interlock/internal/config"
14 "edge-infra.dev/pkg/sds/interlock/topic"
15 "edge-infra.dev/pkg/sds/interlock/topic/cluster/internal/middleware"
16 "edge-infra.dev/pkg/sds/interlock/websocket"
17 )
18
// TopicName is the name passed to topic.NewTopic when the cluster topic is
// created.
var TopicName = "cluster"

// Path is the route group under which the cluster topic's HTTP endpoints are
// registered.
var Path = "/v1/cluster"
25
26
27 type Cluster struct {
28 topic topic.Topic
29 }
30
31
32
33
34 func New(ctx context.Context, cfg *config.Config, wm *websocket.Manager) (*Cluster, error) {
35 state, err := newState(ctx, cfg)
36 if err != nil {
37 return nil, err
38 }
39 topic := topic.NewTopic(
40 TopicName,
41 state,
42 cfg,
43 wm,
44 )
45 cluster := &Cluster{
46 topic: topic,
47 }
48 err = cluster.SetupAPIInformers(ctx, cfg)
49 return cluster, err
50 }
51
52
53
54
55 func (c *Cluster) SetupAPIInformers(ctx context.Context, cfg *config.Config) error {
56
57 informer, err := cfg.Cache.GetInformer(ctx, &corev1.ConfigMap{})
58 if err != nil {
59 return fmt.Errorf("failed to get configmap resource informer: %w", err)
60 }
61
62 nameFilteredEventHandler := toolscache.FilteringResourceEventHandler{
63 FilterFunc: IsEdgeInfoCM,
64 Handler: &NameEventHandler{c.topic.UpdateState},
65 }
66 if _, err := informer.AddEventHandler(nameFilteredEventHandler); err != nil {
67 return fmt.Errorf("failed to add name event handler: %w", err)
68 }
69 return nil
70 }
71
72
73
74 func IsEdgeInfoCM(obj interface{}) bool {
75 cm, ok := obj.(*corev1.ConfigMap)
76 if !ok {
77 return false
78 }
79 if cm.Name != info.EdgeConfigMapName || cm.Namespace != info.EdgeConfigMapNS {
80 return false
81 }
82 return true
83 }
84
85
86
87 type NameEventHandler struct {
88 UpdateFunc func(topic.UpdateFunc) error
89 }
90
91
92 func (neh *NameEventHandler) OnAdd(obj interface{}, _ bool) {
93 cm := obj.(*corev1.ConfigMap)
94 updateName(neh.UpdateFunc, cm)
95 }
96
97
98 func (neh *NameEventHandler) OnUpdate(_, newObj interface{}) {
99 cm := newObj.(*corev1.ConfigMap)
100 updateName(neh.UpdateFunc, cm)
101 }
102
103
104 func (neh *NameEventHandler) OnDelete(_ interface{}) {}
105
106
107
108 func updateName(updateFunc func(topic.UpdateFunc) error, cm *corev1.ConfigMap) {
109 edgeinfo := info.EdgeInfo{}
110 clusterName := edgeinfo.FromConfigMap(cm).Store
111 err := updateFunc(func(obj interface{}) error {
112 state := obj.(*State)
113 state.Name = clusterName
114 return nil
115 })
116 if err != nil {
117 logging.NewLogger().Error(err, "failed to update cluster name in informer hook")
118 }
119 }
120
121
122
123 func (c *Cluster) RegisterEndpoints(r *gin.Engine) {
124 v1 := r.Group(Path)
125 v1.GET("", c.topic.DefaultGet)
126 v1.PATCH("", middleware.ReadOnlyFieldValidation(), c.topic.DefaultPatch)
127 }
128
129
130
131
// State is the state held by the cluster topic.
type State struct {
	// Name is the cluster name. newState initialises it from the Store field
	// of the edge info, and updateName overwrites it whenever the edge-info
	// ConfigMap is added or updated. It is serialised as "name" and tagged as
	// a required binding field.
	Name string `json:"name" binding:"required"`
}
139
140
141 func newState(ctx context.Context, cfg *config.Config) (*State, error) {
142 edgeinfo, err := info.FromClient(ctx, cfg.KubeRetryClient.Client())
143 if err != nil {
144 return nil, fmt.Errorf("failed to get edgeinfo configmap: %w", err)
145 }
146 state := &State{
147 Name: edgeinfo.Store,
148 }
149 return state, nil
150 }
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 type PatchParametersWrapper struct {
175
176
177
178 ClusterState State `json:"state"`
179 }
180
181
182
183
184 type StateResponseWrapper struct {
185
186
187
188 ClusterState State `json:"state"`
189 }
190
View as plain text