1 package cmd
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "os"
10 "sort"
11 "strings"
12 "text/tabwriter"
13 "time"
14
15 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
16 "github.com/linkerd/linkerd2/pkg/healthcheck"
17 "github.com/linkerd/linkerd2/pkg/k8s"
18 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
19 "github.com/linkerd/linkerd2/viz/metrics-api/util"
20 "github.com/linkerd/linkerd2/viz/pkg/api"
21 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
22
23 pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
24 log "github.com/sirupsen/logrus"
25 "github.com/spf13/cobra"
26 )
27
28 type routesOptions struct {
29 namespace string
30 statOptionsBase
31 toResource string
32 toNamespace string
33 dstIsService bool
34 labelSelector string
35 }
36
37 type routeRowStats struct {
38 rowStats
39 actualRequestRate float64
40 actualSuccessRate float64
41 hasRequestData bool
42 }
43
44 func newRoutesOptions() *routesOptions {
45 return &routesOptions{
46 statOptionsBase: *newStatOptionsBase(),
47 toResource: "",
48 toNamespace: "",
49 labelSelector: "",
50 }
51 }
52
53
54 func NewCmdRoutes() *cobra.Command {
55 options := newRoutesOptions()
56
57 cmd := &cobra.Command{
58 Use: "routes [flags] (RESOURCES)",
59 Short: "Display route stats",
60 Long: `Display route stats.
61
62 This command will only display traffic which is sent to a service that has a Service Profile defined.`,
63 Example: ` # Routes for the webapp service in the test namespace.
64 linkerd viz routes service/webapp -n test
65
66 # Routes for calls from the traffic deployment to the webapp service in the test namespace.
67 linkerd viz routes deploy/traffic -n test --to svc/webapp`,
68 Args: cobra.ExactArgs(1),
69 ValidArgs: pkgUtil.ValidTargets,
70 RunE: func(cmd *cobra.Command, args []string) error {
71 if options.namespace == "" {
72 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
73 }
74 req, err := buildTopRoutesRequest(args[0], options)
75 if err != nil {
76 return fmt.Errorf("error creating metrics request while making routes request: %w", err)
77 }
78
79 output, err := requestRouteStatsFromAPI(
80 api.CheckClientOrExit(hc.VizOptions{
81 Options: &healthcheck.Options{
82 ControlPlaneNamespace: controlPlaneNamespace,
83 KubeConfig: kubeconfigPath,
84 Impersonate: impersonate,
85 ImpersonateGroup: impersonateGroup,
86 KubeContext: kubeContext,
87 APIAddr: apiAddr,
88 },
89 VizNamespaceOverride: vizNamespace,
90 }),
91 req,
92 options,
93 )
94 if err != nil {
95 fmt.Fprint(os.Stderr, err.Error())
96 os.Exit(1)
97 }
98
99 _, err = fmt.Print(output)
100
101 return err
102 },
103 }
104
105 cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified resource")
106 cmd.PersistentFlags().StringVarP(&options.timeWindow, "time-window", "t", options.timeWindow, "Stat window (for example: \"10s\", \"1m\", \"10m\", \"1h\")")
107 cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource, "If present, shows outbound stats to the specified resource")
108 cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace, "Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
109 cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, fmt.Sprintf("Output format; one of: \"%s\", \"%s\", or \"%s\"", tableOutput, wideOutput, jsonOutput))
110 cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector, "Selector (label query) to filter on, supports '=', '==', and '!='")
111
112 pkgcmd.ConfigureNamespaceFlagCompletion(
113 cmd, []string{"namespace", "to-namespace"},
114 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
115 return cmd
116 }
117
118 func requestRouteStatsFromAPI(client pb.ApiClient, req *pb.TopRoutesRequest, options *routesOptions) (string, error) {
119 resp, err := client.TopRoutes(context.Background(), req)
120 if err != nil {
121 return "", fmt.Errorf("TopRoutes API error: %w", err)
122 }
123 if e := resp.GetError(); e != nil {
124 return "", errors.New(e.Error)
125 }
126
127 return renderRouteStats(resp, options), nil
128 }
129
130 func renderRouteStats(resp *pb.TopRoutesResponse, options *routesOptions) string {
131 var buffer bytes.Buffer
132 w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', tabwriter.AlignRight)
133 writeRouteStatsToBuffer(resp, w, options)
134 w.Flush()
135
136 return renderStats(buffer, &options.statOptionsBase)
137 }
138
139 func writeRouteStatsToBuffer(resp *pb.TopRoutesResponse, w *tabwriter.Writer, options *routesOptions) {
140
141 tables := make(map[string][]*routeRowStats)
142
143 for _, resourceTable := range resp.GetOk().GetRoutes() {
144
145 table := make([]*routeRowStats, 0)
146
147 for _, r := range resourceTable.GetRows() {
148 if r.Stats != nil {
149 route := r.GetRoute()
150 table = append(table, &routeRowStats{
151 rowStats: rowStats{
152 route: route,
153 dst: r.GetAuthority(),
154 requestRate: getRequestRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount(), r.TimeWindow),
155 successRate: getSuccessRate(r.Stats.GetSuccessCount(), r.Stats.GetFailureCount()),
156 latencyP50: r.Stats.LatencyMsP50,
157 latencyP95: r.Stats.LatencyMsP95,
158 latencyP99: r.Stats.LatencyMsP99,
159 },
160 actualRequestRate: getRequestRate(r.Stats.GetActualSuccessCount(), r.Stats.GetActualFailureCount(), r.TimeWindow),
161 actualSuccessRate: getSuccessRate(r.Stats.GetActualSuccessCount(), r.Stats.GetActualFailureCount()),
162 hasRequestData: statHasRequestData(r.Stats),
163 })
164 }
165 }
166
167 sort.Slice(table, func(i, j int) bool {
168 return table[i].dst+table[i].route < table[j].dst+table[j].route
169 })
170
171 tables[resourceTable.GetResource()] = table
172 }
173
174 resources := make([]string, 0)
175 for resource := range tables {
176 resources = append(resources, resource)
177 }
178 sort.Strings(resources)
179
180 switch options.outputFormat {
181 case tableOutput, wideOutput:
182 for _, resource := range resources {
183 if len(tables) > 1 {
184 fmt.Fprintf(w, "==> %s <==\t\f", resource)
185 }
186 printRouteTable(tables[resource], w, options)
187 fmt.Fprintln(w)
188 }
189 case jsonOutput:
190 printRouteJSON(tables, w, options)
191 }
192 }
193
194 func printRouteTable(stats []*routeRowStats, w *tabwriter.Writer, options *routesOptions) {
195
196 routeTemplate := fmt.Sprintf("%%-%ds", routeWidth(stats))
197
198 authorityColumn := "AUTHORITY"
199 if options.dstIsService {
200 authorityColumn = "SERVICE"
201 }
202
203 headers := []string{
204 fmt.Sprintf(routeTemplate, "ROUTE"),
205 authorityColumn,
206 }
207 outputActual := options.toResource != "" && options.outputFormat == wideOutput
208 if outputActual {
209 headers = append(headers, []string{
210 "EFFECTIVE_SUCCESS",
211 "EFFECTIVE_RPS",
212 "ACTUAL_SUCCESS",
213 "ACTUAL_RPS",
214 }...)
215 } else {
216 headers = append(headers, []string{
217 "SUCCESS",
218 "RPS",
219 }...)
220 }
221
222 headers = append(headers, []string{
223 "LATENCY_P50",
224 "LATENCY_P95",
225 "LATENCY_P99\t",
226 }...)
227
228 fmt.Fprintln(w, strings.Join(headers, "\t"))
229
230
231 templateString := routeTemplate + "\t%s\t%.2f%%\t%.1frps\t"
232 if outputActual {
233
234 templateString += "%.2f%%\t%.1frps\t"
235 }
236
237 templateString += "%dms\t%dms\t%dms\t\n"
238
239 var emptyTemplateString string
240 if outputActual {
241 emptyTemplateString = routeTemplate + "\t%s\t-\t-\t-\t-\t-\t-\t-\t\n"
242 } else {
243 emptyTemplateString = routeTemplate + "\t%s\t-\t-\t-\t-\t-\t\n"
244 }
245
246 for _, row := range stats {
247
248 values := []interface{}{
249 row.route,
250 row.dst,
251 }
252
253 if row.hasRequestData {
254 values = append(values, []interface{}{
255 row.successRate * 100,
256 row.requestRate,
257 }...)
258
259 if outputActual {
260 values = append(values, []interface{}{
261 row.actualSuccessRate * 100,
262 row.actualRequestRate,
263 }...)
264 }
265 values = append(values, []interface{}{
266 row.latencyP50,
267 row.latencyP95,
268 row.latencyP99,
269 }...)
270
271 fmt.Fprintf(w, templateString, values...)
272 } else {
273 fmt.Fprintf(w, emptyTemplateString, values...)
274 }
275 }
276 }
277
278
279 func getRequestRate(success, failure uint64, timeWindow string) float64 {
280 windowLength, err := time.ParseDuration(timeWindow)
281 if err != nil {
282 log.Error(err.Error())
283 return 0.0
284 }
285 return float64(success+failure) / windowLength.Seconds()
286 }
287
288
289 func getSuccessRate(success, failure uint64) float64 {
290 if success+failure == 0 {
291 return 0.0
292 }
293 return float64(success) / float64(success+failure)
294 }
295
296
297
298 type JSONRouteStats struct {
299 Route string `json:"route"`
300 Authority string `json:"authority"`
301 Success *float64 `json:"success,omitempty"`
302 Rps *float64 `json:"rps,omitempty"`
303 EffectiveSuccess *float64 `json:"effective_success,omitempty"`
304 EffectiveRps *float64 `json:"effective_rps,omitempty"`
305 ActualSuccess *float64 `json:"actual_success,omitempty"`
306 ActualRps *float64 `json:"actual_rps,omitempty"`
307 LatencyMSp50 *uint64 `json:"latency_ms_p50"`
308 LatencyMSp95 *uint64 `json:"latency_ms_p95"`
309 LatencyMSp99 *uint64 `json:"latency_ms_p99"`
310 }
311
312 func printRouteJSON(tables map[string][]*routeRowStats, w *tabwriter.Writer, options *routesOptions) {
313
314 entries := map[string][]*JSONRouteStats{}
315 for resource, table := range tables {
316 for _, row := range table {
317 route := row.route
318 entry := &JSONRouteStats{
319 Route: route,
320 }
321
322 entry.Authority = row.dst
323 if options.toResource != "" {
324 entry.EffectiveSuccess = &row.successRate
325 entry.EffectiveRps = &row.requestRate
326 entry.ActualSuccess = &row.actualSuccessRate
327 entry.ActualRps = &row.actualRequestRate
328 } else {
329 entry.Success = &row.successRate
330 entry.Rps = &row.requestRate
331 }
332 entry.LatencyMSp50 = &row.latencyP50
333 entry.LatencyMSp95 = &row.latencyP95
334 entry.LatencyMSp99 = &row.latencyP99
335
336 entries[resource] = append(entries[resource], entry)
337 }
338 }
339 b, err := json.MarshalIndent(entries, "", " ")
340 if err != nil {
341 log.Error(err.Error())
342 return
343 }
344 fmt.Fprintf(w, "%s\n", b)
345 }
346
347 func (o *routesOptions) validateOutputFormat() error {
348 switch o.outputFormat {
349 case tableOutput, jsonOutput:
350 return nil
351 case wideOutput:
352 if o.toResource == "" {
353 return fmt.Errorf("%s output is only available when --to is specified", wideOutput)
354 }
355 return nil
356 default:
357 return fmt.Errorf("--output currently only supports %s, %s, and %s", tableOutput, wideOutput, jsonOutput)
358 }
359 }
360
361 func buildTopRoutesRequest(resource string, options *routesOptions) (*pb.TopRoutesRequest, error) {
362 err := options.validateOutputFormat()
363 if err != nil {
364 return nil, err
365 }
366
367 target, err := pkgUtil.BuildResource(options.namespace, resource)
368 if err != nil {
369 return nil, err
370 }
371
372 requestParams := util.TopRoutesRequestParams{
373 StatsBaseRequestParams: util.StatsBaseRequestParams{
374 TimeWindow: options.timeWindow,
375 ResourceName: target.Name,
376 ResourceType: target.Type,
377 Namespace: options.namespace,
378 },
379 LabelSelector: options.labelSelector,
380 }
381
382 options.dstIsService = target.GetType() != k8s.Authority
383
384 if options.toResource != "" {
385 if options.toNamespace == "" {
386 options.toNamespace = options.namespace
387 }
388 toRes, err := pkgUtil.BuildResource(options.toNamespace, options.toResource)
389 if err != nil {
390 return nil, err
391 }
392
393 options.dstIsService = toRes.GetType() != k8s.Authority
394
395 requestParams.ToName = toRes.Name
396 requestParams.ToNamespace = toRes.Namespace
397 requestParams.ToType = toRes.Type
398 }
399
400 return util.BuildTopRoutesRequest(requestParams)
401 }
402
403
404 func routeWidth(stats []*routeRowStats) int {
405 maxLength := 0
406 for _, row := range stats {
407 if len(row.route) > maxLength {
408 maxLength = len(row.route)
409 }
410 }
411 return maxLength
412 }
413
View as plain text