1 package cmd
2
3 import (
4 "bufio"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "io"
10 "net"
11 "os"
12 "sort"
13 "strings"
14 "time"
15
16 sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
17 pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
18 "github.com/linkerd/linkerd2/pkg/healthcheck"
19 "github.com/linkerd/linkerd2/pkg/k8s"
20 "github.com/linkerd/linkerd2/pkg/profiles"
21 "github.com/linkerd/linkerd2/pkg/protohttp"
22 "github.com/linkerd/linkerd2/viz/pkg/api"
23 hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
24 vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
25 pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
26 "github.com/linkerd/linkerd2/viz/tap/pkg"
27 log "github.com/sirupsen/logrus"
28 "github.com/spf13/cobra"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/util/validation"
31 "sigs.k8s.io/yaml"
32 )
33
// profileOptions collects the settings of the `profile` command: the
// positional service name plus the values bound to its command-line flags.
type profileOptions struct {
	// name is the service the profile is generated for.
	name string
	// namespace is the namespace of the service (--namespace / -n).
	namespace string
	// tap is the target resource whose traffic is tapped (--tap).
	tap string
	// tapDuration is how long tap data is collected (--tap-duration).
	tapDuration time.Duration
	// tapRouteLimit is the maximum number of routes added to the profile
	// (--tap-route-limit).
	tapRouteLimit uint
	// output is the rendering format, "yaml" or "json" (--output / -o).
	output string
}
42
43 func newProfileOptions() *profileOptions {
44 return &profileOptions{
45 tapDuration: 5 * time.Second,
46 tapRouteLimit: 20,
47 output: "yaml",
48 }
49 }
50
51 func (options *profileOptions) validate() error {
52 if options.tap == "" {
53 return errors.New("The --tap flag must be specified")
54 }
55
56
57 if errs := validation.IsDNS1035Label(options.name); len(errs) != 0 {
58 return fmt.Errorf("invalid service %q: %v", options.name, errs)
59 }
60
61
62 if errs := validation.IsDNS1123Label(options.namespace); len(errs) != 0 {
63 return fmt.Errorf("invalid namespace %q: %v", options.namespace, errs)
64 }
65 return nil
66 }
67
68
69
70 func newCmdProfile() *cobra.Command {
71 options := newProfileOptions()
72
73 cmd := &cobra.Command{
74 Use: "profile [flags] --tap resource (SERVICE)",
75 Short: "Output service profile config for Kubernetes based off tap data",
76 Long: "Output service profile config for Kubernetes based off tap data.",
77 Example: ` # Generate a profile by watching live traffic.
78 linkerd viz profile -n emojivoto web-svc --tap deploy/web --tap-duration 10s --tap-route-limit 5
79 `,
80 Args: cobra.ExactArgs(1),
81 ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
82
83
84
85 if len(args) > 0 {
86 return nil, cobra.ShellCompDirectiveError
87 }
88
89 k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
90 if err != nil {
91 return nil, cobra.ShellCompDirectiveError
92 }
93
94 if options.namespace == "" {
95 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
96 }
97
98 cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
99
100
101
102 results, err := cc.Complete([]string{k8s.Service}, toComplete)
103 if err != nil {
104 return nil, cobra.ShellCompDirectiveError
105 }
106
107 return results, cobra.ShellCompDirectiveDefault
108 },
109 RunE: func(cmd *cobra.Command, args []string) error {
110 api.CheckClientOrExit(hc.VizOptions{
111 Options: &healthcheck.Options{
112 ControlPlaneNamespace: controlPlaneNamespace,
113 KubeConfig: kubeconfigPath,
114 Impersonate: impersonate,
115 ImpersonateGroup: impersonateGroup,
116 KubeContext: kubeContext,
117 APIAddr: apiAddr,
118 },
119 VizNamespaceOverride: vizNamespace,
120 })
121 if options.namespace == "" {
122 options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
123 }
124 options.name = args[0]
125 clusterDomain := "cluster.local"
126 var k8sAPI *k8s.KubernetesAPI
127 err := options.validate()
128 if err != nil {
129 return err
130 }
131 k8sAPI, err = k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
132 if err != nil {
133 return err
134 }
135 _, values, err := healthcheck.FetchCurrentConfiguration(cmd.Context(), k8sAPI, controlPlaneNamespace)
136 if err != nil {
137 return err
138 }
139 if cd := values.ClusterDomain; cd != "" {
140 clusterDomain = cd
141 }
142 return renderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), options.output, os.Stdout)
143 },
144 }
145 cmd.PersistentFlags().StringVar(&options.tap, "tap", options.tap, "Output a service profile based on tap data for the given target resource")
146 cmd.PersistentFlags().DurationVar(&options.tapDuration, "tap-duration", options.tapDuration, "Duration over which tap data is collected (for example: \"10s\", \"1m\", \"10m\")")
147 cmd.PersistentFlags().UintVar(&options.tapRouteLimit, "tap-route-limit", options.tapRouteLimit, "Max number of routes to add to the profile")
148 cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the service")
149 cmd.PersistentFlags().StringVarP(&options.output, "output", "o", options.output, "Output format. One of: yaml, json")
150
151 pkgcmd.ConfigureNamespaceFlagCompletion(
152 cmd, []string{"namespace"},
153 kubeconfigPath, impersonate, impersonateGroup, kubeContext)
154 return cmd
155 }
156
157
158
159
160 func renderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, format string, w io.Writer) error {
161 requestParams := pkg.TapRequestParams{
162 Resource: tapResource,
163 Namespace: namespace,
164 }
165 log.Debugf("Running `linkerd tap %s --namespace %s`", tapResource, namespace)
166 req, err := pkg.BuildTapByResourceRequest(requestParams)
167 if err != nil {
168 return err
169 }
170 profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
171 if err != nil {
172 return err
173 }
174 var output []byte
175 if format == "yaml" {
176 output, err = yaml.Marshal(profile)
177 } else if format == "json" {
178 output, err = json.Marshal(profile)
179 } else {
180 return errors.New("output format must be one of yaml or json")
181 }
182 if err != nil {
183 return fmt.Errorf("Error writing Service Profile: %w", err)
184 }
185 w.Write(output)
186 return nil
187 }
188
189 func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
190 profile := sp.ServiceProfile{
191 ObjectMeta: metav1.ObjectMeta{
192 Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
193 Namespace: namespace,
194 },
195 TypeMeta: profiles.ServiceProfileMeta,
196 }
197 ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
198 defer cancel()
199 reader, body, err := pkg.Reader(ctxWithTime, k8sAPI, tapReq)
200 if err != nil {
201 return profile, err
202 }
203 defer body.Close()
204 routes := routeSpecFromTap(reader, routeLimit)
205 profile.Spec.Routes = routes
206 return profile, nil
207 }
208
209 func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec {
210 routes := make([]*sp.RouteSpec, 0)
211 routesMap := make(map[string]*sp.RouteSpec)
212 for {
213 log.Debug("Waiting for data...")
214 event := pb.TapEvent{}
215 err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
216 if err != nil {
217
218 var e net.Error
219 if !errors.Is(err, io.EOF) &&
220 !(errors.As(err, &e) && e.Timeout()) &&
221 !errors.Is(err, context.DeadlineExceeded) &&
222 !strings.HasSuffix(err.Error(), pkg.ErrClosedResponseBody) {
223 fmt.Fprintln(os.Stderr, err)
224 }
225 break
226 }
227 routeSpec := getPathDataFromTap(&event)
228 log.Debugf("Created route spec: %v", routeSpec)
229 if routeSpec != nil {
230 routesMap[routeSpec.Name] = routeSpec
231 if len(routesMap) >= routeLimit {
232 break
233 }
234 }
235 }
236 for _, path := range sortMapKeys(routesMap) {
237 routes = append(routes, routesMap[path])
238 }
239 return routes
240 }
241
242 func sortMapKeys(m map[string]*sp.RouteSpec) (keys []string) {
243 for key := range m {
244 keys = append(keys, key)
245 }
246 sort.Strings(keys)
247 return
248 }
249
250 func getPathDataFromTap(event *pb.TapEvent) *sp.RouteSpec {
251 if event.GetProxyDirection() != pb.TapEvent_INBOUND {
252 return nil
253 }
254 switch ev := event.GetHttp().GetEvent().(type) {
255 case *pb.TapEvent_Http_RequestInit_:
256 path := ev.RequestInit.GetPath()
257 if path == "/" {
258 return nil
259 }
260
261 return profiles.MkRouteSpec(
262 path,
263 profiles.PathToRegex(path),
264 vizutil.HTTPMethodToString(ev.RequestInit.GetMethod()),
265 nil)
266 default:
267 return nil
268 }
269 }
270
View as plain text