1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "reflect"
8 "strconv"
9 "strings"
10
11 amb "github.com/datawire/ambassador/v2/pkg/api/getambassador.io/v3alpha1"
12 "github.com/datawire/ambassador/v2/pkg/kates"
13 snapshotTypes "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
14 "github.com/datawire/dlib/dlog"
15 )
16
17
18
19 type endpointRoutingInfo struct {
20
21 resolverTypes map[string]ResolverType
22 module moduleResolver
23 endpointWatches map[string]bool
24 previousWatches map[string]bool
25 }
26
// ResolverType identifies the kind of resolver registered under a name.
type ResolverType int

// The resolver kinds that can be recorded for a resolver name.
const (
	KubernetesServiceResolver ResolverType = iota
	KubernetesEndpointResolver
	ConsulResolver
)

// String returns the identifier of the constant that rt corresponds to. It
// panics with an error value when rt is not one of the declared ResolverType
// constants.
func (rt ResolverType) String() string {
	// Indexed by the constants themselves so the table cannot drift out of
	// step with their numeric values.
	names := [...]string{
		KubernetesServiceResolver:  "KubernetesServiceResolver",
		KubernetesEndpointResolver: "KubernetesEndpointResolver",
		ConsulResolver:             "ConsulResolver",
	}

	if rt < 0 || int(rt) >= len(names) {
		panic(fmt.Errorf("ResolverType.String: invalid enum value: %d", rt))
	}
	return names[rt]
}
47
48
49
50 func newEndpointRoutingInfo() endpointRoutingInfo {
51 return endpointRoutingInfo{
52
53
54
55
56
57
58
59 resolverTypes: make(map[string]ResolverType),
60
61 endpointWatches: make(map[string]bool),
62 }
63 }
64
65 func (eri *endpointRoutingInfo) reconcileEndpointWatches(ctx context.Context, s *snapshotTypes.KubernetesSnapshot) {
66
67
68 eri.resolverTypes = map[string]ResolverType{}
69 eri.module = moduleResolver{}
70 eri.previousWatches = eri.endpointWatches
71 eri.endpointWatches = map[string]bool{}
72
73
74
75
76 for _, list := range s.Annotations {
77 for _, a := range list {
78 if _, isInvalid := a.(*kates.Unstructured); isInvalid {
79 continue
80 }
81 if include(GetAmbID(ctx, a)) {
82 eri.checkResourcePhase1(ctx, a, "annotation")
83 }
84 }
85 }
86
87
88
89
90
91 for _, m := range s.Modules {
92 if include(m.Spec.AmbassadorID) {
93 eri.checkModule(ctx, m, "CRD")
94 }
95 }
96
97 for _, r := range s.KubernetesServiceResolvers {
98 if include(r.Spec.AmbassadorID) {
99 eri.saveResolver(ctx, r.GetName(), KubernetesServiceResolver, "CRD")
100 }
101 }
102
103 for _, r := range s.KubernetesEndpointResolvers {
104 if include(r.Spec.AmbassadorID) {
105 eri.saveResolver(ctx, r.GetName(), KubernetesEndpointResolver, "CRD")
106 }
107 }
108
109 for _, r := range s.ConsulResolvers {
110 if include(r.Spec.AmbassadorID) {
111 eri.saveResolver(ctx, r.GetName(), ConsulResolver, "CRD")
112 }
113 }
114
115
116
117 for _, rName := range []string{"endpoint", "kubernetes-endpoint"} {
118 _, found := eri.resolverTypes[rName]
119
120 if !found {
121 dlog.Debugf(ctx, "WATCHER: endpoint resolver %s exists by default", rName)
122 eri.resolverTypes[rName] = KubernetesEndpointResolver
123 }
124 }
125
126 for _, list := range s.Annotations {
127 for _, a := range list {
128 if _, isInvalid := a.(*kates.Unstructured); isInvalid {
129 continue
130 }
131 if include(GetAmbID(ctx, a)) {
132 eri.checkResourcePhase2(ctx, a, "annotation")
133 }
134 }
135 }
136
137 for _, m := range s.Mappings {
138 if include(m.Spec.AmbassadorID) {
139 eri.checkMapping(ctx, m, "CRD")
140 }
141 }
142
143 for _, t := range s.TCPMappings {
144 if include(t.Spec.AmbassadorID) {
145 eri.checkTCPMapping(ctx, t, "CRD")
146 }
147 }
148 }
149
150 func (eri *endpointRoutingInfo) watchesChanged() bool {
151 return !reflect.DeepEqual(eri.endpointWatches, eri.previousWatches)
152 }
153
154
155 func (eri *endpointRoutingInfo) checkResourcePhase1(ctx context.Context, obj kates.Object, source string) {
156 switch v := obj.(type) {
157 case *amb.Module:
158 eri.checkModule(ctx, v, source)
159 case *amb.KubernetesServiceResolver:
160 eri.saveResolver(ctx, v.GetName(), KubernetesServiceResolver, "CRD")
161 case *amb.KubernetesEndpointResolver:
162 eri.saveResolver(ctx, v.GetName(), KubernetesEndpointResolver, "CRD")
163 case *amb.ConsulResolver:
164 eri.saveResolver(ctx, v.GetName(), ConsulResolver, "CRD")
165 }
166 }
167
168
169 func (eri *endpointRoutingInfo) checkResourcePhase2(ctx context.Context, obj kates.Object, source string) {
170 switch v := obj.(type) {
171 case *amb.Mapping:
172 eri.checkMapping(ctx, v, source)
173 case *amb.TCPMapping:
174 eri.checkTCPMapping(ctx, v, source)
175 }
176 }
177
// moduleResolver is the part of the "ambassador" Module configuration that
// influences how services are resolved.
type moduleResolver struct {
	// Resolver is the resolver name used for Mappings and TCPMappings that do
	// not name a resolver themselves.
	Resolver string `json:"resolver"`

	// UseAmbassadorNamespaceForServiceResolution makes parseService use the
	// namespace returned by GetAmbassadorNamespace, instead of the namespace
	// passed by the caller, for services that do not spell out a namespace.
	UseAmbassadorNamespaceForServiceResolution bool `json:"use_ambassador_namespace_for_service_resolution"`
}
182
183
184 func (eri *endpointRoutingInfo) checkModule(ctx context.Context, mod *amb.Module, source string) {
185 if mod.GetName() != "ambassador" {
186 return
187 }
188
189 mr := moduleResolver{}
190 err := convert(mod.Spec.Config, &mr)
191
192 if err != nil {
193 dlog.Errorf(ctx, "error parsing ambassador module: %v", err)
194 return
195 }
196
197
198 if mr.Resolver == "" {
199 mr.Resolver = "kubernetes-service"
200 }
201
202 eri.module = mr
203 }
204
205
206
207 func (eri *endpointRoutingInfo) saveResolver(ctx context.Context, name string, resType ResolverType, source string) {
208
209 eri.resolverTypes[name] = resType
210
211 dlog.Debugf(ctx, "WATCHER: %s resolver %s is active (%s)", resType.String(), name, source)
212 }
213
214
215 func (eri *endpointRoutingInfo) checkMapping(ctx context.Context, mapping *amb.Mapping, source string) {
216
217 name := mapping.GetName()
218 resolver := mapping.Spec.Resolver
219 service := mapping.Spec.Service
220
221 if resolver == "" {
222
223 resolver = eri.module.Resolver
224 dlog.Debugf(ctx, "WATCHER: Mapping %s uses the default resolver (%s)", name, source)
225 }
226
227 if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
228 svc, ns, _ := eri.module.parseService(ctx, mapping, service, mapping.GetNamespace())
229 eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
230 }
231 }
232
233
234 func (eri *endpointRoutingInfo) checkTCPMapping(ctx context.Context, tcpmapping *amb.TCPMapping, source string) {
235
236 name := tcpmapping.GetName()
237 resolver := tcpmapping.Spec.Resolver
238 service := tcpmapping.Spec.Service
239
240 if resolver == "" {
241
242 dlog.Debugf(ctx, "WATCHER: TCPMapping %s uses the default resolver (%s)", name, source)
243 resolver = eri.module.Resolver
244 }
245
246 if eri.resolverTypes[resolver] == KubernetesEndpointResolver {
247 svc, ns, _ := eri.module.parseService(ctx, tcpmapping, service, tcpmapping.GetNamespace())
248 eri.endpointWatches[fmt.Sprintf("%s:%s", ns, svc)] = true
249 }
250 }
251
252 func (m *moduleResolver) parseService(ctx context.Context, resource kates.Object, svcName, svcNamespace string) (name string, namespace string, port string) {
253
254 parts := strings.SplitN(svcName, "://", 2)
255 if len(parts) > 1 {
256 svcName = parts[1]
257 }
258
259
260 parts = strings.SplitN(svcName, ":", 2)
261 if len(parts) > 1 {
262 _, err := strconv.Atoi(parts[1])
263 if err == nil {
264 port = parts[1]
265 svcName = parts[0]
266 }
267 }
268
269
270 ip := net.ParseIP(svcName)
271 if ip != nil {
272 name = svcName
273 } else if strings.Contains(svcName, ".") {
274
275 parts := strings.Split(svcName, ".")
276 if len(parts) > 2 {
277 using := strings.Join(parts[:2], ".")
278 dlog.Errorf(ctx, "mapping %s in namespace %s: ignoring extra domain parts in service, using %q",
279 resource.GetName(), resource.GetNamespace(), using)
280 }
281 name = parts[0]
282 namespace = parts[1]
283 return
284 } else {
285 name = svcName
286 }
287
288 if m.UseAmbassadorNamespaceForServiceResolution || svcNamespace == "" {
289 namespace = GetAmbassadorNamespace()
290 } else {
291 namespace = svcNamespace
292 }
293
294 return
295 }
296
View as plain text