...

Source file src/k8s.io/kubernetes/pkg/probe/grpc/grpc.go

Documentation: k8s.io/kubernetes/pkg/probe/grpc

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package grpc
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"time"
    24  
    25  	"google.golang.org/grpc"
    26  	"google.golang.org/grpc/codes"
    27  	"google.golang.org/grpc/credentials/insecure"
    28  	grpchealth "google.golang.org/grpc/health/grpc_health_v1"
    29  	"google.golang.org/grpc/metadata"
    30  	"google.golang.org/grpc/status"
    31  	"k8s.io/component-base/version"
    32  	"k8s.io/klog/v2"
    33  	"k8s.io/kubernetes/pkg/probe"
    34  )
    35  
    36  // Prober is an interface that defines the Probe function for doing GRPC readiness/liveness/startup checks.
    37  type Prober interface {
    38  	Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error)
    39  }
    40  
    41  type grpcProber struct {
    42  }
    43  
    44  // New Prober for execute grpc probe
    45  func New() Prober {
    46  	return grpcProber{}
    47  }
    48  
    49  // Probe executes a grpc call to check the liveness/readiness/startup of container.
    50  // Returns the Result status, command output, and errors if any.
    51  // Any failure is considered as a probe failure to mimic grpc_health_probe tool behavior.
    52  // err is always nil
    53  func (p grpcProber) Probe(host, service string, port int, timeout time.Duration) (probe.Result, string, error) {
    54  	v := version.Get()
    55  
    56  	opts := []grpc.DialOption{
    57  		grpc.WithUserAgent(fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor)),
    58  		grpc.WithBlock(),
    59  		grpc.WithTransportCredentials(insecure.NewCredentials()), //credentials are currently not supported
    60  		grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
    61  			return probe.ProbeDialer().DialContext(ctx, "tcp", addr)
    62  		}),
    63  	}
    64  
    65  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
    66  
    67  	defer cancel()
    68  
    69  	addr := net.JoinHostPort(host, fmt.Sprintf("%d", port))
    70  	conn, err := grpc.DialContext(ctx, addr, opts...)
    71  
    72  	if err != nil {
    73  		if err == context.DeadlineExceeded {
    74  			klog.V(4).ErrorS(err, "failed to connect grpc service due to timeout", "addr", addr, "service", service, "timeout", timeout)
    75  			return probe.Failure, fmt.Sprintf("timeout: failed to connect service %q within %v: %+v", addr, timeout, err), nil
    76  		} else {
    77  			klog.V(4).ErrorS(err, "failed to connect grpc service", "service", addr)
    78  			return probe.Failure, fmt.Sprintf("error: failed to connect service at %q: %+v", addr, err), nil
    79  		}
    80  	}
    81  
    82  	defer func() {
    83  		_ = conn.Close()
    84  	}()
    85  
    86  	client := grpchealth.NewHealthClient(conn)
    87  
    88  	resp, err := client.Check(metadata.NewOutgoingContext(ctx, make(metadata.MD)), &grpchealth.HealthCheckRequest{
    89  		Service: service,
    90  	})
    91  
    92  	if err != nil {
    93  		stat, ok := status.FromError(err)
    94  		if ok {
    95  			switch stat.Code() {
    96  			case codes.Unimplemented:
    97  				klog.V(4).ErrorS(err, "server does not implement the grpc health protocol (grpc.health.v1.Health)", "addr", addr, "service", service)
    98  				return probe.Failure, fmt.Sprintf("error: this server does not implement the grpc health protocol (grpc.health.v1.Health): %s", stat.Message()), nil
    99  			case codes.DeadlineExceeded:
   100  				klog.V(4).ErrorS(err, "rpc request not finished within timeout", "addr", addr, "service", service, "timeout", timeout)
   101  				return probe.Failure, fmt.Sprintf("timeout: health rpc did not complete within %v", timeout), nil
   102  			default:
   103  				klog.V(4).ErrorS(err, "rpc probe failed")
   104  			}
   105  		} else {
   106  			klog.V(4).ErrorS(err, "health rpc probe failed")
   107  		}
   108  
   109  		return probe.Failure, fmt.Sprintf("error: health rpc probe failed: %+v", err), nil
   110  	}
   111  
   112  	if resp.GetStatus() != grpchealth.HealthCheckResponse_SERVING {
   113  		return probe.Failure, fmt.Sprintf("service unhealthy (responded with %q)", resp.GetStatus().String()), nil
   114  	}
   115  
   116  	return probe.Success, fmt.Sprintf("service healthy"), nil
   117  }
   118  

View as plain text