...

Source file src/google.golang.org/grpc/orca/service.go

Documentation: google.golang.org/grpc/orca

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package orca
    20  
    21  import (
    22  	"fmt"
    23  	"time"
    24  
    25  	"google.golang.org/grpc"
    26  	"google.golang.org/grpc/codes"
    27  	"google.golang.org/grpc/internal"
    28  	ointernal "google.golang.org/grpc/orca/internal"
    29  	"google.golang.org/grpc/status"
    30  
    31  	v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
    32  	v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
    33  )
    34  
    35  func init() {
    36  	ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
    37  		so.allowAnyMinReportingInterval = true
    38  	}
    39  	internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
    40  }
    41  
    42  // minReportingInterval is the absolute minimum value supported for
    43  // out-of-band metrics reporting from the ORCA service implementation
    44  // provided by the orca package.
    45  const minReportingInterval = 30 * time.Second
    46  
    47  // Service provides an implementation of the OpenRcaService as defined in the
    48  // [ORCA] service protos. Instances of this type must be created via calls to
    49  // Register() or NewService().
    50  //
    51  // Server applications can use the SetXxx() and DeleteXxx() methods to record
    52  // measurements corresponding to backend metrics, which eventually get pushed to
    53  // clients who have initiated the SteamCoreMetrics streaming RPC.
    54  //
    55  // [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto
    56  type Service struct {
    57  	v3orcaservicegrpc.UnimplementedOpenRcaServiceServer
    58  
    59  	// Minimum reporting interval, as configured by the user, or the default.
    60  	minReportingInterval time.Duration
    61  
    62  	smProvider ServerMetricsProvider
    63  }
    64  
    65  // ServiceOptions contains options to configure the ORCA service implementation.
    66  type ServiceOptions struct {
    67  	// ServerMetricsProvider is the provider to be used by the service for
    68  	// reporting OOB server metrics to clients.  Typically obtained via
    69  	// NewServerMetricsRecorder.  This field is required.
    70  	ServerMetricsProvider ServerMetricsProvider
    71  
    72  	// MinReportingInterval sets the lower bound for how often out-of-band
    73  	// metrics are reported on the streaming RPC initiated by the client. If
    74  	// unspecified, negative or less than the default value of 30s, the default
    75  	// is used. Clients may request a higher value as part of the
    76  	// StreamCoreMetrics streaming RPC.
    77  	MinReportingInterval time.Duration
    78  
    79  	// Allow a minReportingInterval which is less than the default of 30s.
    80  	// Used for testing purposes only.
    81  	allowAnyMinReportingInterval bool
    82  }
    83  
    84  // A ServerMetricsProvider provides ServerMetrics upon request.
    85  type ServerMetricsProvider interface {
    86  	// ServerMetrics returns the current set of server metrics.  It should
    87  	// return a read-only, immutable copy of the data that is active at the
    88  	// time of the call.
    89  	ServerMetrics() *ServerMetrics
    90  }
    91  
    92  // NewService creates a new ORCA service implementation configured using the
    93  // provided options.
    94  func NewService(opts ServiceOptions) (*Service, error) {
    95  	// The default minimum supported reporting interval value can be overridden
    96  	// for testing purposes through the orca internal package.
    97  	if opts.ServerMetricsProvider == nil {
    98  		return nil, fmt.Errorf("ServerMetricsProvider not specified")
    99  	}
   100  	if !opts.allowAnyMinReportingInterval {
   101  		if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval {
   102  			opts.MinReportingInterval = minReportingInterval
   103  		}
   104  	}
   105  	service := &Service{
   106  		minReportingInterval: opts.MinReportingInterval,
   107  		smProvider:           opts.ServerMetricsProvider,
   108  	}
   109  	return service, nil
   110  }
   111  
   112  // Register creates a new ORCA service implementation configured using the
   113  // provided options and registers the same on the provided grpc Server.
   114  func Register(s *grpc.Server, opts ServiceOptions) error {
   115  	// TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with
   116  	// grpc.ServiceRegistrar when possible.
   117  	service, err := NewService(opts)
   118  	if err != nil {
   119  		return err
   120  	}
   121  	v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service)
   122  	return nil
   123  }
   124  
   125  // determineReportingInterval determines the reporting interval for out-of-band
   126  // metrics. If the reporting interval is not specified in the request, or is
   127  // negative or is less than the configured minimum (via
   128  // ServiceOptions.MinReportingInterval), the latter is used. Else the value from
   129  // the incoming request is used.
   130  func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration {
   131  	if req.GetReportInterval() == nil {
   132  		return s.minReportingInterval
   133  	}
   134  	dur := req.GetReportInterval().AsDuration()
   135  	if dur < s.minReportingInterval {
   136  		logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval)
   137  		return s.minReportingInterval
   138  	}
   139  	return dur
   140  }
   141  
   142  func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
   143  	return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto())
   144  }
   145  
   146  // StreamCoreMetrics streams custom backend metrics injected by the server
   147  // application.
   148  func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error {
   149  	ticker := time.NewTicker(s.determineReportingInterval(req))
   150  	defer ticker.Stop()
   151  
   152  	for {
   153  		if err := s.sendMetricsResponse(stream); err != nil {
   154  			return err
   155  		}
   156  		// Send a response containing the currently recorded metrics
   157  		select {
   158  		case <-stream.Context().Done():
   159  			return status.Error(codes.Canceled, "Stream has ended.")
   160  		case <-ticker.C:
   161  		}
   162  	}
   163  }
   164  

View as plain text