1 package entrypoint
2
3 import (
4 "encoding/json"
5 "fmt"
6 "io/ioutil"
7 "sort"
8 "strings"
9 "sync"
10
11 "github.com/emissary-ingress/emissary/v3/pkg/kates"
12 )
13
14
15
16
17
18
19 type K8sStore struct {
20
21 mutex sync.Mutex
22 resources map[K8sKey]kates.Object
23
24
25 deltas []*kates.Delta
26 }
27
28 type K8sKey struct {
29 Kind string
30 Namespace string
31 Name string
32 }
33
34 func (k K8sKey) sortKey() string {
35 return fmt.Sprintf("%s:%s:%s", k.Kind, k.Namespace, k.Name)
36 }
37
38
39 func NewK8sStore() *K8sStore {
40 return &K8sStore{resources: map[K8sKey]kates.Object{}}
41 }
42
43
44
45
46
47
48
49
50 func (k *K8sStore) Upsert(resource kates.Object) error {
51 var un *kates.Unstructured
52 bytes, err := json.Marshal(resource)
53 if err != nil {
54 return err
55 }
56 err = json.Unmarshal(bytes, &un)
57 if err != nil {
58 return err
59 }
60
61 kind, apiVersion, err := canonGVK(un.GetKind())
62 if err != nil {
63 return err
64 }
65 un.SetKind(kind)
66 un.SetAPIVersion(apiVersion)
67 if un.GetNamespace() == "" {
68 un.SetNamespace("default")
69 }
70
71 k.mutex.Lock()
72 defer k.mutex.Unlock()
73
74 key := K8sKey{un.GetKind(), un.GetNamespace(), un.GetName()}
75 _, ok := k.resources[key]
76 if ok {
77 k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectUpdate, un))
78 } else {
79 k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectAdd, un))
80 }
81 k.resources[key] = un
82 return nil
83 }
84
85
86 func (k *K8sStore) Delete(kind, namespace, name string) error {
87 k.mutex.Lock()
88 defer k.mutex.Unlock()
89
90 canonKind, err := canon(kind)
91 if err != nil {
92 return err
93 }
94 key := K8sKey{canonKind, namespace, name}
95 old, ok := k.resources[key]
96 if ok {
97 delta, err := kates.NewDeltaFromObject(kates.ObjectDelete, old)
98 if err != nil {
99 return err
100 }
101 k.deltas = append(k.deltas, delta)
102 }
103 delete(k.resources, key)
104 return nil
105 }
106
107
108
109 func (k *K8sStore) UpsertFile(filename string) error {
110 content, err := ioutil.ReadFile(filename)
111 if err != nil {
112 return err
113 }
114
115 return k.UpsertYAML(string(content))
116 }
117
118
119
120 func (k *K8sStore) UpsertYAML(yaml string) error {
121 objs, err := kates.ParseManifests(yaml)
122 if err != nil {
123 return err
124 }
125
126 for _, obj := range objs {
127 if err := k.Upsert(obj); err != nil {
128 return err
129 }
130 }
131 return nil
132 }
133
134
135
136
137 func (k *K8sStore) Cursor() *K8sStoreCursor {
138 k.mutex.Lock()
139 defer k.mutex.Unlock()
140 return &K8sStoreCursor{store: k, offset: -1}
141 }
142
143 type K8sStoreCursor struct {
144 store *K8sStore
145
146 offset int
147 }
148
149
150
151 func (kc *K8sStoreCursor) Get() (map[K8sKey]kates.Object, []*kates.Delta, error) {
152 kc.store.mutex.Lock()
153 defer kc.store.mutex.Unlock()
154
155 var deltas []*kates.Delta
156
157 resources := map[K8sKey]kates.Object{}
158 for _, key := range sortedKeys(kc.store.resources) {
159 resource := kc.store.resources[key]
160 resources[key] = resource
161
162
163 if kc.offset < 0 {
164 delta, err := kates.NewDeltaFromObject(kates.ObjectAdd, resource)
165 if err != nil {
166 return nil, nil, err
167 }
168 deltas = append(deltas, delta)
169 }
170 }
171
172 if kc.offset >= 0 {
173 deltas = append(deltas, kc.store.deltas[kc.offset:len(kc.store.deltas)]...)
174 }
175 kc.offset = len(kc.store.deltas)
176
177 return resources, deltas, nil
178 }
179
180 func sortedKeys(resources map[K8sKey]kates.Object) []K8sKey {
181 var keys []K8sKey
182 for k := range resources {
183 keys = append(keys, k)
184 }
185
186 sort.Slice(keys, func(i, j int) bool {
187 return keys[i].sortKey() < keys[j].sortKey()
188 })
189
190 return keys
191 }
192
193 func canonGVK(rawString string) (canonKind string, canonGroupVersion string, err error) {
194
195
196
197 rawParts := strings.SplitN(rawString, ".", 2)
198 var rawKind, rawVG string
199 switch len(rawParts) {
200 case 1:
201 rawKind = rawParts[0]
202 case 2:
203 rawKind = rawParts[0]
204 rawVG = rawParts[1]
205 }
206
207
208 switch strings.ToLower(rawKind) {
209
210 case "service", "services":
211 return "Service", "v1", nil
212 case "endpoints":
213 return "Endpoints", "v1", nil
214 case "secret", "secrets":
215 return "Secret", "v1", nil
216 case "configmap", "configmaps":
217 return "ConfigMap", "v1", nil
218 case "ingress", "ingresses":
219 if strings.HasSuffix(rawVG, ".knative.dev") {
220 return "Ingress", "networking.internal.knative.dev/v1alpha1", nil
221 }
222 return "Ingress", "networking.k8s.io/v1", nil
223 case "ingressclass", "ingressclasses":
224 return "IngressClass", "networking.k8s.io/v1", nil
225
226 case "gatewayclass", "gatewayclasses":
227 return "GatewayClass", "networking.x-k8s.io/v1alpha1", nil
228 case "gateway", "gateways":
229 return "Gateway", "networking.x-k8s.io/v1alpha1", nil
230 case "httproute", "httproutes":
231 return "HTTPRoute", "networking.x-k8s.io/v1alpha1", nil
232
233 case "clusteringress", "clusteringresses":
234 return "ClusterIngress", "networking.internal.knative.dev/v1alpha1", nil
235
236 case "authservice", "authservices":
237 return "AuthService", "getambassador.io/v3alpha1", nil
238 case "consulresolver", "consulresolvers":
239 return "ConsulResolver", "getambassador.io/v3alpha1", nil
240 case "devportal", "devportals":
241 return "DevPortal", "getambassador.io/v3alpha1", nil
242 case "host", "hosts":
243 return "Host", "getambassador.io/v3alpha1", nil
244 case "kubernetesendpointresolver", "kubernetesendpointresolvers":
245 return "KubernetesEndpointResolver", "getambassador.io/v3alpha1", nil
246 case "kubernetesserviceresolver", "kubernetesserviceresolvers":
247 return "KubernetesServiceResolver", "getambassador.io/v3alpha1", nil
248 case "listener", "listeners":
249 return "Listener", "getambassador.io/v3alpha1", nil
250 case "logservice", "logservices":
251 return "LogService", "getambassador.io/v3alpha1", nil
252 case "mapping", "mappings":
253 return "Mapping", "getambassador.io/v3alpha1", nil
254 case "module", "modules":
255 return "Module", "getambassador.io/v3alpha1", nil
256 case "ratelimitservice", "ratelimitservices":
257 return "RateLimitService", "getambassador.io/v3alpha1", nil
258 case "tcpmapping", "tcpmappings":
259 return "TCPMapping", "getambassador.io/v3alpha1", nil
260 case "tlscontext", "tlscontexts":
261 return "TLSContext", "getambassador.io/v3alpha1", nil
262 case "tracingservice", "tracingservices":
263 return "TracingService", "getambassador.io/v3alpha1", nil
264 case "filter", "filters":
265 return "Filter", "getambassador.io/v3alpha1", nil
266 case "filterpolicy", "filterpolicies":
267 return "Filterpolicy", "getambassador.io/v3alpha1", nil
268 default:
269 return "", "", fmt.Errorf("I don't know how to canonicalize kind: %q", rawString)
270 }
271 }
272
273 func canon(kind string) (string, error) {
274 canonKind, _, err := canonGVK(kind)
275 if err != nil {
276 return "", err
277 }
278 return canonKind, nil
279 }
280
View as plain text