1 package api
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/golang/protobuf/ptypes/duration"
10 "github.com/linkerd/linkerd2/controller/k8s"
11 pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
12 "github.com/linkerd/linkerd2/pkg/prometheus"
13 pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
14 "github.com/linkerd/linkerd2/viz/metrics-api/util"
15 promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
16 log "github.com/sirupsen/logrus"
17 "google.golang.org/grpc"
18 "gopkg.in/yaml.v2"
19 corev1 "k8s.io/api/core/v1"
20 "k8s.io/apimachinery/pkg/labels"
21 )
22
23
// Server is the API server interface exposed by this package. It adds nothing
// of its own: it is exactly the generated pb.ApiServer interface.
type Server interface {
	pb.ApiServer
}
27
// grpcServer is the pb.ApiServer implementation registered by NewGrpcServer.
// It embeds pb.UnimplementedApiServer, so any RPC without a method defined on
// grpcServer uses the generated default.
type grpcServer struct {
	pb.UnimplementedApiServer
	// prometheusAPI is the Prometheus client. It may be nil: SelfCheck skips
	// the Prometheus check in that case.
	prometheusAPI promv1.API
	// k8sAPI gives access to the Kubernetes listers used to enumerate pods
	// and services.
	k8sAPI *k8s.API
	// controllerNamespace and clusterDomain are stored as passed to
	// NewGrpcServer; they are not read by the methods visible in this file.
	controllerNamespace string
	clusterDomain       string
	// ignoredNamespaces lists namespaces whose pods are skipped by
	// shouldIgnore.
	ignoredNamespaces []string
}
36
// podReport holds what ListPods learned about a pod from one Prometheus
// sample of the podQuery result.
type podReport struct {
	// lastReport is the sample's timestamp.
	lastReport time.Time
	// processStartTimeSeconds is the sample's value (a Unix time in seconds)
	// converted to a time.Time.
	processStartTimeSeconds time.Time
}
41
// podQuery is the PromQL template used to find each pod's process start time.
// The single %s verb receives the label matchers (possibly empty) that go
// inside the braces.
const podQuery = "max(process_start_time_seconds{%s}) by (pod, namespace)"

