1 /* 2 * 3 * Copyright 2022 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package orca 20 21 import ( 22 "fmt" 23 "time" 24 25 "google.golang.org/grpc" 26 "google.golang.org/grpc/codes" 27 "google.golang.org/grpc/internal" 28 ointernal "google.golang.org/grpc/orca/internal" 29 "google.golang.org/grpc/status" 30 31 v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" 32 v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" 33 ) 34 35 func init() { 36 ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) { 37 so.allowAnyMinReportingInterval = true 38 } 39 internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval 40 } 41 42 // minReportingInterval is the absolute minimum value supported for 43 // out-of-band metrics reporting from the ORCA service implementation 44 // provided by the orca package. 45 const minReportingInterval = 30 * time.Second 46 47 // Service provides an implementation of the OpenRcaService as defined in the 48 // [ORCA] service protos. Instances of this type must be created via calls to 49 // Register() or NewService(). 50 // 51 // Server applications can use the SetXxx() and DeleteXxx() methods to record 52 // measurements corresponding to backend metrics, which eventually get pushed to 53 // clients who have initiated the SteamCoreMetrics streaming RPC. 54 // 55 // [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto 56 type Service struct { 57 v3orcaservicegrpc.UnimplementedOpenRcaServiceServer 58 59 // Minimum reporting interval, as configured by the user, or the default. 60 minReportingInterval time.Duration 61 62 smProvider ServerMetricsProvider 63 } 64 65 // ServiceOptions contains options to configure the ORCA service implementation. 66 type ServiceOptions struct { 67 // ServerMetricsProvider is the provider to be used by the service for 68 // reporting OOB server metrics to clients. Typically obtained via 69 // NewServerMetricsRecorder. This field is required. 70 ServerMetricsProvider ServerMetricsProvider 71 72 // MinReportingInterval sets the lower bound for how often out-of-band 73 // metrics are reported on the streaming RPC initiated by the client. If 74 // unspecified, negative or less than the default value of 30s, the default 75 // is used. Clients may request a higher value as part of the 76 // StreamCoreMetrics streaming RPC. 77 MinReportingInterval time.Duration 78 79 // Allow a minReportingInterval which is less than the default of 30s. 80 // Used for testing purposes only. 81 allowAnyMinReportingInterval bool 82 } 83 84 // A ServerMetricsProvider provides ServerMetrics upon request. 85 type ServerMetricsProvider interface { 86 // ServerMetrics returns the current set of server metrics. It should 87 // return a read-only, immutable copy of the data that is active at the 88 // time of the call. 89 ServerMetrics() *ServerMetrics 90 } 91 92 // NewService creates a new ORCA service implementation configured using the 93 // provided options. 94 func NewService(opts ServiceOptions) (*Service, error) { 95 // The default minimum supported reporting interval value can be overridden 96 // for testing purposes through the orca internal package. 97 if opts.ServerMetricsProvider == nil { 98 return nil, fmt.Errorf("ServerMetricsProvider not specified") 99 } 100 if !opts.allowAnyMinReportingInterval { 101 if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval { 102 opts.MinReportingInterval = minReportingInterval 103 } 104 } 105 service := &Service{ 106 minReportingInterval: opts.MinReportingInterval, 107 smProvider: opts.ServerMetricsProvider, 108 } 109 return service, nil 110 } 111 112 // Register creates a new ORCA service implementation configured using the 113 // provided options and registers the same on the provided grpc Server. 114 func Register(s *grpc.Server, opts ServiceOptions) error { 115 // TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with 116 // grpc.ServiceRegistrar when possible. 117 service, err := NewService(opts) 118 if err != nil { 119 return err 120 } 121 v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service) 122 return nil 123 } 124 125 // determineReportingInterval determines the reporting interval for out-of-band 126 // metrics. If the reporting interval is not specified in the request, or is 127 // negative or is less than the configured minimum (via 128 // ServiceOptions.MinReportingInterval), the latter is used. Else the value from 129 // the incoming request is used. 130 func (s *Service) determineReportingInterval(req *v3orcaservicepb.OrcaLoadReportRequest) time.Duration { 131 if req.GetReportInterval() == nil { 132 return s.minReportingInterval 133 } 134 dur := req.GetReportInterval().AsDuration() 135 if dur < s.minReportingInterval { 136 logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using minimum", dur, s.minReportingInterval) 137 return s.minReportingInterval 138 } 139 return dur 140 } 141 142 func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { 143 return stream.Send(s.smProvider.ServerMetrics().toLoadReportProto()) 144 } 145 146 // StreamCoreMetrics streams custom backend metrics injected by the server 147 // application. 148 func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { 149 ticker := time.NewTicker(s.determineReportingInterval(req)) 150 defer ticker.Stop() 151 152 for { 153 if err := s.sendMetricsResponse(stream); err != nil { 154 return err 155 } 156 // Send a response containing the currently recorded metrics 157 select { 158 case <-stream.Context().Done(): 159 return status.Error(codes.Canceled, "Stream has ended.") 160 case <-ticker.C: 161 } 162 } 163 } 164