const ( // PodIPIndex is the key for the index based on Pod IPs PodIPIndex = "ip" // HostIPIndex is the key for the index based on Host IP of pods with host network enabled HostIPIndex = "hostIP" // ExternalWorkloadIPIndex is the key for the index based on IP of externalworkloads ExternalWorkloadIPIndex = "externalWorkloadIP" )
func CreateMockDecoder(configs ...string) configDecoder
func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{}
GetAnnotatedOpaquePorts returns the opaque ports for the pod given its annotations, or the default opaque ports if it's not annotated
func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{}
GetAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its annotations, or the default opaque ports if it's not annotated
func InitializeIndexers(k8sAPI *k8s.API) error
InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers
func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error
SetToServerProtocol sets the address's OpaqueProtocol field based off any Servers that select it and override the expected protocol.
func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error
setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any Servers that select it and override the expected protocol for ExternalWorkloads.
Address represents an individual port on a specific endpoint. This endpoint might be the result of a the existence of a pod that is targeted by this service; alternatively it can be the case that this endpoint is not associated with a pod and maps to some other IP (i.e. a remote service gateway)
type Address struct { IP string Port Port Pod *corev1.Pod ExternalWorkload *ewv1beta1.ExternalWorkload OwnerName string OwnerKind string Identity string AuthorityOverride string Zone *string ForZones []discovery.ForZone OpaqueProtocol bool }
AddressSet is a set of Address, indexed by ID. The ID can be either: 1) A reference to service: id.Name contains both the service name and the target IP and port (see newServiceRefAddress) 2) A reference to a pod: id.Name refers to the pod's name, and id.IPFamily refers to the ES AddressType (see newPodRefAddress). 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.
type AddressSet struct { Addresses map[ID]Address Labels map[string]string LocalTrafficPolicy bool }
BufferingProfileListener implements ProfileUpdateListener and stores updates in a slice. Useful for unit tests.
type BufferingProfileListener struct { Profiles []*sp.ServiceProfile // contains filtered or unexported fields }
func NewBufferingProfileListener() *BufferingProfileListener
NewBufferingProfileListener creates a new BufferingProfileListener.
func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile)
Update stores the update in the internal buffer.
ClusterStore indexes clusters in which remote service discovery may be performed. For each store item, an EndpointsWatcher is created to read state directly from the respective cluster's API Server. In addition, each store item has some associated and immutable configuration that is required for service discovery.
type ClusterStore struct { // Protects against illegal accesses sync.RWMutex // contains filtered or unexported fields }
func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error)
NewClusterStore creates a new (empty) ClusterStore. It requires a Kubernetes API Server client instantiated for the local cluster.
When created, a pair of event handlers are registered for the local cluster's Secret informer. The event handlers are responsible for driving the discovery of remote clusters and their configuration
func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error)
newClusterStoreWithDecoder is a helper function that allows the creation of a store with an arbitrary `configDecoder` function.
func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool)
Get safely retrieves a store item given a cluster name.
func (cs *ClusterStore) Sync(stopCh <-chan struct{})
func (cs *ClusterStore) UnregisterGauges()
DeletingProfileListener implements ProfileUpdateListener and registers deletions. Useful for unit testing
type DeletingProfileListener struct { NumDeletes int }
func NewDeletingProfileListener() *DeletingProfileListener
NewDeletingProfileListener creates a new NewDeletingProfileListener.
func (dpl *DeletingProfileListener) Update(profile *sp.ServiceProfile)
Update registers a deletion
EndpointUpdateListener is the interface that subscribers must implement.
type EndpointUpdateListener interface { Add(set AddressSet) Remove(set AddressSet) NoEndpoints(exists bool) }
EndpointsWatcher watches all endpoints and services in the Kubernetes cluster. Listeners can subscribe to a particular service and port and EndpointsWatcher will publish the address set and all future changes for that service:port.
type EndpointsWatcher struct { sync.RWMutex // This mutex protects modification of the map itself. // contains filtered or unexported fields }
func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error)
NewEndpointsWatcher creates an EndpointsWatcher and begins watching the k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error
Subscribe to an authority. The provided listener will be updated each time the address set for the given authority is changed.
func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener)
Unsubscribe removes a listener from the subscribers list for this authority.
PodID is the namespace-qualified name of an ExternalWorkload.
type ExternalWorkloadID = ID
ID is a namespace-qualified name.
type ID struct { Namespace string Name string // Only used for PodID IPFamily corev1.IPFamily }
func (i ID) String() string
IPPort holds the IP and port for some destination
type IPPort struct { IP string Port Port }
InvalidService is an error which indicates that the authority is not a valid service.
type InvalidService struct {
// contains filtered or unexported fields
}
func (is InvalidService) Error() string
OpaquePortsUpdateListener is the interface that subscribers must implement.
type OpaquePortsUpdateListener interface { UpdateService(ports map[uint32]struct{}) }
OpaquePortsWatcher watches all the services in the cluster. If the opaque ports annotation is added to a service, the watcher will update listeners—if any—subscribed to that service.
type OpaquePortsWatcher struct { sync.RWMutex // contains filtered or unexported fields }
func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error)
NewOpaquePortsWatcher creates a OpaquePortsWatcher and begins watching for k8sAPI for service changes.
func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error
Subscribe subscribes a listener to a service; each time the service changes, the listener will be updated if the list of opaque ports changes.
func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener)
Unsubscribe unsubscribes a listener from service.
PodID is the namespace-qualified name of a pod.
type PodID = ID
Port is a numeric port.
type Port = uint32
ProfileID is the namespace-qualified name of a service profile.
type ProfileID = ID
ProfileUpdateListener is the interface that subscribers must implement.
type ProfileUpdateListener interface { Update(profile *sp.ServiceProfile) }
ProfileWatcher watches all service profiles in the Kubernetes cluster. Listeners can subscribe to a particular profile and profileWatcher will publish the service profile and all future changes for that profile.
type ProfileWatcher struct { sync.RWMutex // This mutex protects modification of the map itself. // contains filtered or unexported fields }
func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error)
NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for service profile changes.
func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error
Subscribe to an authority. The provided listener will be updated each time the service profile for the given authority is changed.
func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener)
Unsubscribe removes a listener from the subscribers list for this authority.
ServiceID is the namespace-qualified name of a service.
type ServiceID = ID
func (id ServiceID) Labels() prometheus.Labels
Labels returns the labels for prometheus metrics associated to the service
PodUpdateListener is the interface subscribers must implement.
type WorkloadUpdateListener interface { Update(*Address) error }
WorkloadWatcher watches all pods and externalworkloads in the cluster. It keeps a map of publishers keyed by IP and port.
type WorkloadWatcher struct {
// contains filtered or unexported fields
}
func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error)
func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error)
Subscribe notifies the listener on changes on any workload backing the passed host/ip:port or changes to its associated opaque protocol config. If service and hostname are empty then ip should be set and vice-versa. If ip is empty, the corresponding ip is found for the given service/hostname, and returned.
func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener)
Subscribe stops notifying the listener on chages on any pod backing the passed ip:port or its associated protocol config