// Subsystem names and descriptions attached to the results returned by
// SelfCheck.
const (
	k8sClientSubsystemName    = "kubernetes"
	k8sClientCheckDescription = "linkerd viz can talk to Kubernetes"

	promClientSubsystemName    = "prometheus"
	promClientCheckDescription = "linkerd viz can talk to Prometheus"
)
49
50
51
52 func NewGrpcServer(
53 promAPI promv1.API,
54 k8sAPI *k8s.API,
55 controllerNamespace string,
56 clusterDomain string,
57 ignoredNamespaces []string,
58 ) *grpc.Server {
59
60 server := &grpcServer{
61 prometheusAPI: promAPI,
62 k8sAPI: k8sAPI,
63 controllerNamespace: controllerNamespace,
64 clusterDomain: clusterDomain,
65 ignoredNamespaces: ignoredNamespaces,
66 }
67
68 s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
69 pb.RegisterApiServer(s, server)
70
71 return s
72 }
73
74 func (s *grpcServer) ListPods(ctx context.Context, req *pb.ListPodsRequest) (*pb.ListPodsResponse, error) {
75 log.Debugf("ListPods request: %+v", req)
76
77 targetOwner := req.GetSelector().GetResource()
78
79
80
81 reports := make(map[string]podReport)
82
83 labelSelector := labels.Everything()
84 if s := req.GetSelector().GetLabelSelector(); s != "" {
85 var err error
86 labelSelector, err = labels.Parse(s)
87 if err != nil {
88 return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
89 }
90 }
91
92 nsQuery := ""
93 namespace := ""
94 if targetOwner.GetNamespace() != "" {
95 namespace = targetOwner.GetNamespace()
96 } else if targetOwner.GetType() == pkgK8s.Namespace {
97 namespace = targetOwner.GetName()
98 }
99 if namespace != "" {
100 nsQuery = fmt.Sprintf("namespace=\"%s\"", namespace)
101 }
102 processStartTimeQuery := fmt.Sprintf(podQuery, nsQuery)
103
104
105 vec, err := s.queryProm(ctx, processStartTimeQuery)
106 if err != nil && !errors.Is(err, ErrNoPrometheusInstance) {
107 return nil, err
108 }
109
110 for _, sample := range vec {
111 pod := string(sample.Metric["pod"])
112 timestamp := sample.Timestamp
113
114 reports[pod] = podReport{
115 lastReport: time.Unix(0, int64(timestamp)*int64(time.Millisecond)),
116 processStartTimeSeconds: time.Unix(0, int64(sample.Value)*int64(time.Second)),
117 }
118 }
119
120 var pods []*corev1.Pod
121 if namespace != "" {
122 pods, err = s.k8sAPI.Pod().Lister().Pods(namespace).List(labelSelector)
123 } else {
124 pods, err = s.k8sAPI.Pod().Lister().List(labelSelector)
125 }
126
127 if err != nil {
128 return nil, err
129 }
130 podList := make([]*pb.Pod, 0)
131
132 for _, pod := range pods {
133 if s.shouldIgnore(pod) {
134 continue
135 }
136
137 ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(ctx, pod, false)
138
139 if targetOwner.GetNamespace() != "" && targetOwner.GetNamespace() != pod.GetNamespace() {
140 continue
141 }
142 if targetOwner.GetType() != "" && targetOwner.GetType() != ownerKind {
143 continue
144 }
145 if targetOwner.GetName() != "" && targetOwner.GetName() != ownerName {
146 continue
147 }
148
149 updated, added := reports[pod.Name]
150
151 item := util.K8sPodToPublicPod(*pod, ownerKind, ownerName)
152 item.Added = added
153
154 if added {
155 since := time.Since(updated.lastReport)
156 item.SinceLastReport = &duration.Duration{
157 Seconds: int64(since / time.Second),
158 Nanos: int32(since % time.Second),
159 }
160 sinceStarting := time.Since(updated.processStartTimeSeconds)
161 item.Uptime = &duration.Duration{
162 Seconds: int64(sinceStarting / time.Second),
163 Nanos: int32(sinceStarting % time.Second),
164 }
165 }
166
167 podList = append(podList, item)
168 }
169
170 rsp := pb.ListPodsResponse{Pods: podList}
171
172 log.Debugf("ListPods response: %s", rsp.String())
173
174 return &rsp, nil
175 }
176
177 func (s *grpcServer) SelfCheck(ctx context.Context, in *pb.SelfCheckRequest) (*pb.SelfCheckResponse, error) {
178 k8sClientCheck := &pb.CheckResult{
179 SubsystemName: k8sClientSubsystemName,
180 CheckDescription: k8sClientCheckDescription,
181 Status: pb.CheckStatus_OK,
182 }
183 _, err := s.k8sAPI.Pod().Lister().List(labels.Everything())
184 if err != nil {
185 k8sClientCheck.Status = pb.CheckStatus_ERROR
186 k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
187 }
188
189 response := &pb.SelfCheckResponse{
190 Results: []*pb.CheckResult{
191 k8sClientCheck,
192 },
193 }
194
195 if s.prometheusAPI != nil {
196 promClientCheck := &pb.CheckResult{
197 SubsystemName: promClientSubsystemName,
198 CheckDescription: promClientCheckDescription,
199 Status: pb.CheckStatus_OK,
200 }
201 _, err = s.queryProm(ctx, fmt.Sprintf(podQuery, ""))
202 if err != nil {
203 promClientCheck.Status = pb.CheckStatus_ERROR
204 promClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling Prometheus from the control plane: %s", err)
205 }
206
207 response.Results = append(response.Results, promClientCheck)
208 }
209
210 return response, nil
211 }
212
213 func (s *grpcServer) shouldIgnore(pod *corev1.Pod) bool {
214 for _, namespace := range s.ignoredNamespaces {
215 if pod.Namespace == namespace {
216 return true
217 }
218 }
219 return false
220 }
221
222 func (s *grpcServer) ListServices(ctx context.Context, req *pb.ListServicesRequest) (*pb.ListServicesResponse, error) {
223 log.Debugf("ListServices request: %+v", req)
224
225 services, err := s.k8sAPI.GetServices(req.Namespace, "")
226 if err != nil {
227 return nil, err
228 }
229
230 svcs := make([]*pb.Service, 0)
231 for _, svc := range services {
232 svcs = append(svcs, &pb.Service{
233 Name: svc.GetName(),
234 Namespace: svc.GetNamespace(),
235 })
236 }
237
238 return &pb.ListServicesResponse{Services: svcs}, nil
239 }
240
241
242
243
244
245 func (s *grpcServer) validateTimeWindow(ctx context.Context, window string) error {
246 config, err := s.prometheusAPI.Config(ctx)
247 if err != nil {
248 return nil
249 }
250
251 type PrometheusConfig struct {
252 Global map[string]string
253 }
254
255 var prom PrometheusConfig
256 err = yaml.Unmarshal([]byte(config.YAML), &prom)
257 if err != nil {
258 return nil
259 }
260
261 scrape_interval_str, found := prom.Global["scrape_interval"]
262 if !found {
263 return nil
264 }
265
266 scrape_interval, err := time.ParseDuration(scrape_interval_str)
267 if err != nil {
268 return nil
269 }
270
271 t, err := time.ParseDuration(window)
272 if err != nil {
273 return err
274 }
275
276 if t < scrape_interval {
277 return fmt.Errorf("time window (%s) must be at least as long as the Prometheus scrape interval (%s)", window, scrape_interval)
278 }
279 return nil
280 }
281
View as plain text