1 package cmd
2
3 import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "os"
10 "sort"
11 "strings"
12 "sync"
13 "text/tabwriter"
14
15 destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
16 netPb "github.com/linkerd/linkerd2-proxy-api/go/net"
17 "github.com/linkerd/linkerd2/controller/api/destination"
18 "github.com/linkerd/linkerd2/pkg/addr"
19 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
20 "github.com/linkerd/linkerd2/pkg/k8s"
21 log "github.com/sirupsen/logrus"
22 "github.com/spf13/cobra"
23 "google.golang.org/grpc"
24 "google.golang.org/grpc/status"
25 )
26
// endpointsOptions holds the flag values accepted by the endpoints command.
type endpointsOptions struct {
	// outputFormat is the value of --output; validate accepts only
	// tableOutput and jsonOutput.
	outputFormat string
	// destinationPod is the value of --destination-pod and is passed to
	// destination.NewExternalClient to select the destination Pod.
	destinationPod string
	// contextToken is the value of --token and is sent as the ContextToken
	// of every Destination API request.
	contextToken string
}
32
type (
	// endpointsInfo maps a service ID of the form "<service>.<namespace>" to
	// a port number to the endpoints reported for that port.
	endpointsInfo map[string]map[uint32][]podData

	// podData describes a single weighted address returned by the
	// Destination API.
	podData struct {
		// name is the value of the address's "pod" metric label.
		name string
		// address is the string form of the address's TCP address message.
		address string
		// ip is the address's IP as produced by getIP ("" when it cannot
		// be converted).
		ip string
		// weight is the weight reported for the address.
		weight uint32
		// labels are the metric labels attached to the address.
		labels map[string]string
		// http2 carries the address's HTTP/2 client parameters, if any.
		http2 *destinationPb.Http2ClientParams
	}
)
45
const (
	// podHeader and namespaceHeader are the titles of the pod and namespace
	// columns in the table output; their lengths are the minimum widths of
	// those columns.
	podHeader       = "POD"
	namespaceHeader = "NAMESPACE"
	// padding is the cell padding handed to the tabwriter in renderEndpoints.
	padding = 3
)
51
52
53
54 func (o *endpointsOptions) validate() error {
55 if o.outputFormat == tableOutput || o.outputFormat == jsonOutput {
56 return nil
57 }
58
59 return fmt.Errorf("--output currently only supports %s and %s", tableOutput, jsonOutput)
60 }
61
62 func newEndpointsOptions() *endpointsOptions {
63 return &endpointsOptions{
64 outputFormat: tableOutput,
65 }
66 }
67
68 func newCmdEndpoints() *cobra.Command {
69 options := newEndpointsOptions()
70
71 example := ` # get all endpoints for the authorities emoji-svc.emojivoto.svc.cluster.local:8080 and web-svc.emojivoto.svc.cluster.local:80
72 linkerd diagnostics endpoints emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
73
74 # get that same information in json format
75 linkerd diagnostics endpoints -o json emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
76
77 # get the endpoints for authorities in Linkerd's control-plane itself
78 linkerd diagnostics endpoints web.linkerd-viz.svc.cluster.local:8084`
79
80 cmd := &cobra.Command{
81 Use: "endpoints [flags] authorities",
82 Aliases: []string{"ep"},
83 Short: "Introspect Linkerd's service discovery state",
84 Long: `Introspect Linkerd's service discovery state.
85
86 This command provides debug information about the internal state of the
87 control-plane's destination container. It queries the same Destination service
88 endpoint as the linkerd-proxy's, and returns the addresses associated with that
89 destination.`,
90 Example: example,
91 Args: cobra.MinimumNArgs(1),
92 RunE: func(cmd *cobra.Command, args []string) error {
93 err := options.validate()
94 if err != nil {
95 return err
96 }
97
98 var client destinationPb.DestinationClient
99 var conn *grpc.ClientConn
100 if apiAddr != "" {
101 client, conn, err = destination.NewClient(apiAddr)
102 if err != nil {
103 fmt.Fprintf(os.Stderr, "Error creating destination client: %s\n", err)
104 os.Exit(1)
105 }
106 } else {
107 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
108 if err != nil {
109 return err
110 }
111
112 client, conn, err = destination.NewExternalClient(cmd.Context(), controlPlaneNamespace, k8sAPI, options.destinationPod)
113 if err != nil {
114 fmt.Fprintf(os.Stderr, "Error creating destination client: %s\n", err)
115 os.Exit(1)
116 }
117 }
118
119 defer conn.Close()
120
121 endpoints, err := requestEndpointsFromAPI(client, options.contextToken, args)
122 if err != nil {
123 fmt.Fprintf(os.Stderr, "Destination API error: %s\n", err)
124 os.Exit(1)
125 }
126
127 output := renderEndpoints(endpoints, options)
128 _, err = fmt.Print(output)
129
130 return err
131 },
132 }
133
134 cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, fmt.Sprintf("Output format; one of: \"%s\" or \"%s\"", tableOutput, jsonOutput))
135 cmd.PersistentFlags().StringVar(&options.destinationPod, "destination-pod", "", "Target a specific destination Pod when there are multiple running")
136 cmd.PersistentFlags().StringVar(&options.contextToken, "token", "", "The context token to use when making the request to the destination API")
137
138 pkgcmd.ConfigureOutputFlagCompletion(cmd)
139
140 return cmd
141 }
142
143 func requestEndpointsFromAPI(client destinationPb.DestinationClient, token string, authorities []string) (endpointsInfo, error) {
144 info := make(endpointsInfo)
145
146 events := make(chan *destinationPb.Update, len(authorities))
147 errs := make(chan error, len(authorities))
148 var wg sync.WaitGroup
149
150 for _, authority := range authorities {
151 wg.Add(1)
152 go func(authority string) {
153 defer wg.Done()
154 if len(errs) == 0 {
155 dest := &destinationPb.GetDestination{
156 Scheme: "http:",
157 Path: authority,
158 ContextToken: token,
159 }
160
161 rsp, err := client.Get(context.Background(), dest)
162 if err != nil {
163 errs <- err
164 return
165 }
166
167 event, err := rsp.Recv()
168 if err != nil {
169 if grpcError, ok := status.FromError(err); ok {
170 err = errors.New(grpcError.Message())
171 }
172 errs <- err
173 return
174 }
175 events <- event
176 }
177 }(authority)
178 }
179
180 wg.Wait()
181
182 for i := 0; i < len(authorities); i++ {
183 select {
184 case err := <-errs:
185
186 return nil, err
187 case event := <-events:
188 addressSet := event.GetAdd()
189 labels := addressSet.GetMetricLabels()
190 serviceID := labels["service"] + "." + labels["namespace"]
191 if _, ok := info[serviceID]; !ok {
192 info[serviceID] = make(map[uint32][]podData)
193 }
194
195 for _, addr := range addressSet.GetAddrs() {
196 tcpAddr := addr.GetAddr()
197 port := tcpAddr.GetPort()
198
199 if info[serviceID][port] == nil {
200 info[serviceID][port] = make([]podData, 0)
201 }
202
203 labels := addr.GetMetricLabels()
204 info[serviceID][port] = append(info[serviceID][port], podData{
205 name: labels["pod"],
206 address: tcpAddr.String(),
207 ip: getIP(tcpAddr),
208 weight: addr.GetWeight(),
209 labels: addr.GetMetricLabels(),
210 http2: addr.GetHttp2(),
211 })
212 }
213 }
214 }
215
216 return info, nil
217 }
218
219 func getIP(tcpAddr *netPb.TcpAddress) string {
220 ip := addr.FromProxyAPI(tcpAddr.GetIp())
221 if ip == nil {
222 return ""
223 }
224 return addr.PublicIPToString(ip)
225 }
226
227 func renderEndpoints(endpoints endpointsInfo, options *endpointsOptions) string {
228 var buffer bytes.Buffer
229 w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', 0)
230 writeEndpointsToBuffer(endpoints, w, options)
231 w.Flush()
232
233 return buffer.String()
234 }
235
// rowEndpoint is one endpoint as rendered by the endpoints command. The table
// output prints Namespace, IP, Port, Pod and Service; the JSON output
// serializes every field.
type rowEndpoint struct {
	Namespace string `json:"namespace"`
	IP        string `json:"ip"`
	Port      uint32 `json:"port"`
	// Pod is the pod name with any leading "<prefix>/" removed.
	Pod string `json:"pod"`
	// Service is the full service ID, "<service>.<namespace>".
	Service string `json:"service"`
	Weight  uint32 `json:"weight"`

	// Http2 is left out of the JSON output when nil.
	Http2 *destinationPb.Http2ClientParams `json:"http2,omitempty"`

	// Labels are the metric labels reported for the endpoint.
	Labels map[string]string `json:"labels"`
}
248
249 func writeEndpointsToBuffer(endpoints endpointsInfo, w *tabwriter.Writer, options *endpointsOptions) {
250 maxPodLength := len(podHeader)
251 maxNamespaceLength := len(namespaceHeader)
252 endpointsTables := map[string][]rowEndpoint{}
253
254 for serviceID, servicePort := range endpoints {
255 namespace := ""
256 parts := strings.SplitN(serviceID, ".", 2)
257 namespace = parts[1]
258
259 for port, podAddrs := range servicePort {
260 for _, pod := range podAddrs {
261 name := pod.name
262 parts := strings.SplitN(name, "/", 2)
263 if len(parts) == 2 {
264 name = parts[1]
265 }
266 row := rowEndpoint{
267 Namespace: namespace,
268 IP: pod.ip,
269 Port: port,
270 Pod: name,
271 Service: serviceID,
272 Weight: pod.weight,
273 Labels: pod.labels,
274 Http2: pod.http2,
275 }
276
277 endpointsTables[namespace] = append(endpointsTables[namespace], row)
278
279 if len(name) > maxPodLength {
280 maxPodLength = len(name)
281 }
282 if len(namespace) > maxNamespaceLength {
283 maxNamespaceLength = len(namespace)
284 }
285 }
286
287 sort.Slice(endpointsTables[namespace], func(i, j int) bool {
288 return endpointsTables[namespace][i].Service < endpointsTables[namespace][j].Service
289 })
290 }
291 }
292
293 switch options.outputFormat {
294 case tableOutput:
295 if len(endpointsTables) == 0 {
296 fmt.Fprintln(os.Stderr, "No endpoints found.")
297 os.Exit(0)
298 }
299 printEndpointsTables(endpointsTables, w, maxPodLength, maxNamespaceLength)
300 case jsonOutput:
301 printEndpointsJSON(endpointsTables, w)
302 }
303 }
304
305 func printEndpointsTables(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
306 firstTable := true
307
308 for _, ns := range sortNamespaceKeys(endpointsTables) {
309 if !firstTable {
310 fmt.Fprint(w, "\n")
311 }
312 firstTable = false
313 printEndpointsTable(ns, endpointsTables[ns], w, maxPodLength, maxNamespaceLength)
314 }
315 }
316
317 func printEndpointsTable(namespace string, rows []rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
318 headers := make([]string, 0)
319 templateString := "%s\t%d\t%s\t%s\n"
320
321 headers = append(headers, namespaceHeader+strings.Repeat(" ", maxNamespaceLength-len(namespaceHeader)))
322 templateString = "%s\t" + templateString
323
324 headers = append(headers, []string{
325 "IP",
326 "PORT",
327 podHeader + strings.Repeat(" ", maxPodLength-len(podHeader)),
328 "SERVICE",
329 }...)
330 fmt.Fprintln(w, strings.Join(headers, "\t"))
331
332 for _, row := range rows {
333 values := []interface{}{
334 namespace + strings.Repeat(" ", maxNamespaceLength-len(namespace)),
335 row.IP,
336 row.Port,
337 row.Pod,
338 row.Service,
339 }
340
341 fmt.Fprintf(w, templateString, values...)
342 }
343 }
344
345 func printEndpointsJSON(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer) {
346 entries := []rowEndpoint{}
347
348 for _, ns := range sortNamespaceKeys(endpointsTables) {
349 entries = append(entries, endpointsTables[ns]...)
350 }
351
352 b, err := json.MarshalIndent(entries, "", " ")
353 if err != nil {
354 log.Error(err.Error())
355 return
356 }
357 fmt.Fprintf(w, "%s\n", b)
358 }
359
360 func sortNamespaceKeys(endpointsTables map[string][]rowEndpoint) []string {
361 var sortedKeys []string
362 for key := range endpointsTables {
363 sortedKeys = append(sortedKeys, key)
364 }
365 sort.Strings(sortedKeys)
366 return sortedKeys
367 }
368
View as plain text