1 package watcher
2
3 import (
4 "errors"
5 "fmt"
6 "net"
7
8 ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
9 "github.com/linkerd/linkerd2/controller/k8s"
10 "github.com/prometheus/client_golang/prometheus"
11 corev1 "k8s.io/api/core/v1"
12 "k8s.io/apimachinery/pkg/util/intstr"
13 "k8s.io/client-go/tools/cache"
14 )
15
16 const (
17
18 PodIPIndex = "ip"
19
20 HostIPIndex = "hostIP"
21
22 ExternalWorkloadIPIndex = "externalWorkloadIP"
23 )
24
25 type (
26
27 IPPort struct {
28 IP string
29 Port Port
30 }
31
32
33 ID struct {
34 Namespace string
35 Name string
36
37
38 IPFamily corev1.IPFamily
39 }
40
41 ServiceID = ID
42
43 PodID = ID
44
45 ProfileID = ID
46
47 ExternalWorkloadID = ID
48
49
50 Port = uint32
51 namedPort = intstr.IntOrString
52
53
54
55 InvalidService struct {
56 authority string
57 }
58 )
59
60
61 func (id ServiceID) Labels() prometheus.Labels {
62 return prometheus.Labels{"namespace": id.Namespace, "name": id.Name}
63 }
64
65 func (is InvalidService) Error() string {
66 return fmt.Sprintf("Invalid k8s service %s", is.authority)
67 }
68
69 func invalidService(authority string) InvalidService {
70 return InvalidService{authority}
71 }
72
73 func (i ID) String() string {
74 return fmt.Sprintf("%s/%s", i.Namespace, i.Name)
75 }
76
77
78 func InitializeIndexers(k8sAPI *k8s.API) error {
79 err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
80 svc, ok := obj.(*corev1.Service)
81 if !ok {
82 return nil, errors.New("object is not a service")
83 }
84
85 if len(svc.Spec.ClusterIPs) != 0 {
86 return svc.Spec.ClusterIPs, nil
87 }
88
89 if svc.Spec.ClusterIP != "" {
90 return []string{svc.Spec.ClusterIP}, nil
91 }
92
93 return nil, nil
94 }})
95
96 if err != nil {
97 return fmt.Errorf("could not create an indexer for services: %w", err)
98 }
99
100 err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
101 if pod, ok := obj.(*corev1.Pod); ok {
102
103
104
105
106 if pod.Spec.HostNetwork {
107 return nil, nil
108 }
109 ips := []string{}
110 for _, pip := range pod.Status.PodIPs {
111 if pip.IP != "" {
112 ips = append(ips, pip.IP)
113 }
114 }
115 if len(ips) == 0 && pod.Status.PodIP != "" {
116 ips = append(ips, pod.Status.PodIP)
117 }
118 return ips, nil
119 }
120 return nil, fmt.Errorf("object is not a pod")
121 }})
122
123 if err != nil {
124 return fmt.Errorf("could not create an indexer for pods: %w", err)
125 }
126
127 err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{HostIPIndex: func(obj interface{}) ([]string, error) {
128 pod, ok := obj.(*corev1.Pod)
129 if !ok {
130 return nil, errors.New("object is not a pod")
131 }
132
133 ips := []string{}
134 for _, hip := range pod.Status.HostIPs {
135 ips = append(ips, hip.IP)
136 }
137 if len(ips) == 0 && pod.Status.HostIP != "" {
138 ips = append(ips, pod.Status.HostIP)
139 }
140 if len(ips) == 0 {
141 return []string{}, nil
142 }
143
144
145
146
147 addrs := []string{}
148 for _, c := range pod.Spec.Containers {
149 for _, p := range c.Ports {
150 if p.HostPort == 0 {
151 continue
152 }
153 for _, ip := range ips {
154 addrs = append(addrs, net.JoinHostPort(ip, fmt.Sprintf("%d", p.HostPort)))
155 }
156 }
157 }
158 return addrs, nil
159 }})
160
161 if err != nil {
162 return fmt.Errorf("could not create an indexer for pods: %w", err)
163 }
164
165 err = k8sAPI.ExtWorkload().Informer().AddIndexers(cache.Indexers{ExternalWorkloadIPIndex: func(obj interface{}) ([]string, error) {
166 ew, ok := obj.(*ext.ExternalWorkload)
167 if !ok {
168 return nil, errors.New("object is not an externalworkload")
169 }
170
171 addrs := []string{}
172 for _, ip := range ew.Spec.WorkloadIPs {
173 for _, port := range ew.Spec.Ports {
174 addrs = append(addrs, net.JoinHostPort(ip.Ip, fmt.Sprintf("%d", port.Port)))
175 }
176 }
177 return addrs, nil
178 }})
179
180 if err != nil {
181 return fmt.Errorf("could not create an indexer for externalworkloads: %w", err)
182 }
183
184 return nil
185 }
186
187 func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Pod, error) {
188 objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, key)
189 if err != nil {
190 return nil, fmt.Errorf("failed getting %s indexed pods: %w", indexName, err)
191 }
192 pods := make([]*corev1.Pod, 0)
193 for _, obj := range objs {
194 pod := obj.(*corev1.Pod)
195 if !podNotTerminating(pod) {
196 continue
197 }
198 pods = append(pods, pod)
199 }
200 return pods, nil
201 }
202
203 func getIndexedExternalWorkloads(k8sAPI *k8s.API, indexName string, key string) ([]*ext.ExternalWorkload, error) {
204 objs, err := k8sAPI.ExtWorkload().Informer().GetIndexer().ByIndex(indexName, key)
205 if err != nil {
206 return nil, fmt.Errorf("failed getting %s indexed externalworkloads: %w", indexName, err)
207 }
208 workloads := make([]*ext.ExternalWorkload, 0)
209 for _, obj := range objs {
210 workload := obj.(*ext.ExternalWorkload)
211 workloads = append(workloads, workload)
212 }
213 return workloads, nil
214 }
215
216 func podNotTerminating(pod *corev1.Pod) bool {
217 phase := pod.Status.Phase
218 podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed
219 podTerminating := pod.DeletionTimestamp != nil
220 return !podTerminating && !podTerminated
221 }
222
View as plain text