1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6 "net"
7
8 "github.com/datawire/ambassador/v2/pkg/ambex"
9 "github.com/datawire/ambassador/v2/pkg/consulwatch"
10 "github.com/datawire/ambassador/v2/pkg/kates"
11 "github.com/datawire/ambassador/v2/pkg/snapshot/v1"
12 "github.com/datawire/dlib/dlog"
13 )
14
15 func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, consulEndpoints map[string]consulwatch.Endpoints) *ambex.Endpoints {
16 k8sServices := map[string]*kates.Service{}
17 for _, svc := range ksnap.Services {
18 k8sServices[key(svc)] = svc
19 }
20
21 result := map[string][]*ambex.Endpoint{}
22
23 for _, k8sEp := range ksnap.Endpoints {
24 svc, ok := k8sServices[key(k8sEp)]
25 if !ok {
26 continue
27 }
28 for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
29 result[ep.ClusterName] = append(result[ep.ClusterName], ep)
30 }
31 }
32
33 for _, consulEp := range consulEndpoints {
34 for _, ep := range consulEndpointsToAmbex(ctx, consulEp) {
35 result[ep.ClusterName] = append(result[ep.ClusterName], ep)
36 }
37 }
38
39 return &ambex.Endpoints{Entries: result}
40 }
41
42 func key(resource kates.Object) string {
43 return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName())
44 }
45
46 func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) {
47 portmap := map[string][]string{}
48 for _, p := range svc.Spec.Ports {
49 port := fmt.Sprintf("%d", p.Port)
50 targetPort := p.TargetPort.String()
51 if targetPort == "" {
52 targetPort = fmt.Sprintf("%d", p.Port)
53 }
54
55 portmap[targetPort] = append(portmap[targetPort], port)
56 if p.Name != "" {
57 portmap[targetPort] = append(portmap[targetPort], p.Name)
58 portmap[p.Name] = append(portmap[p.Name], port)
59 }
60 if len(svc.Spec.Ports) == 1 {
61 portmap[targetPort] = append(portmap[targetPort], "")
62 portmap[""] = append(portmap[""], port)
63 portmap[""] = append(portmap[""], "")
64 }
65 }
66
67 for _, subset := range ep.Subsets {
68 for _, port := range subset.Ports {
69 if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP {
70 portNames := map[string]bool{}
71 candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""}
72 for _, c := range candidates {
73 if pns, ok := portmap[c]; ok {
74 for _, pn := range pns {
75 portNames[pn] = true
76 }
77 }
78 }
79 for _, addr := range subset.Addresses {
80 for pn := range portNames {
81 sep := "/"
82 if pn == "" {
83 sep = ""
84 }
85 result = append(result, &ambex.Endpoint{
86 ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn),
87 Ip: addr.IP,
88 Port: uint32(port.Port),
89 Protocol: string(port.Protocol),
90 })
91 }
92 }
93 }
94 }
95 }
96
97 return
98 }
99
100 func consulEndpointsToAmbex(ctx context.Context, endpoints consulwatch.Endpoints) (result []*ambex.Endpoint) {
101 for _, ep := range endpoints.Endpoints {
102 addrs, err := net.LookupHost(ep.Address)
103 if err != nil {
104 dlog.Errorf(ctx, "error resolving consul address %s: %+v", ep.Address, err)
105 continue
106 }
107 for _, addr := range addrs {
108 result = append(result, &ambex.Endpoint{
109 ClusterName: fmt.Sprintf("consul/%s/%s", endpoints.Id, endpoints.Service),
110 Ip: addr,
111 Port: uint32(ep.Port),
112 Protocol: "TCP",
113 })
114 }
115 }
116
117 return
118 }
119
View as plain text