1
16
17 package grpc
18
import (
	"context"
	"errors"
	"fmt"
	"net"
	"strconv"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	grpchealth "google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
	"k8s.io/component-base/version"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/probe"
)
35
36
37 type Prober interface {
38 Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error)
39 }
40
// grpcProber is the stateless Prober implementation returned by New; it
// carries no fields, so every value of it is interchangeable.
type grpcProber struct{}
44
45 func New() Prober {
46 return grpcProber{}
47 }
48
49
50
51
52
53 func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) {
54 v := version.Get()
55
56 opts := []grpc.DialOption{
57 grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)),
58 grpc.WithBlock(),
59 grpc.WithTransportCredentials(insecure.NewCredentials()),
60 grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
61 return probe.ProbeDialer().DialContext(ctx, "tcp", addr)
62 }),
63 }
64
65 ctx, cancel := context.WithTimeout(context.Background(), timeout)
66
67 defer cancel()
68
69 addr := net.JoinHostPort(host, fmt.Sprintf("%d", port))
70 conn, err := grpc.DialContext(ctx, addr, opts...)
71
72 if err != nil {
73 if err == context.DeadlineExceeded {
74 klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
75 return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil
76 } else {
77 klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
78 return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil
79 }
80 }
81
82 defer func() {
83 _ = conn.Close()
84 }()
85
86 client := grpchealth.NewHealthClient(conn)
87
88 resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{
89 Service: service,
90 })
91
92 if err != nil {
93 stat, ok := status.FromError(err)
94 if ok {
95 switch stat.Code() {
96 case codes.Unimplemented:
97 klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
98 return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil
99 case codes.DeadlineExceeded:
100 klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
101 return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil
102 default:
103 klog.V(4).ErrorS(err, "rpc probe failed")
104 }
105 } else {
106 klog.V(4).ErrorS(err, "health rpc probe failed")
107 }
108
109 return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil
110 }
111
112 if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
113 return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil
114 }
115
116 return probe.Success, fmt.Sprintf("service healthy"), nil
117 }
118
View as plain text