...

Package watcher

import "github.com/linkerd/linkerd2/controller/api/destination/watcher"
Overview
Index

Overview ▾

Index ▾

Constants
func CreateMockDecoder(configs ...string) configDecoder
func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{}
func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{}
func InitializeIndexers(k8sAPI *k8s.API) error
func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error
func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error
type Address
type AddressSet
type BufferingProfileListener
    func NewBufferingProfileListener() *BufferingProfileListener
    func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile)
type ClusterStore
    func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error)
    func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error)
    func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool)
    func (cs *ClusterStore) Sync(stopCh <-chan struct{})
    func (cs *ClusterStore) UnregisterGauges()
type DeletingProfileListener
    func NewDeletingProfileListener() *DeletingProfileListener
    func (dpl *DeletingProfileListener) Update(profile *sp.ServiceProfile)
type EndpointUpdateListener
type EndpointsWatcher
    func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error)
    func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error
    func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener)
type ExternalWorkloadID
type ID
    func (i ID) String() string
type IPPort
type InvalidService
    func (is InvalidService) Error() string
type OpaquePortsUpdateListener
type OpaquePortsWatcher
    func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error)
    func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error
    func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener)
type PodID
type Port
type ProfileID
type ProfileUpdateListener
type ProfileWatcher
    func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error)
    func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error
    func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener)
type ServiceID
    func (id ServiceID) Labels() prometheus.Labels
type WorkloadUpdateListener
type WorkloadWatcher
    func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error)
    func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error)
    func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener)

Package files

cluster_store.go endpoints_watcher.go k8s.go opaque_ports_watcher.go profile_watcher.go prometheus.go test_util.go workload_watcher.go

Constants

const (
    // PodIPIndex is the key for the index based on Pod IPs
    PodIPIndex = "ip"
    // HostIPIndex is the key for the index based on Host IP of pods with host network enabled
    HostIPIndex = "hostIP"
    // ExternalWorkloadIPIndex is the key for the index based on IP of externalworkloads
    ExternalWorkloadIPIndex = "externalWorkloadIP"
)

func CreateMockDecoder

func CreateMockDecoder(configs ...string) configDecoder

func GetAnnotatedOpaquePorts

func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{}

GetAnnotatedOpaquePorts returns the opaque ports for the pod given its annotations, or the default opaque ports if it's not annotated

func GetAnnotatedOpaquePortsForExternalWorkload

func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{}

GetAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its annotations, or the default opaque ports if it's not annotated

func InitializeIndexers

func InitializeIndexers(k8sAPI *k8s.API) error

InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers

func SetToServerProtocol

func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error

SetToServerProtocol sets the address's OpaqueProtocol field based off any Servers that select it and override the expected protocol.

func SetToServerProtocolExternalWorkload

func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error

SetToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any Servers that select it and override the expected protocol for ExternalWorkloads.

type Address

Address represents an individual port on a specific endpoint. This endpoint might be the result of the existence of a pod that is targeted by this service; alternatively it can be the case that this endpoint is not associated with a pod and maps to some other IP (i.e. a remote service gateway).

type Address struct {
    IP                string
    Port              Port
    Pod               *corev1.Pod
    ExternalWorkload  *ewv1beta1.ExternalWorkload
    OwnerName         string
    OwnerKind         string
    Identity          string
    AuthorityOverride string
    Zone              *string
    ForZones          []discovery.ForZone
    OpaqueProtocol    bool
}

type AddressSet

AddressSet is a set of Address, indexed by ID. The ID can be either: 1) A reference to a service: id.Name contains both the service name and the target IP and port (see newServiceRefAddress). 2) A reference to a pod: id.Name refers to the pod's name, and id.IPFamily refers to the ES AddressType (see newPodRefAddress). 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.

type AddressSet struct {
    Addresses          map[ID]Address
    Labels             map[string]string
    LocalTrafficPolicy bool
}

type BufferingProfileListener

BufferingProfileListener implements ProfileUpdateListener and stores updates in a slice. Useful for unit tests.

