1 package cmd
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "os"
9 "sort"
10 "strings"
11 "text/tabwriter"
12
13 "github.com/fatih/color"
14 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
15 "github.com/linkerd/linkerd2/pkg/healthcheck"
16 "github.com/linkerd/linkerd2/pkg/k8s"
17 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
18 "github.com/linkerd/linkerd2/viz/metrics-api/util"
19 "github.com/linkerd/linkerd2/viz/pkg/api"
20 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
21 pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
22 "github.com/spf13/cobra"
23 v1 "k8s.io/api/core/v1"
24 )
25
26 var (
27 okStatus = color.New(color.FgGreen, color.Bold).SprintFunc()("\u221A")
28
29 )
30
// edgesOptions holds the values of the flags accepted by the edges command.
type edgesOptions struct {
	// namespace is bound to --namespace / -n; outputFormat is bound to
	// --output / -o.
	namespace, outputFormat string
	// allNamespaces is bound to --all-namespaces / -A.
	allNamespaces bool
}
36
37 func newEdgesOptions() *edgesOptions {
38 return &edgesOptions{
39 outputFormat: tableOutput,
40 allNamespaces: false,
41 }
42 }
43
// indexedEdgeResults is the message each request goroutine started by
// NewCmdEdges sends back over the results channel.
type indexedEdgeResults struct {
	// ix is the position of the originating request in the request slice.
	ix int
	// rows are the edges extracted from the response; empty when the request
	// failed.
	rows []*pb.Edge
	// err is the error returned by the API call, if any.
	err error
}
49
50
51 func NewCmdEdges() *cobra.Command {
52 options := newEdgesOptions()
53
54 cmd := &cobra.Command{
55 Use: "edges [flags] (RESOURCETYPE)",
56 Short: "Display connections between resources, and Linkerd proxy identities",
57 Long: `Display connections between resources, and Linkerd proxy identities.
58
59 The RESOURCETYPE argument specifies the type of resource to display edges within.
60
61 Examples:
62 * cronjob
63 * deploy
64 * ds
65 * job
66 * po
67 * rc
68 * rs
69 * sts
70
71 Valid resource types include:
72 * cronjobs
73 * daemonsets
74 * deployments
75 * jobs
76 * pods
77 * replicasets
78 * replicationcontrollers
79 * statefulsets`,
80 Example: ` # Get all edges between pods that either originate from or terminate in the test namespace.
81 linkerd viz edges po -n test
82
83 # Get all edges between pods that either originate from or terminate in the default namespace.
84 linkerd viz edges po
85
86 # Get all edges between pods in all namespaces.
87 linkerd viz edges po --all-namespaces`,
88 Args: cobra.ExactArgs(1),
89 ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
90
91
92
93 if len(args) > 0 {
94 return []string{}, cobra.ShellCompDirectiveError
95 }
96
97 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
98 if err != nil {
99 return nil, cobra.ShellCompDirectiveError
100 }
101
102 if options.namespace == "" {
103 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
104 }
105
106 if options.allNamespaces {
107 options.namespace = v1.NamespaceAll
108 }
109
110 cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
111
112 results, err := cc.Complete(args, toComplete)
113 if err != nil {
114 return nil, cobra.ShellCompDirectiveError
115 }
116
117 return results, cobra.ShellCompDirectiveDefault
118 },
119 RunE: func(cmd *cobra.Command, args []string) error {
120 if options.namespace == "" {
121 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
122 }
123
124 reqs, err := buildEdgesRequests(args, options)
125 if err != nil {
126 return fmt.Errorf("Error creating edges request: %w", err)
127 }
128
129
130
131 client := api.CheckClientOrExit(hc.VizOptions{
132 Options: &healthcheck.Options{
133 ControlPlaneNamespace: controlPlaneNamespace,
134 KubeConfig: kubeconfigPath,
135 Impersonate: impersonate,
136 ImpersonateGroup: impersonateGroup,
137 KubeContext: kubeContext,
138 APIAddr: apiAddr,
139 },
140 VizNamespaceOverride: vizNamespace,
141 })
142
143 c := make(chan indexedEdgeResults, len(reqs))
144 for num, req := range reqs {
145 go func(num int, req *pb.EdgesRequest) {
146 resp, err := requestEdgesFromAPI(client, req)
147 rows := edgesRespToRows(resp)
148 c <- indexedEdgeResults{num, rows, err}
149 }(num, req)
150 }
151
152 totalRows := make([]*pb.Edge, 0)
153 i := 0
154 for res := range c {
155 if res.err != nil {
156 fmt.Fprint(os.Stderr, res.err.Error())
157 os.Exit(1)
158 }
159 totalRows = append(totalRows, res.rows...)
160 if i++; i == len(reqs) {
161 close(c)
162 }
163 }
164
165 output := renderEdgeStats(totalRows, options)
166 _, err = fmt.Print(output)
167
168 return err
169 },
170 }
171
172 cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
173 cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, "Output format; one of: \"table\" or \"json\" or \"wide\"")
174 cmd.PersistentFlags().BoolVarP(&options.allNamespaces, "all-namespaces", "A", options.allNamespaces, "If present, returns edges across all namespaces, ignoring the \"--namespace\" flag")
175
176 pkgcmd.ConfigureNamespaceFlagCompletion(
177 cmd, []string{"namespace"},
178 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
179 return cmd
180 }
181
182
183
184 func validateEdgesRequestInputs(targets []*pb.Resource, options *edgesOptions) error {
185 for _, target := range targets {
186 if target.Name != "" {
187 return fmt.Errorf("Edges cannot be returned for a specific resource name; remove %s from query", target.Name)
188 }
189 switch target.Type {
190 case "authority", "service", "all":
191 return fmt.Errorf("Resource type is not supported: %s", target.Type)
192 }
193 }
194
195 switch options.outputFormat {
196 case tableOutput, jsonOutput, wideOutput:
197 return nil
198 default:
199 return fmt.Errorf("--output supports %s, %s and %s", tableOutput, jsonOutput, wideOutput)
200 }
201 }
202
203 func buildEdgesRequests(resources []string, options *edgesOptions) ([]*pb.EdgesRequest, error) {
204 targets, err := pkgUtil.BuildResources(options.namespace, resources)
205
206 if err != nil {
207 return nil, err
208 }
209 err = validateEdgesRequestInputs(targets, options)
210 if err != nil {
211 return nil, err
212 }
213
214 requests := make([]*pb.EdgesRequest, 0)
215 for _, target := range targets {
216 requestParams := util.EdgesRequestParams{
217 ResourceType: target.Type,
218 Namespace: options.namespace,
219 AllNamespaces: options.allNamespaces,
220 }
221
222 req, err := util.BuildEdgesRequest(requestParams)
223 if err != nil {
224 return nil, err
225 }
226 requests = append(requests, req)
227 }
228 return requests, nil
229 }
230
231 func edgesRespToRows(resp *pb.EdgesResponse) []*pb.Edge {
232 rows := make([]*pb.Edge, 0)
233 if resp != nil {
234 rows = append(rows, resp.GetOk().Edges...)
235 }
236 return rows
237 }
238
239 func requestEdgesFromAPI(client pb.ApiClient, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
240 resp, err := client.Edges(context.Background(), req)
241 if err != nil {
242 return nil, fmt.Errorf("Edges API error: %w", err)
243 }
244 if e := resp.GetError(); e != nil {
245 return nil, fmt.Errorf("Edges API response error: %s", e.Error)
246 }
247 return resp, nil
248 }
249
250 func renderEdgeStats(rows []*pb.Edge, options *edgesOptions) string {
251 var buffer bytes.Buffer
252 w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
253 writeEdgesToBuffer(rows, w, options)
254 w.Flush()
255
256 return renderEdges(buffer, options)
257 }
258
// edgeRow is the flattened, display-ready form of a single edge.
type edgeRow struct {
	// Name and namespace of the resource the edge originates from.
	src, srcNamespace string
	// Name and namespace of the resource the edge terminates in.
	dst, dstNamespace string
	// Shortened identities of the two ends of the edge.
	client, server string
	// msg is the content of the SECURED column / "no_tls_reason" JSON field.
	msg string
}
268
// Column headers of the edges table.
const (
	// Source columns.
	srcHeader          = "SRC"
	srcNamespaceHeader = "SRC_NS"

	// Destination columns.
	dstHeader          = "DST"
	dstNamespaceHeader = "DST_NS"

	// Identity columns, only shown in wide output.
	clientHeader = "CLIENT_ID"
	serverHeader = "SERVER_ID"

	// Status column.
	msgHeader = "SECURED"
)
278
279 func writeEdgesToBuffer(rows []*pb.Edge, w *tabwriter.Writer, options *edgesOptions) {
280 maxSrcLength := len(srcHeader)
281 maxDstLength := len(dstHeader)
282 maxSrcNamespaceLength := len(srcNamespaceHeader)
283 maxDstNamespaceLength := len(dstNamespaceHeader)
284 maxClientLength := len(clientHeader)
285 maxServerLength := len(serverHeader)
286 maxMsgLength := len(msgHeader)
287
288 edgeRows := []edgeRow{}
289 for _, r := range rows {
290 clientID := r.ClientId
291 serverID := r.ServerId
292 msg := r.NoIdentityMsg
293 if msg == "" && options.outputFormat != jsonOutput {
294 msg = okStatus
295 }
296 if len(clientID) > 0 {
297 parts := strings.Split(clientID, ".")
298 clientID = parts[0] + "." + parts[1]
299 }
300 if len(serverID) > 0 {
301 parts := strings.Split(serverID, ".")
302 serverID = parts[0] + "." + parts[1]
303 }
304
305 row := edgeRow{
306 client: clientID,
307 server: serverID,
308 msg: msg,
309 src: r.Src.Name,
310 srcNamespace: r.Src.Namespace,
311 dst: r.Dst.Name,
312 dstNamespace: r.Dst.Namespace,
313 }
314
315 edgeRows = append(edgeRows, row)
316
317 if len(r.Src.Name) > maxSrcLength {
318 maxSrcLength = len(r.Src.Name)
319 }
320 if len(r.Src.Namespace) > maxSrcNamespaceLength {
321 maxSrcNamespaceLength = len(r.Src.Namespace)
322 }
323 if len(r.Dst.Name) > maxDstLength {
324 maxDstLength = len(r.Dst.Name)
325 }
326 if len(r.Dst.Namespace) > maxDstNamespaceLength {
327 maxDstNamespaceLength = len(r.Dst.Namespace)
328 }
329 if len(clientID) > maxClientLength {
330 maxClientLength = len(clientID)
331 }
332 if len(serverID) > maxServerLength {
333 maxServerLength = len(serverID)
334 }
335 if len(msg) > maxMsgLength {
336 maxMsgLength = len(msg)
337 }
338 }
339
340
341 sort.Slice(edgeRows, func(i, j int) bool {
342 keyI := edgeRows[i].srcNamespace + edgeRows[i].dstNamespace + edgeRows[i].src + edgeRows[i].dst
343 keyJ := edgeRows[j].srcNamespace + edgeRows[j].dstNamespace + edgeRows[j].src + edgeRows[j].dst
344 return keyI < keyJ
345 })
346
347 switch options.outputFormat {
348 case tableOutput, wideOutput:
349 if len(edgeRows) == 0 {
350 fmt.Fprintln(os.Stderr, "No edges found.")
351 os.Exit(0)
352 }
353 printEdgeTable(edgeRows, w, maxSrcLength, maxSrcNamespaceLength, maxDstLength, maxDstNamespaceLength, maxClientLength, maxServerLength, maxMsgLength, options.outputFormat)
354 case jsonOutput:
355 printEdgesJSON(edgeRows, w)
356 }
357 }
358
359 func printEdgeTable(edgeRows []edgeRow, w *tabwriter.Writer, maxSrcLength, maxSrcNamespaceLength, maxDstLength, maxDstNamespaceLength, maxClientLength, maxServerLength, maxMsgLength int, outputFormat string) {
360 srcTemplate := fmt.Sprintf("%%-%ds", maxSrcLength)
361 dstTemplate := fmt.Sprintf("%%-%ds", maxDstLength)
362 srcNamespaceTemplate := fmt.Sprintf("%%-%ds", maxSrcNamespaceLength)
363 dstNamespaceTemplate := fmt.Sprintf("%%-%ds", maxDstNamespaceLength)
364 msgTemplate := fmt.Sprintf("%%-%ds", maxMsgLength)
365 clientTemplate := fmt.Sprintf("%%-%ds", maxClientLength)
366 serverTemplate := fmt.Sprintf("%%-%ds", maxServerLength)
367
368 headers := []string{
369 fmt.Sprintf(srcTemplate, srcHeader),
370 fmt.Sprintf(dstTemplate, dstHeader),
371 fmt.Sprintf(srcNamespaceTemplate, srcNamespaceHeader),
372 fmt.Sprintf(dstNamespaceTemplate, dstNamespaceHeader),
373 }
374
375 if outputFormat == wideOutput {
376 headers = append(headers, fmt.Sprintf(clientTemplate, clientHeader), fmt.Sprintf(serverTemplate, serverHeader))
377 }
378
379 headers = append(headers, fmt.Sprintf(msgTemplate, msgHeader)+"\t")
380
381 fmt.Fprintln(w, strings.Join(headers, "\t"))
382
383 for _, row := range edgeRows {
384 values := []interface{}{
385 row.src,
386 row.dst,
387 row.srcNamespace,
388 row.dstNamespace,
389 }
390 templateString := fmt.Sprintf("%s\t%s\t%s\t%s\t", srcTemplate, dstTemplate, srcNamespaceTemplate, dstNamespaceTemplate)
391
392 if outputFormat == wideOutput {
393 templateString += fmt.Sprintf("%s\t%s\t", clientTemplate, serverTemplate)
394 values = append(values, row.client, row.server)
395 }
396
397 templateString += fmt.Sprintf("%s\t\n", msgTemplate)
398 values = append(values, row.msg)
399
400 fmt.Fprintf(w, templateString, values...)
401 }
402 }
403
404 func renderEdges(buffer bytes.Buffer, options *edgesOptions) string {
405 var out string
406 switch options.outputFormat {
407 case jsonOutput:
408 out = buffer.String()
409 default:
410
411 out = string(buffer.Bytes()[padding:])
412 out = strings.ReplaceAll(out, "\n"+strings.Repeat(" ", padding), "\n")
413 }
414
415 return out
416 }
417
// edgesJSONStats is the JSON representation of a single edge as emitted by
// printEdgesJSON. Each field is copied from the edgeRow field of the
// corresponding name.
type edgesJSONStats struct {
	Src          string `json:"src"`
	SrcNamespace string `json:"src_namespace"`
	Dst          string `json:"dst"`
	DstNamespace string `json:"dst_namespace"`
	Client       string `json:"client_id"`
	Server       string `json:"server_id"`
	// Msg is filled from edgeRow.msg and serialized as "no_tls_reason".
	Msg string `json:"no_tls_reason"`
}
427
428 func printEdgesJSON(edgeRows []edgeRow, w *tabwriter.Writer) {
429
430 entries := []*edgesJSONStats{}
431
432 for _, row := range edgeRows {
433 entry := &edgesJSONStats{
434 Src: row.src,
435 SrcNamespace: row.srcNamespace,
436 Dst: row.dst,
437 DstNamespace: row.dstNamespace,
438 Client: row.client,
439 Server: row.server,
440 Msg: row.msg}
441 entries = append(entries, entry)
442 }
443
444 b, err := json.MarshalIndent(entries, "", " ")
445 if err != nil {
446 fmt.Fprintf(os.Stderr, "Error marshalling JSON: %s\n", err)
447 return
448 }
449 fmt.Fprintf(w, "%s\n", b)
450 }
451
View as plain text