...

Source file src/google.golang.org/grpc/xds/csds/csds.go

Documentation: google.golang.org/grpc/xds/csds

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package csds implements features to dump the status (xDS responses) the
    20  // xds_client is using.
    21  //
    22  // Notice: This package is EXPERIMENTAL and may be changed or removed in a later
    23  // release.
    24  package csds
    25  
    26  import (
    27  	"context"
    28  	"fmt"
    29  	"io"
    30  	"sync"
    31  
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/grpclog"
    34  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    35  	"google.golang.org/grpc/status"
    36  	"google.golang.org/grpc/xds/internal/xdsclient"
    37  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    38  	"google.golang.org/protobuf/types/known/timestamppb"
    39  
    40  	v3adminpb "github.com/envoyproxy/go-control-plane/envoy/admin/v3"
    41  	v3statusgrpc "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
    42  	v3statuspb "github.com/envoyproxy/go-control-plane/envoy/service/status/v3"
    43  )
    44  
    45  var logger = grpclog.Component("xds")
    46  
    47  const prefix = "[csds-server %p] "
    48  
    49  func prefixLogger(s *ClientStatusDiscoveryServer) *internalgrpclog.PrefixLogger {
    50  	return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, s))
    51  }
    52  
    53  // ClientStatusDiscoveryServer provides an implementation of the Client Status
    54  // Discovery Service (CSDS) for exposing the xDS config of a given client. See
    55  // https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto.
    56  //
    57  // For more details about the gRPC implementation of CSDS, refer to gRPC A40 at:
    58  // https://github.com/grpc/proposal/blob/master/A40-csds-support.md.
    59  type ClientStatusDiscoveryServer struct {
    60  	logger *internalgrpclog.PrefixLogger
    61  
    62  	mu             sync.Mutex
    63  	xdsClient      xdsclient.XDSClient
    64  	xdsClientClose func()
    65  }
    66  
    67  // NewClientStatusDiscoveryServer returns an implementation of the CSDS server
    68  // that can be registered on a gRPC server.
    69  func NewClientStatusDiscoveryServer() (*ClientStatusDiscoveryServer, error) {
    70  	c, close, err := xdsclient.New()
    71  	if err != nil {
    72  		logger.Warningf("Failed to create xDS client: %v", err)
    73  	}
    74  	s := &ClientStatusDiscoveryServer{xdsClient: c, xdsClientClose: close}
    75  	s.logger = prefixLogger(s)
    76  	s.logger.Infof("Created CSDS server, with xdsClient %p", c)
    77  	return s, nil
    78  }
    79  
    80  // StreamClientStatus implementations interface ClientStatusDiscoveryServiceServer.
    81  func (s *ClientStatusDiscoveryServer) StreamClientStatus(stream v3statusgrpc.ClientStatusDiscoveryService_StreamClientStatusServer) error {
    82  	for {
    83  		req, err := stream.Recv()
    84  		if err == io.EOF {
    85  			return nil
    86  		}
    87  		if err != nil {
    88  			return err
    89  		}
    90  		resp, err := s.buildClientStatusRespForReq(req)
    91  		if err != nil {
    92  			return err
    93  		}
    94  		if err := stream.Send(resp); err != nil {
    95  			return err
    96  		}
    97  	}
    98  }
    99  
   100  // FetchClientStatus implementations interface ClientStatusDiscoveryServiceServer.
   101  func (s *ClientStatusDiscoveryServer) FetchClientStatus(_ context.Context, req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
   102  	return s.buildClientStatusRespForReq(req)
   103  }
   104  
   105  // buildClientStatusRespForReq fetches the status from the client, and returns
   106  // the response to be sent back to xdsclient.
   107  //
   108  // If it returns an error, the error is a status error.
   109  func (s *ClientStatusDiscoveryServer) buildClientStatusRespForReq(req *v3statuspb.ClientStatusRequest) (*v3statuspb.ClientStatusResponse, error) {
   110  	s.mu.Lock()
   111  	defer s.mu.Unlock()
   112  
   113  	if s.xdsClient == nil {
   114  		return &v3statuspb.ClientStatusResponse{}, nil
   115  	}
   116  	// Field NodeMatchers is unsupported, by design
   117  	// https://github.com/grpc/proposal/blob/master/A40-csds-support.md#detail-node-matching.
   118  	if len(req.NodeMatchers) != 0 {
   119  		return nil, status.Errorf(codes.InvalidArgument, "node_matchers are not supported, request contains node_matchers: %v", req.NodeMatchers)
   120  	}
   121  
   122  	dump := s.xdsClient.DumpResources()
   123  	ret := &v3statuspb.ClientStatusResponse{
   124  		Config: []*v3statuspb.ClientConfig{
   125  			{
   126  				Node:              s.xdsClient.BootstrapConfig().NodeProto,
   127  				GenericXdsConfigs: dumpToGenericXdsConfig(dump),
   128  			},
   129  		},
   130  	}
   131  	return ret, nil
   132  }
   133  
   134  // Close cleans up the resources.
   135  func (s *ClientStatusDiscoveryServer) Close() {
   136  	if s.xdsClientClose != nil {
   137  		s.xdsClientClose()
   138  	}
   139  }
   140  
   141  func dumpToGenericXdsConfig(dump map[string]map[string]xdsresource.UpdateWithMD) []*v3statuspb.ClientConfig_GenericXdsConfig {
   142  	var ret []*v3statuspb.ClientConfig_GenericXdsConfig
   143  	for typeURL, updates := range dump {
   144  		for name, update := range updates {
   145  			config := &v3statuspb.ClientConfig_GenericXdsConfig{
   146  				TypeUrl:      typeURL,
   147  				Name:         name,
   148  				VersionInfo:  update.MD.Version,
   149  				XdsConfig:    update.Raw,
   150  				LastUpdated:  timestamppb.New(update.MD.Timestamp),
   151  				ClientStatus: serviceStatusToProto(update.MD.Status),
   152  			}
   153  			if errState := update.MD.ErrState; errState != nil {
   154  				config.ErrorState = &v3adminpb.UpdateFailureState{
   155  					LastUpdateAttempt: timestamppb.New(errState.Timestamp),
   156  					Details:           errState.Err.Error(),
   157  					VersionInfo:       errState.Version,
   158  				}
   159  			}
   160  			ret = append(ret, config)
   161  		}
   162  	}
   163  	return ret
   164  }
   165  
   166  func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
   167  	switch serviceStatus {
   168  	case xdsresource.ServiceStatusUnknown:
   169  		return v3adminpb.ClientResourceStatus_UNKNOWN
   170  	case xdsresource.ServiceStatusRequested:
   171  		return v3adminpb.ClientResourceStatus_REQUESTED
   172  	case xdsresource.ServiceStatusNotExist:
   173  		return v3adminpb.ClientResourceStatus_DOES_NOT_EXIST
   174  	case xdsresource.ServiceStatusACKed:
   175  		return v3adminpb.ClientResourceStatus_ACKED
   176  	case xdsresource.ServiceStatusNACKed:
   177  		return v3adminpb.ClientResourceStatus_NACKED
   178  	default:
   179  		return v3adminpb.ClientResourceStatus_UNKNOWN
   180  	}
   181  }
   182  

View as plain text