...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/endpoint_routing.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  
     8  	"github.com/datawire/ambassador/v2/pkg/ambex"
     9  	"github.com/datawire/ambassador/v2/pkg/consulwatch"
    10  	"github.com/datawire/ambassador/v2/pkg/kates"
    11  	"github.com/datawire/ambassador/v2/pkg/snapshot/v1"
    12  	"github.com/datawire/dlib/dlog"
    13  )
    14  
    15  func makeEndpoints(ctx context.Context, ksnap *snapshot.KubernetesSnapshot, consulEndpoints map[string]consulwatch.Endpoints) *ambex.Endpoints {
    16  	k8sServices := map[string]*kates.Service{}
    17  	for _, svc := range ksnap.Services {
    18  		k8sServices[key(svc)] = svc
    19  	}
    20  
    21  	result := map[string][]*ambex.Endpoint{}
    22  
    23  	for _, k8sEp := range ksnap.Endpoints {
    24  		svc, ok := k8sServices[key(k8sEp)]
    25  		if !ok {
    26  			continue
    27  		}
    28  		for _, ep := range k8sEndpointsToAmbex(k8sEp, svc) {
    29  			result[ep.ClusterName] = append(result[ep.ClusterName], ep)
    30  		}
    31  	}
    32  
    33  	for _, consulEp := range consulEndpoints {
    34  		for _, ep := range consulEndpointsToAmbex(ctx, consulEp) {
    35  			result[ep.ClusterName] = append(result[ep.ClusterName], ep)
    36  		}
    37  	}
    38  
    39  	return &ambex.Endpoints{Entries: result}
    40  }
    41  
    42  func key(resource kates.Object) string {
    43  	return fmt.Sprintf("%s:%s", resource.GetNamespace(), resource.GetName())
    44  }
    45  
    46  func k8sEndpointsToAmbex(ep *kates.Endpoints, svc *kates.Service) (result []*ambex.Endpoint) {
    47  	portmap := map[string][]string{}
    48  	for _, p := range svc.Spec.Ports {
    49  		port := fmt.Sprintf("%d", p.Port)
    50  		targetPort := p.TargetPort.String()
    51  		if targetPort == "" {
    52  			targetPort = fmt.Sprintf("%d", p.Port)
    53  		}
    54  
    55  		portmap[targetPort] = append(portmap[targetPort], port)
    56  		if p.Name != "" {
    57  			portmap[targetPort] = append(portmap[targetPort], p.Name)
    58  			portmap[p.Name] = append(portmap[p.Name], port)
    59  		}
    60  		if len(svc.Spec.Ports) == 1 {
    61  			portmap[targetPort] = append(portmap[targetPort], "")
    62  			portmap[""] = append(portmap[""], port)
    63  			portmap[""] = append(portmap[""], "")
    64  		}
    65  	}
    66  
    67  	for _, subset := range ep.Subsets {
    68  		for _, port := range subset.Ports {
    69  			if port.Protocol == kates.ProtocolTCP || port.Protocol == kates.ProtocolUDP {
    70  				portNames := map[string]bool{}
    71  				candidates := []string{fmt.Sprintf("%d", port.Port), port.Name, ""}
    72  				for _, c := range candidates {
    73  					if pns, ok := portmap[c]; ok {
    74  						for _, pn := range pns {
    75  							portNames[pn] = true
    76  						}
    77  					}
    78  				}
    79  				for _, addr := range subset.Addresses {
    80  					for pn := range portNames {
    81  						sep := "/"
    82  						if pn == "" {
    83  							sep = ""
    84  						}
    85  						result = append(result, &ambex.Endpoint{
    86  							ClusterName: fmt.Sprintf("k8s/%s/%s%s%s", ep.Namespace, ep.Name, sep, pn),
    87  							Ip:          addr.IP,
    88  							Port:        uint32(port.Port),
    89  							Protocol:    string(port.Protocol),
    90  						})
    91  					}
    92  				}
    93  			}
    94  		}
    95  	}
    96  
    97  	return
    98  }
    99  
   100  func consulEndpointsToAmbex(ctx context.Context, endpoints consulwatch.Endpoints) (result []*ambex.Endpoint) {
   101  	for _, ep := range endpoints.Endpoints {
   102  		addrs, err := net.LookupHost(ep.Address)
   103  		if err != nil {
   104  			dlog.Errorf(ctx, "error resolving consul address %s: %+v", ep.Address, err)
   105  			continue
   106  		}
   107  		for _, addr := range addrs {
   108  			result = append(result, &ambex.Endpoint{
   109  				ClusterName: fmt.Sprintf("consul/%s/%s", endpoints.Id, endpoints.Service),
   110  				Ip:          addr,
   111  				Port:        uint32(ep.Port),
   112  				Protocol:    "TCP",
   113  			})
   114  		}
   115  	}
   116  
   117  	return
   118  }
   119  

View as plain text