1 package multicluster
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "strconv"
9 "strings"
10 "time"
11
12 "github.com/linkerd/linkerd2/pkg/k8s"
13 corev1 "k8s.io/api/core/v1"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
16 "k8s.io/apimachinery/pkg/runtime/schema"
17 "k8s.io/client-go/dynamic"
18 )
19
20 type (
21
22
23
24 ProbeSpec struct {
25 Path string
26 Port uint32
27 Period time.Duration
28 }
29
30
31
32
33
34 Link struct {
35 Name string
36 Namespace string
37 TargetClusterName string
38 TargetClusterDomain string
39 TargetClusterLinkerdNamespace string
40 ClusterCredentialsSecret string
41 GatewayAddress string
42 GatewayPort uint32
43 GatewayIdentity string
44 ProbeSpec ProbeSpec
45 Selector metav1.LabelSelector
46 RemoteDiscoverySelector metav1.LabelSelector
47 }
48 )
49
50
51 var LinkGVR = schema.GroupVersionResource{
52 Group: k8s.LinkAPIGroup,
53 Version: k8s.LinkAPIVersion,
54 Resource: "links",
55 }
56
57 func (ps ProbeSpec) String() string {
58 return fmt.Sprintf("ProbeSpec: {path: %s, port: %d, period: %s}", ps.Path, ps.Port, ps.Period)
59 }
60
61
62
63 func NewLink(u unstructured.Unstructured) (Link, error) {
64
65 spec, ok := u.Object["spec"]
66 if !ok {
67 return Link{}, errors.New("Field 'spec' is missing")
68 }
69 specObj, ok := spec.(map[string]interface{})
70 if !ok {
71 return Link{}, errors.New("Field 'spec' is not an object")
72 }
73
74 ps, ok := specObj["probeSpec"]
75 if !ok {
76 return Link{}, errors.New("Field 'probeSpec' is missing")
77 }
78 psObj, ok := ps.(map[string]interface{})
79 if !ok {
80 return Link{}, errors.New("Field 'probeSpec' it not an object")
81 }
82
83 probeSpec, err := newProbeSpec(psObj)
84 if err != nil {
85 return Link{}, err
86 }
87
88 targetClusterName, err := stringField(specObj, "targetClusterName")
89 if err != nil {
90 return Link{}, err
91 }
92
93 targetClusterDomain, err := stringField(specObj, "targetClusterDomain")
94 if err != nil {
95 return Link{}, err
96 }
97
98 targetClusterLinkerdNamespace, err := stringField(specObj, "targetClusterLinkerdNamespace")
99 if err != nil {
100 return Link{}, err
101 }
102
103 clusterCredentialsSecret, err := stringField(specObj, "clusterCredentialsSecret")
104 if err != nil {
105 return Link{}, err
106 }
107
108 gatewayAddress, err := stringField(specObj, "gatewayAddress")
109 if err != nil {
110 return Link{}, err
111 }
112
113 portStr, err := stringField(specObj, "gatewayPort")
114 if err != nil {
115 return Link{}, err
116 }
117 gatewayPort, err := strconv.ParseUint(portStr, 10, 32)
118 if err != nil {
119 return Link{}, err
120 }
121
122 gatewayIdentity, err := stringField(specObj, "gatewayIdentity")
123 if err != nil {
124 return Link{}, err
125 }
126
127 selector := metav1.LabelSelector{}
128 if selectorObj, ok := specObj["selector"]; ok {
129 bytes, err := json.Marshal(selectorObj)
130 if err != nil {
131 return Link{}, err
132 }
133 err = json.Unmarshal(bytes, &selector)
134 if err != nil {
135 return Link{}, err
136 }
137 }
138
139 remoteDiscoverySelector := metav1.LabelSelector{}
140 if selectorObj, ok := specObj["remoteDiscoverySelector"]; ok {
141 bytes, err := json.Marshal(selectorObj)
142 if err != nil {
143 return Link{}, err
144 }
145 err = json.Unmarshal(bytes, &remoteDiscoverySelector)
146 if err != nil {
147 return Link{}, err
148 }
149 }
150
151 return Link{
152 Name: u.GetName(),
153 Namespace: u.GetNamespace(),
154 TargetClusterName: targetClusterName,
155 TargetClusterDomain: targetClusterDomain,
156 TargetClusterLinkerdNamespace: targetClusterLinkerdNamespace,
157 ClusterCredentialsSecret: clusterCredentialsSecret,
158 GatewayAddress: gatewayAddress,
159 GatewayPort: uint32(gatewayPort),
160 GatewayIdentity: gatewayIdentity,
161 ProbeSpec: probeSpec,
162 Selector: selector,
163 RemoteDiscoverySelector: remoteDiscoverySelector,
164 }, nil
165 }
166
167
168
169 func (l Link) ToUnstructured() (unstructured.Unstructured, error) {
170 spec := map[string]interface{}{
171 "targetClusterName": l.TargetClusterName,
172 "targetClusterDomain": l.TargetClusterDomain,
173 "targetClusterLinkerdNamespace": l.TargetClusterLinkerdNamespace,
174 "clusterCredentialsSecret": l.ClusterCredentialsSecret,
175 "gatewayAddress": l.GatewayAddress,
176 "gatewayPort": fmt.Sprintf("%d", l.GatewayPort),
177 "gatewayIdentity": l.GatewayIdentity,
178 "probeSpec": map[string]interface{}{
179 "path": l.ProbeSpec.Path,
180 "port": fmt.Sprintf("%d", l.ProbeSpec.Port),
181 "period": l.ProbeSpec.Period.String(),
182 },
183 }
184
185 data, err := json.Marshal(l.Selector)
186 if err != nil {
187 return unstructured.Unstructured{}, err
188 }
189 selector := make(map[string]interface{})
190 err = json.Unmarshal(data, &selector)
191 if err != nil {
192 return unstructured.Unstructured{}, err
193 }
194 spec["selector"] = selector
195
196 data, err = json.Marshal(l.RemoteDiscoverySelector)
197 if err != nil {
198 return unstructured.Unstructured{}, err
199 }
200 remoteDiscoverySelector := make(map[string]interface{})
201 err = json.Unmarshal(data, &remoteDiscoverySelector)
202 if err != nil {
203 return unstructured.Unstructured{}, err
204 }
205 spec["remoteDiscoverySelector"] = remoteDiscoverySelector
206
207 return unstructured.Unstructured{
208 Object: map[string]interface{}{
209 "apiVersion": k8s.LinkAPIGroupVersion,
210 "kind": k8s.LinkKind,
211 "metadata": map[string]interface{}{
212 "name": l.Name,
213 "namespace": l.Namespace,
214 },
215 "spec": spec,
216 },
217 }, nil
218 }
219
220
221 func ExtractProbeSpec(gateway *corev1.Service) (ProbeSpec, error) {
222 path := gateway.Annotations[k8s.GatewayProbePath]
223 if path == "" {
224 return ProbeSpec{}, errors.New("probe path is empty")
225 }
226
227 port, err := extractPort(gateway.Spec, k8s.ProbePortName)
228 if err != nil {
229 return ProbeSpec{}, err
230 }
231
232 period, err := strconv.ParseUint(gateway.Annotations[k8s.GatewayProbePeriod], 10, 32)
233 if err != nil {
234 return ProbeSpec{}, err
235 }
236
237 return ProbeSpec{
238 Path: path,
239 Port: port,
240 Period: time.Duration(period) * time.Second,
241 }, nil
242 }
243
244
245 func GetLinks(ctx context.Context, client dynamic.Interface) ([]Link, error) {
246 list, err := client.Resource(LinkGVR).List(ctx, metav1.ListOptions{})
247 if err != nil {
248 return nil, err
249 }
250 links := []Link{}
251 errs := []string{}
252 for _, u := range list.Items {
253 link, err := NewLink(u)
254 if err != nil {
255 errs = append(errs, fmt.Sprintf("failed to parse Link %s: %s", u.GetName(), err))
256 } else {
257 links = append(links, link)
258 }
259 }
260 if len(errs) > 0 {
261 return nil, errors.New(strings.Join(errs, "\n"))
262 }
263 return links, nil
264 }
265
266
267 func GetLink(ctx context.Context, client dynamic.Interface, namespace, name string) (Link, error) {
268 unstructured, err := client.Resource(LinkGVR).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
269 if err != nil {
270 return Link{}, err
271 }
272 return NewLink(*unstructured)
273 }
274
275 func extractPort(spec corev1.ServiceSpec, portName string) (uint32, error) {
276 for _, p := range spec.Ports {
277 if p.Name == portName {
278 if spec.Type == "NodePort" {
279 return uint32(p.NodePort), nil
280 }
281 return uint32(p.Port), nil
282 }
283 }
284 return 0, fmt.Errorf("could not find port with name %s", portName)
285 }
286
287 func newProbeSpec(obj map[string]interface{}) (ProbeSpec, error) {
288 periodStr, err := stringField(obj, "period")
289 if err != nil {
290 return ProbeSpec{}, err
291 }
292 period, err := time.ParseDuration(periodStr)
293 if err != nil {
294 return ProbeSpec{}, err
295 }
296
297 path, err := stringField(obj, "path")
298 if err != nil {
299 return ProbeSpec{}, err
300 }
301
302 portStr, err := stringField(obj, "port")
303 if err != nil {
304 return ProbeSpec{}, err
305 }
306 port, err := strconv.ParseUint(portStr, 10, 32)
307 if err != nil {
308 return ProbeSpec{}, err
309 }
310
311 return ProbeSpec{
312 Path: path,
313 Port: uint32(port),
314 Period: period,
315 }, nil
316 }
317
// stringField looks up key in obj and returns its value as a string. It
// returns an error when the key is absent or when the stored value is not a
// string.
func stringField(obj map[string]interface{}, key string) (string, error) {
	raw, present := obj[key]
	if !present {
		return "", fmt.Errorf("Field '%s' is missing", key)
	}
	if text, isString := raw.(string); isString {
		return text, nil
	}
	return "", fmt.Errorf("Field '%s' is not a string", key)
}
329
View as plain text