1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "reflect"
8 "strconv"
9 "strings"
10
11 "github.com/datawire/dlib/dlog"
12 amb "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
13 "github.com/emissary-ingress/emissary/v3/pkg/kates"
14 snapshotTypes "github.com/emissary-ingress/emissary/v3/pkg/snapshot/v1"
15 )
16
17
18
19 type endpointRoutingInfo struct {
20
21 resolverTypes map[string]ResolverType
22 module moduleResolver
23 endpointWatches map[string]bool
24 previousWatches map[string]bool
25 }
26
// ResolverType identifies which kind of resolver a named resolver is.
type ResolverType int

// The known resolver kinds. KubernetesServiceResolver is deliberately the
// zero value, so looking up an unknown name in a map[string]ResolverType
// yields KubernetesServiceResolver.
const (
	KubernetesServiceResolver ResolverType = iota
	KubernetesEndpointResolver
	ConsulResolver
)

// String returns the Go identifier of rt.
//
// It panics with an error value if rt is not one of the declared constants.
func (rt ResolverType) String() string {
	// Indexed by the enum value itself, so the table cannot drift out of
	// step with the order of the const block.
	names := [...]string{
		KubernetesServiceResolver:  "KubernetesServiceResolver",
		KubernetesEndpointResolver: "KubernetesEndpointResolver",
		ConsulResolver:             "ConsulResolver",
	}

	if rt >= 0 && int(rt) < len(names) {
		return names[rt]
	}

	// %d formats the underlying integer; it does not call String again.
	panic(fmt.Errorf("ResolverType.String: invalid enum value: %d", rt))
}
47
48
49
50 func newEndpointRoutingInfo() endpointRoutingInfo {
51 return endpointRoutingInfo{
52
53
54
55
56
57
58
59 resolverTypes: make(map[string]ResolverType),
60
61 endpointWatches: make(map[string]bool),
62 }
63 }
64
65 func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s *snapshotTypes.KubernetesSnapshot) {
66 envAmbID := GetAmbassadorID()
67
68
69
70 eri.resolverTypes = map[string]ResolverType{}
71 eri.module = moduleResolver{}
72 eri.previousWatches = eri.endpointWatches
73 eri.endpointWatches = map[string]bool{}
74
75
76
77
78 for _, list := range s.Annotations {
79 for _, a := range list {
80 if _, isInvalid := a.(*kates.Unstructured); isInvalid {
81 continue
82 }
83 if GetAmbID(ctx, a).Matches(envAmbID) {
84 eri.checkResourcePhase1(ctx, a, "annotation")
85 }
86 }
87 }
88
89
90
91
92
93 for _, m := range s.Modules {
94 if m.Spec.AmbassadorID.Matches(envAmbID) {
95 eri.checkModule(ctx, m, "CRD")
96 }
97 }
98
99 for _, r := range s.KubernetesServiceResolvers {
100 if r.Spec.AmbassadorID.Matches(envAmbID) {
101 eri.saveResolver(ctx, r.GetName(), KubernetesServiceResolver, "CRD")
102 }
103 }
104
105 for _, r := range s.KubernetesEndpointResolvers {
106 if r.Spec.AmbassadorID.Matches(envAmbID) {
107 eri.saveResolver(ctx, r.GetName(), KubernetesEndpointResolver, "CRD")
108 }
109 }
110
111 for _, r := range s.ConsulResolvers {
112 if r.Spec.AmbassadorID.Matches(envAmbID) {
113 eri.saveResolver(ctx, r.GetName(), ConsulResolver, "CRD")
114 }
115 }
116
117
118
119 for _, rName := range []string{"endpoint", "kubernetes-endpoint"} {
120 _, found := eri.resolverTypes[rName]
121
122 if !found {
123 dlog.Debugf(ctx, "WATCHER: endpoint resolver %s exists by default", rName)
124 eri.resolverTypes[rName] = KubernetesEndpointResolver
125 }
126 }
127
128 for _, list := range s.Annotations {
129 for _, a := range list {
130 if _, isInvalid := a.(*kates.Unstructured); isInvalid {
131 continue
132 }
133 if GetAmbID(ctx, a).Matches(envAmbID) {
134 eri.checkResourcePhase2(ctx, a, "annotation")
135 }
136 }
137 }
138
139 for _, m := range s.Mappings {
140 if m.Spec.AmbassadorID.Matches(envAmbID) {
141 eri.checkMapping(ctx, m, "CRD")
142 }
143 }
144
145 for _, t := range s.TCPMappings {
146 if t.Spec.AmbassadorID.Matches(envAmbID) {
147 eri.checkTCPMapping(ctx, t, "CRD")
148 }
149 }
150 }
151
152 func (eri *endpointRoutingInfo) watchesChanged() bool {
153 return !reflect.DeepEqual(eri.endpointWatches, eri.previousWatches)
154 }
155
156
157 func (eri *endpointRoutingInfo) checkResourcePhase1(ctx context.Context, obj kates.Object, source string) {
158 switch v := obj.(type) {
159 case *amb.Module:
160 eri.checkModule(ctx, v, source)
161 case *amb.KubernetesServiceResolver:
162 eri.saveResolver(ctx, v.GetName(), KubernetesServiceResolver, "CRD")
163 case *amb.KubernetesEndpointResolver:
164 eri.saveResolver(ctx, v.GetName(), KubernetesEndpointResolver, "CRD")
165 case *amb.ConsulResolver:
166 eri.saveResolver(ctx, v.GetName(), ConsulResolver, "CRD")
167 }
168 }
169
170
171 func (eri *endpointRoutingInfo) checkResourcePhase2(ctx context.Context, obj kates.Object, source string) {
172 switch v := obj.(type) {
173 case *amb.Mapping:
174 eri.checkMapping(ctx, v, source)
175 case *amb.TCPMapping:
176 eri.checkTCPMapping(ctx, v, source)
177 }
178 }
179
// moduleResolver is the part of the "ambassador" Module configuration that
// this file cares about. checkModule decodes the Module's config into it.
type moduleResolver struct {
	// Resolver is the name of the resolver used for Mappings and TCPMappings
	// that do not name a resolver themselves.
	Resolver string `json:"resolver"`
	// UseAmbassadorNamespaceForServiceResolution, when true, makes
	// parseService place a service given without a namespace in
	// GetAmbassadorNamespace() rather than in the namespace passed to it.
	UseAmbassadorNamespaceForServiceResolution bool `json:"use_ambassador_namespace_for_service_resolution"`
}
184
185
186 func (eri *endpointRoutingInfo) checkModule(ctx context.Context, mod *amb.Module, source string) {
187 if mod.GetName() != "ambassador" {
188 return
189 }
190
191 mr := moduleResolver{}
192 err := convert(mod.Spec.Config, &mr)
193
194 if err != nil {
195 dlog.Errorf(ctx, "error parsing ambassador module: %v", err)
196 return
197 }
198
199
200 if mr.Resolver == "" {
201 mr.Resolver = "kubernetes-service"
202 }
203
204 eri.module = mr
205 }
206
207
208
209 func (eri *endpointRoutingInfo) saveResolver(ctx context.Context, name string, resType ResolverType, source string) {
210
211 eri.resolverTypes[name] = resType
212
213 dlog.Debugf(ctx, "WATCHER: %s resolver %s is active (%s)", resType.String(), name, source)
214 }
215
216
217 func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.Mapping, source string) {
218
219 name := mapping.GetName()
220 resolver := mapping.Spec.Resolver
221 service := mapping.Spec.Service
222
223 if resolver == "" {
224
225 resolver = eri.module.Resolver
226 dlog.Debugf(ctx, "WATCHER: Mapping %s uses the default resolver (%s)", name, source)
227 }
228
229 if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
230 svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
231 eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
232 }
233 }
234
235
236 func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping *amb.TCPMapping, source string) {
237
238 name := tcpmapping.GetName()
239 resolver := tcpmapping.Spec.Resolver
240 service := tcpmapping.Spec.Service
241
242 if resolver == "" {
243
244 dlog.Debugf(ctx, "WATCHER: TCPMapping %s uses the default resolver (%s)", name, source)
245 resolver = eri.module.Resolver
246 }
247
248 if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
249 svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
250 eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
251 }
252 }
253
254 func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) {
255
256 parts := strings.SplitN(svcName, "://", 2)
257 if len(parts) > 1 {
258 svcName = parts[1]
259 }
260
261
262 parts = strings.SplitN(svcName, ":", 2)
263 if len(parts) > 1 {
264 _, err := strconv.Atoi(parts[1])
265 if err == nil {
266 port = parts[1]
267 svcName = parts[0]
268 }
269 }
270
271
272 ip := net.ParseIP(svcName)
273 if ip != nil {
274 name = svcName
275 } else if strings.Contains(svcName, ".") {
276
277 parts := strings.Split(svcName, ".")
278 if len(parts) > 2 {
279 using := strings.Join(parts[:2], ".")
280 dlog.Errorf(ctx, "mapping %s in namespace %s: ignoring extra domain parts in service, using %q",
281 resource.GetName(), resource.GetNamespace(), using)
282 }
283 name = parts[0]
284 namespace = parts[1]
285 return
286 } else {
287 name = svcName
288 }
289
290 if m.UseAmbassadorNamespaceForServiceResolution || svcNamespace == "" {
291 namespace = GetAmbassadorNamespace()
292 } else {
293 namespace = svcNamespace
294 }
295
296 return
297 }
298
View as plain text