type BufferingProfileListener struct {
    Profiles []*sp.ServiceProfile
    // contains filtered or unexported fields
}

func NewBufferingProfileListener

func NewBufferingProfileListener() *BufferingProfileListener

NewBufferingProfileListener creates a new BufferingProfileListener.

func (*BufferingProfileListener) Update

func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile)

Update stores the update in the internal buffer.

type ClusterStore

ClusterStore indexes clusters in which remote service discovery may be performed. For each store item, an EndpointsWatcher is created to read state directly from the respective cluster's API Server. In addition, each store item has some associated and immutable configuration that is required for service discovery.

type ClusterStore struct {
    // Protects against illegal accesses
    sync.RWMutex
    // contains filtered or unexported fields
}

func NewClusterStore

func NewClusterStore(client kubernetes.Interface, namespace string, enableEndpointSlices bool) (*ClusterStore, error)

NewClusterStore creates a new (empty) ClusterStore. It requires a Kubernetes API Server client instantiated for the local cluster.

When created, a pair of event handlers are registered for the local cluster's Secret informer. The event handlers are responsible for driving the discovery of remote clusters and their configuration.

func NewClusterStoreWithDecoder

func NewClusterStoreWithDecoder(client kubernetes.Interface, namespace string, enableEndpointSlices bool, decodeFn configDecoder) (*ClusterStore, error)

NewClusterStoreWithDecoder is a helper function that allows the creation of a store with an arbitrary `configDecoder` function.

func (*ClusterStore) Get

func (cs *ClusterStore) Get(clusterName string) (*EndpointsWatcher, clusterConfig, bool)

Get safely retrieves a store item given a cluster name.

func (*ClusterStore) Sync

func (cs *ClusterStore) Sync(stopCh <-chan struct{})

func (*ClusterStore) UnregisterGauges

func (cs *ClusterStore) UnregisterGauges()

type DeletingProfileListener

DeletingProfileListener implements ProfileUpdateListener and registers deletions. Useful for unit testing.

type DeletingProfileListener struct {
    NumDeletes int
}

func NewDeletingProfileListener

func NewDeletingProfileListener() *DeletingProfileListener

NewDeletingProfileListener creates a new DeletingProfileListener.

func (*DeletingProfileListener) Update

func (dpl *DeletingProfileListener) Update(profile *sp.ServiceProfile)

Update registers a deletion.

type EndpointUpdateListener

EndpointUpdateListener is the interface that subscribers must implement.

type EndpointUpdateListener interface {
    Add(set AddressSet)
    Remove(set AddressSet)
    NoEndpoints(exists bool)
}

type EndpointsWatcher

EndpointsWatcher watches all endpoints and services in the Kubernetes cluster. Listeners can subscribe to a particular service and port and EndpointsWatcher will publish the address set and all future changes for that service:port.

type EndpointsWatcher struct {
    sync.RWMutex // This mutex protects modification of the map itself.
    // contains filtered or unexported fields
}

func NewEndpointsWatcher

func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error)

NewEndpointsWatcher creates an EndpointsWatcher and begins watching the k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will watch on Endpoints or EndpointSlice resources, depending on cluster configuration.

func (*EndpointsWatcher) Subscribe

func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error

Subscribe to an authority. The provided listener will be updated each time the address set for the given authority is changed.

func (*EndpointsWatcher) Unsubscribe

func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener)

Unsubscribe removes a listener from the subscribers list for this authority.

type ExternalWorkloadID

ExternalWorkloadID is the namespace-qualified name of an ExternalWorkload.

type ExternalWorkloadID = ID

type ID

ID is a namespace-qualified name.

type ID struct {
    Namespace string
    Name      string

    // Only used for PodID
    IPFamily corev1.IPFamily
}

func (ID) String

func (i ID) String() string

type IPPort

IPPort holds the IP and port for some destination

type IPPort struct {
    IP   string
    Port Port
}

type InvalidService

InvalidService is an error which indicates that the authority is not a valid service.

type InvalidService struct {
    // contains filtered or unexported fields
}

