...
1 package client
2
3 import (
4 "context"
5 b64 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "reflect"
9
10 "edge-infra.dev/pkg/edge/chariot"
11
12 "google.golang.org/api/option"
13
14 "edge-infra.dev/pkg/lib/gcp/pubsub"
15
16 "sigs.k8s.io/controller-runtime/pkg/client"
17
18 "edge-infra.dev/pkg/edge/k8objectsutils"
19 )
20
// Operation enumerates the actions a ChariotMessage can request. Its String
// method supplies the textual form stored in the message.
type Operation int

// Supported operations. The numeric values start at zero and follow
// declaration order.
const (
	// Create is the zero value of Operation.
	Create Operation = iota

	// Delete is the operation that follows Create.
	Delete
)
29
30
31
// ChariotMessage is the JSON payload this package builds and hands to Pub/Sub
// in Publish. Fields are normally filled through the chainable Set*/Add*
// methods defined on the type.
type ChariotMessage struct {

	// Objects carries the encoded objects of the message. The K8s helpers store
	// the output of k8objectsutils.ToBase64JSONString here, and Matches
	// base64-decodes every entry before comparing.
	Objects []string `json:"objects"`

	// Banner is serialized under the "banner" key.
	// NOTE(review): presumably identifies the banner the objects belong to — confirm.
	Banner string `json:"banner"`

	// Cluster is serialized under the "cluster" key.
	// NOTE(review): presumably identifies the target cluster — confirm.
	Cluster string `json:"cluster"`

	// Operation is the textual operation name; SetOperation fills it from
	// Operation.String, i.e. "CREATE" or "DELETE".
	Operation string `json:"operation"`

	// Owner is serialized under the "owner" key.
	Owner string `json:"owner"`

	// Dir is optional and left out of the JSON when empty.
	Dir string `json:"dir,omitempty"`

	// Notify is optional and left out of the JSON when false.
	Notify bool `json:"notify,omitempty"`
}
48
49
50 func (s Operation) String() string {
51 switch s {
52 case Create:
53 return "CREATE"
54 case Delete:
55 return "DELETE"
56 default:
57 return "CREATE"
58 }
59 }
60
61
62 func NewChariotMessage() *ChariotMessage {
63 return &ChariotMessage{}
64 }
65
66
// SetBanner stores banner in the Banner field and returns the receiver so
// calls can be chained.
func (c *ChariotMessage) SetBanner(banner string) *ChariotMessage {
	c.Banner = banner
	return c
}
71
72
// SetCluster stores cluster in the Cluster field and returns the receiver so
// calls can be chained.
func (c *ChariotMessage) SetCluster(cluster string) *ChariotMessage {
	c.Cluster = cluster
	return c
}
77
78
// SetDir stores dir in the Dir field and returns the receiver so calls can be
// chained. An empty Dir is omitted from the JSON encoding.
func (c *ChariotMessage) SetDir(dir string) *ChariotMessage {
	c.Dir = dir
	return c
}
83
84
// SetOperation stores the textual form of operation (operation.String()) in
// the Operation field and returns the receiver so calls can be chained.
func (c *ChariotMessage) SetOperation(operation Operation) *ChariotMessage {
	c.Operation = operation.String()
	return c
}
89
90
91
// SetOwner stores owner in the Owner field and returns the receiver so calls
// can be chained.
func (c *ChariotMessage) SetOwner(owner string) *ChariotMessage {
	c.Owner = owner
	return c
}
96
97
98
// SetObjects replaces the Objects field with objects and returns the receiver
// so calls can be chained. The slice is stored as given, not copied, so the
// message shares its backing array with the caller.
func (c *ChariotMessage) SetObjects(objects []string) *ChariotMessage {
	c.Objects = objects
	return c
}
103
104
// SetNotify stores notify in the Notify field and returns the receiver so
// calls can be chained. A false Notify is omitted from the JSON encoding.
func (c *ChariotMessage) SetNotify(notify bool) *ChariotMessage {
	c.Notify = notify
	return c
}
109
110
111
// AddObject appends the given strings to the Objects field, keeping whatever
// is already there, and returns the receiver so calls can be chained. The
// values are stored verbatim; no encoding is applied here.
func (c *ChariotMessage) AddObject(object ...string) *ChariotMessage {
	c.Objects = append(c.Objects, object...)
	return c
}
116
117
118
119 func (c *ChariotMessage) SetK8sBase64Objects(objects ...client.Object) (*ChariotMessage, error) {
120 var base64Strings []string
121 for _, o := range objects {
122 data, err := k8objectsutils.ToBase64JSONString(o)
123 if err != nil {
124 return nil, err
125 }
126 base64Strings = append(base64Strings, data)
127 }
128 return c.SetObjects(base64Strings), nil
129 }
130
131
132
133 func (c *ChariotMessage) AddK8sBase64Object(object ...client.Object) (*ChariotMessage, error) {
134 for _, o := range object {
135 data, err := k8objectsutils.ToBase64JSONString(o)
136 if err != nil {
137 return nil, err
138 }
139 c.AddObject(data)
140 }
141 return c, nil
142 }
143
144
145 func (c *ChariotMessage) ToByte() ([]byte, error) {
146 return json.Marshal(c)
147 }
148
149
150 func (c *ChariotMessage) Publish(ctx context.Context, projectID string, topicID string, attributes map[string]string, opts ...option.ClientOption) error {
151 pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
152 if err != nil {
153 return err
154 }
155 chariotMessageByte, err := c.ToByte()
156 if err != nil {
157 return err
158 }
159 return pubSubService.Send(ctx, topicID, chariotMessageByte, attributes)
160 }
161
162 func (c *ChariotMessage) Matches(x interface{}) bool {
163 reflectedValue := reflect.ValueOf(x).Elem()
164
165 ls := reflectedValue.FieldByName("Objects").Interface().([]string)
166
167 if len(c.Objects) != len(ls) {
168 return false
169 }
170 map1 := map[string]bool{}
171 map2 := map[string]bool{}
172 for i := range ls {
173 str1, _ := b64.StdEncoding.DecodeString(ls[i])
174 gvk1, err := chariot.ParseYamlGVKNN(str1)
175 if err != nil {
176 return false
177 }
178 str2, _ := b64.StdEncoding.DecodeString(c.Objects[i])
179 gvk2, err := chariot.ParseYamlGVKNN(str2)
180 if err != nil {
181 return false
182 }
183
184
185 if gvk1.Hash() == gvk2.Hash() {
186 continue
187 }
188
189 _, isItem2InMap1 := map1[gvk2.Hash()]
190 _, isItem1InMap2 := map2[gvk1.Hash()]
191
192
193 if !isItem2InMap1 {
194 map2[gvk2.Hash()] = true
195 } else {
196 delete(map1, gvk2.Hash())
197 }
198
199 if !isItem1InMap2 {
200 map1[gvk1.Hash()] = true
201 } else {
202 delete(map2, gvk1.Hash())
203 }
204 }
205
206 if len(map1) != 0 || len(map2) != 0 {
207 return false
208 }
209 if c.Banner != reflectedValue.FieldByName("Banner").String() {
210 return false
211 } else if c.Cluster != reflectedValue.FieldByName("Cluster").String() {
212 return false
213 } else if c.Operation != reflectedValue.FieldByName("Operation").String() {
214 return false
215 } else if c.Owner != reflectedValue.FieldByName("Owner").String() {
216 return false
217 } else if c.Dir != reflectedValue.FieldByName("Dir").String() {
218 return false
219 }
220 return true
221 }
222
223 func (c *ChariotMessage) String() string {
224 return fmt.Sprintf("{ChariotMessage - Banner:%s Cluster:%s Operation:%s Owner:%s Dir:%s}", c.Banner, c.Cluster, c.Operation, c.Owner, c.Dir)
225 }
226
View as plain text