...

Source file src/google.golang.org/grpc/orca/service_test.go

Documentation: google.golang.org/grpc/orca

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package orca_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"sync"
    25  	"testing"
    26  	"time"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/internal/pretty"
    32  	"google.golang.org/grpc/internal/testutils"
    33  	"google.golang.org/grpc/orca"
    34  	"google.golang.org/grpc/orca/internal"
    35  	"google.golang.org/protobuf/proto"
    36  	"google.golang.org/protobuf/types/known/durationpb"
    37  
    38  	v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
    39  	v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
    40  	v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
    41  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    42  	testpb "google.golang.org/grpc/interop/grpc_testing"
    43  )
    44  
    45  const requestsMetricKey = "test-service-requests"
    46  
    47  // An implementation of grpc_testing.TestService for the purpose of this test.
    48  // We cannot use the StubServer approach here because we need to register the
    49  // OpenRCAService as well on the same gRPC server.
    50  type testServiceImpl struct {
    51  	mu       sync.Mutex
    52  	requests int64
    53  
    54  	testgrpc.TestServiceServer
    55  	smr orca.ServerMetricsRecorder
    56  }
    57  
    58  func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
    59  	t.mu.Lock()
    60  	t.requests++
    61  	t.mu.Unlock()
    62  
    63  	t.smr.SetNamedUtilization(requestsMetricKey, float64(t.requests)*0.01)
    64  	t.smr.SetCPUUtilization(50.0)
    65  	t.smr.SetMemoryUtilization(0.9)
    66  	t.smr.SetApplicationUtilization(1.2)
    67  	return &testpb.SimpleResponse{}, nil
    68  }
    69  
    70  func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
    71  	t.smr.DeleteNamedUtilization(requestsMetricKey)
    72  	t.smr.SetCPUUtilization(0)
    73  	t.smr.SetMemoryUtilization(0)
    74  	t.smr.DeleteApplicationUtilization()
    75  	return &testpb.Empty{}, nil
    76  }
    77  
    78  // TestE2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band
    79  // custom backend metrics from the server application, and verifies that
    80  // expected load reports are received at the client.
    81  //
    82  // TODO: Change this test to use the client API, when ready, to read the
    83  // out-of-band metrics pushed by the server.
    84  func (s) TestE2E_CustomBackendMetrics_OutOfBand(t *testing.T) {
    85  	lis, err := testutils.LocalTCPListener()
    86  	if err != nil {
    87  		t.Fatal(err)
    88  	}
    89  
    90  	// Override the min reporting interval in the internal package.
    91  	const shortReportingInterval = 10 * time.Millisecond
    92  	smr := orca.NewServerMetricsRecorder()
    93  	opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval, ServerMetricsProvider: smr}
    94  	internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts)
    95  
    96  	// Register the OpenRCAService with a very short metrics reporting interval.
    97  	s := grpc.NewServer()
    98  	if err := orca.Register(s, opts); err != nil {
    99  		t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err)
   100  	}
   101  
   102  	// Register the test service implementation on the same grpc server, and start serving.
   103  	testgrpc.RegisterTestServiceServer(s, &testServiceImpl{smr: smr})
   104  	go s.Serve(lis)
   105  	defer s.Stop()
   106  	t.Logf("Started gRPC server at %s...", lis.Addr().String())
   107  
   108  	// Dial the test server.
   109  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   110  	if err != nil {
   111  		t.Fatalf("grpc.NewClient(%s) failed: %v", lis.Addr().String(), err)
   112  	}
   113  	defer cc.Close()
   114  
   115  	// Spawn a goroutine which sends 20 unary RPCs to the test server. This
   116  	// will trigger the injection of custom backend metrics from the
   117  	// testServiceImpl.
   118  	const numRequests = 20
   119  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   120  	defer cancel()
   121  	testStub := testgrpc.NewTestServiceClient(cc)
   122  	errCh := make(chan error, 1)
   123  	go func() {
   124  		for i := 0; i < numRequests; i++ {
   125  			if _, err := testStub.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
   126  				errCh <- fmt.Errorf("UnaryCall failed: %v", err)
   127  				return
   128  			}
   129  			time.Sleep(time.Millisecond)
   130  		}
   131  		errCh <- nil
   132  	}()
   133  
   134  	// Start the server streaming RPC to receive custom backend metrics.
   135  	oobStub := v3orcaservicegrpc.NewOpenRcaServiceClient(cc)
   136  	stream, err := oobStub.StreamCoreMetrics(ctx, &v3orcaservicepb.OrcaLoadReportRequest{ReportInterval: durationpb.New(shortReportingInterval)})
   137  	if err != nil {
   138  		t.Fatalf("Failed to create a stream for out-of-band metrics")
   139  	}
   140  
   141  	// Wait for the server to push metrics which indicate the completion of all
   142  	// the unary RPCs made from the above goroutine.
   143  	for {
   144  		select {
   145  		case <-ctx.Done():
   146  			t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
   147  		case err := <-errCh:
   148  			if err != nil {
   149  				t.Fatal(err)
   150  			}
   151  		default:
   152  		}
   153  
   154  		wantProto := &v3orcapb.OrcaLoadReport{
   155  			CpuUtilization:         50.0,
   156  			MemUtilization:         0.9,
   157  			ApplicationUtilization: 1.2,
   158  			Utilization:            map[string]float64{requestsMetricKey: numRequests * 0.01},
   159  		}
   160  		gotProto, err := stream.Recv()
   161  		if err != nil {
   162  			t.Fatalf("Recv() failed: %v", err)
   163  		}
   164  		if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
   165  			t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
   166  			continue
   167  		}
   168  		// This means that we received the metrics which we expected.
   169  		break
   170  	}
   171  
   172  	// The EmptyCall RPC is expected to delete earlier injected metrics.
   173  	if _, err := testStub.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   174  		t.Fatalf("EmptyCall failed: %v", err)
   175  	}
   176  	// Wait for the server to push empty metrics which indicate the processing
   177  	// of the above EmptyCall RPC.
   178  	for {
   179  		select {
   180  		case <-ctx.Done():
   181  			t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values")
   182  		default:
   183  		}
   184  
   185  		wantProto := &v3orcapb.OrcaLoadReport{}
   186  		gotProto, err := stream.Recv()
   187  		if err != nil {
   188  			t.Fatalf("Recv() failed: %v", err)
   189  		}
   190  		if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) {
   191  			t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto))
   192  			continue
   193  		}
   194  		// This means that we received the metrics which we expected.
   195  		break
   196  	}
   197  }
   198  

View as plain text