func (InvalidService) Error

func (is InvalidService) Error() string

type OpaquePortsUpdateListener

OpaquePortsUpdateListener is the interface that subscribers must implement.

type OpaquePortsUpdateListener interface {
    UpdateService(ports map[uint32]struct{})
}

type OpaquePortsWatcher

OpaquePortsWatcher watches all the services in the cluster. If the opaque ports annotation is added to a service, the watcher will update listeners—if any—subscribed to that service.

type OpaquePortsWatcher struct {
    sync.RWMutex
    // contains filtered or unexported fields
}

func NewOpaquePortsWatcher

func NewOpaquePortsWatcher(k8sAPI *k8s.API, log *logging.Entry, opaquePorts map[uint32]struct{}) (*OpaquePortsWatcher, error)

NewOpaquePortsWatcher creates an OpaquePortsWatcher and begins watching the k8sAPI for service changes.

func (*OpaquePortsWatcher) Subscribe

func (opw *OpaquePortsWatcher) Subscribe(id ServiceID, listener OpaquePortsUpdateListener) error

Subscribe subscribes a listener to a service; each time the service changes, the listener will be updated if the list of opaque ports changes.

func (*OpaquePortsWatcher) Unsubscribe

func (opw *OpaquePortsWatcher) Unsubscribe(id ServiceID, listener OpaquePortsUpdateListener)

Unsubscribe unsubscribes a listener from a service.

type PodID

PodID is the namespace-qualified name of a pod.

type PodID = ID

type Port

Port is a numeric port.

type Port = uint32

type ProfileID

ProfileID is the namespace-qualified name of a service profile.

type ProfileID = ID

type ProfileUpdateListener

ProfileUpdateListener is the interface that subscribers must implement.

type ProfileUpdateListener interface {
    Update(profile *sp.ServiceProfile)
}

type ProfileWatcher

ProfileWatcher watches all service profiles in the Kubernetes cluster. Listeners can subscribe to a particular profile and ProfileWatcher will publish the service profile and all future changes for that profile.

type ProfileWatcher struct {
    sync.RWMutex // This mutex protects modification of the map itself.
    // contains filtered or unexported fields
}

func NewProfileWatcher

func NewProfileWatcher(k8sAPI *k8s.API, log *logging.Entry) (*ProfileWatcher, error)

NewProfileWatcher creates a ProfileWatcher and begins watching the k8sAPI for service profile changes.

func (*ProfileWatcher) Subscribe

func (pw *ProfileWatcher) Subscribe(id ProfileID, listener ProfileUpdateListener) error

Subscribe to an authority. The provided listener will be updated each time the service profile for the given authority is changed.

func (*ProfileWatcher) Unsubscribe

func (pw *ProfileWatcher) Unsubscribe(id ProfileID, listener ProfileUpdateListener)

Unsubscribe removes a listener from the subscribers list for this authority.

type ServiceID

ServiceID is the namespace-qualified name of a service.

type ServiceID = ID

func (ServiceID) Labels

func (id ServiceID) Labels() prometheus.Labels

Labels returns the labels for prometheus metrics associated to the service

type WorkloadUpdateListener

WorkloadUpdateListener is the interface subscribers must implement.

type WorkloadUpdateListener interface {
    Update(*Address) error
}

type WorkloadWatcher

WorkloadWatcher watches all pods and externalworkloads in the cluster. It keeps a map of publishers keyed by IP and port.

type WorkloadWatcher struct {
    // contains filtered or unexported fields
}

func NewWorkloadWatcher

func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error)

func (*WorkloadWatcher) Subscribe

func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error)

Subscribe notifies the listener on changes on any workload backing the passed host/ip:port or changes to its associated opaque protocol config. If service and hostname are empty then ip should be set and vice-versa. If ip is empty, the corresponding ip is found for the given service/hostname, and returned.

func (*WorkloadWatcher) Unsubscribe

func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener)

Unsubscribe stops notifying the listener on changes on any pod backing the passed ip:port or its associated protocol config.