...

Source file src/google.golang.org/grpc/orca/producer.go

Documentation: google.golang.org/grpc/orca

     1  /*
     2   * Copyright 2022 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package orca
    18  
    19  import (
    20  	"context"
    21  	"sync"
    22  	"time"
    23  
    24  	"google.golang.org/grpc"
    25  	"google.golang.org/grpc/balancer"
    26  	"google.golang.org/grpc/codes"
    27  	"google.golang.org/grpc/internal/backoff"
    28  	"google.golang.org/grpc/internal/grpcsync"
    29  	"google.golang.org/grpc/orca/internal"
    30  	"google.golang.org/grpc/status"
    31  
    32  	v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
    33  	v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3"
    34  	v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3"
    35  	"google.golang.org/protobuf/types/known/durationpb"
    36  )
    37  
    38  type producerBuilder struct{}
    39  
    40  // Build constructs and returns a producer and its cleanup function
    41  func (*producerBuilder) Build(cci any) (balancer.Producer, func()) {
    42  	p := &producer{
    43  		client:    v3orcaservicegrpc.NewOpenRcaServiceClient(cci.(grpc.ClientConnInterface)),
    44  		intervals: make(map[time.Duration]int),
    45  		listeners: make(map[OOBListener]struct{}),
    46  		backoff:   internal.DefaultBackoffFunc,
    47  	}
    48  	return p, func() {
    49  		<-p.stopped
    50  	}
    51  }
    52  
    53  var producerBuilderSingleton = &producerBuilder{}
    54  
    55  // OOBListener is used to receive out-of-band load reports as they arrive.
    56  type OOBListener interface {
    57  	// OnLoadReport is called when a load report is received.
    58  	OnLoadReport(*v3orcapb.OrcaLoadReport)
    59  }
    60  
    61  // OOBListenerOptions contains options to control how an OOBListener is called.
    62  type OOBListenerOptions struct {
    63  	// ReportInterval specifies how often to request the server to provide a
    64  	// load report.  May be provided less frequently if the server requires a
    65  	// longer interval, or may be provided more frequently if another
    66  	// subscriber requests a shorter interval.
    67  	ReportInterval time.Duration
    68  }
    69  
    70  // RegisterOOBListener registers an out-of-band load report listener on sc.
    71  // Any OOBListener may only be registered once per subchannel at a time.  The
    72  // returned stop function must be called when no longer needed.  Do not
    73  // register a single OOBListener more than once per SubConn.
    74  func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) {
    75  	pr, close := sc.GetOrBuildProducer(producerBuilderSingleton)
    76  	p := pr.(*producer)
    77  
    78  	p.registerListener(l, opts.ReportInterval)
    79  
    80  	// TODO: When we can register for SubConn state updates, automatically call
    81  	// stop() on SHUTDOWN.
    82  
    83  	// If stop is called multiple times, prevent it from having any effect on
    84  	// subsequent calls.
    85  	return grpcsync.OnceFunc(func() {
    86  		p.unregisterListener(l, opts.ReportInterval)
    87  		close()
    88  	})
    89  }
    90  
// producer is the balancer.Producer returned by producerBuilder.Build.  It
// tracks the registered OOBListeners and their requested report intervals,
// runs an ORCA load-report stream (via a run goroutine) that requests the
// smallest of those intervals, and delivers each received report to every
// registered listener.
type producer struct {
	// client is used to open ORCA StreamCoreMetrics streams.
	client v3orcaservicegrpc.OpenRcaServiceClient

	// backoff is called between stream attempts to determine how long to delay
	// to avoid overloading a server experiencing problems.  The attempt count
	// is incremented when stream errors occur and is reset when the stream
	// reports a result.
	backoff func(int) time.Duration

	// mu is held while the fields below are modified.  The run goroutine also
	// holds it while delivering reports to listeners.
	mu          sync.Mutex
	intervals   map[time.Duration]int    // map from interval time to count of listeners requesting that time
	listeners   map[OOBListener]struct{} // set of registered listeners
	minInterval time.Duration            // smallest key of intervals; the interval the run goroutine requests
	stop        func()                   // stops the current run goroutine
	stopped     chan struct{}            // closed when the run goroutine exits
}
   107  
   108  // registerListener adds the listener and its requested report interval to the
   109  // producer.
   110  func (p *producer) registerListener(l OOBListener, interval time.Duration) {
   111  	p.mu.Lock()
   112  	defer p.mu.Unlock()
   113  
   114  	p.listeners[l] = struct{}{}
   115  	p.intervals[interval]++
   116  	if len(p.listeners) == 1 || interval < p.minInterval {
   117  		p.minInterval = interval
   118  		p.updateRunLocked()
   119  	}
   120  }
   121  
   122  // registerListener removes the listener and its requested report interval to
   123  // the producer.
   124  func (p *producer) unregisterListener(l OOBListener, interval time.Duration) {
   125  	p.mu.Lock()
   126  	defer p.mu.Unlock()
   127  
   128  	delete(p.listeners, l)
   129  	p.intervals[interval]--
   130  	if p.intervals[interval] == 0 {
   131  		delete(p.intervals, interval)
   132  
   133  		if p.minInterval == interval {
   134  			p.recomputeMinInterval()
   135  			p.updateRunLocked()
   136  		}
   137  	}
   138  }
   139  
   140  // recomputeMinInterval sets p.minInterval to the minimum key's value in
   141  // p.intervals.
   142  func (p *producer) recomputeMinInterval() {
   143  	first := true
   144  	for interval := range p.intervals {
   145  		if first || interval < p.minInterval {
   146  			p.minInterval = interval
   147  			first = false
   148  		}
   149  	}
   150  }
   151  
   152  // updateRunLocked is called whenever the run goroutine needs to be started /
   153  // stopped / restarted due to: 1. the initial listener being registered, 2. the
   154  // final listener being unregistered, or 3. the minimum registered interval
   155  // changing.
   156  func (p *producer) updateRunLocked() {
   157  	if p.stop != nil {
   158  		p.stop()
   159  		p.stop = nil
   160  	}
   161  	if len(p.listeners) > 0 {
   162  		var ctx context.Context
   163  		ctx, p.stop = context.WithCancel(context.Background())
   164  		p.stopped = make(chan struct{})
   165  		go p.run(ctx, p.stopped, p.minInterval)
   166  	}
   167  }
   168  
   169  // run manages the ORCA OOB stream on the subchannel.
   170  func (p *producer) run(ctx context.Context, done chan struct{}, interval time.Duration) {
   171  	defer close(done)
   172  
   173  	runStream := func() error {
   174  		resetBackoff, err := p.runStream(ctx, interval)
   175  		if status.Code(err) == codes.Unimplemented {
   176  			// Unimplemented; do not retry.
   177  			logger.Error("Server doesn't support ORCA OOB load reporting protocol; not listening for load reports.")
   178  			return err
   179  		}
   180  		// Retry for all other errors.
   181  		if code := status.Code(err); code != codes.Unavailable && code != codes.Canceled {
   182  			// TODO: Unavailable and Canceled should also ideally log an error,
   183  			// but for now we receive them when shutting down the ClientConn
   184  			// (Unavailable if the stream hasn't started yet, and Canceled if it
   185  			// happens mid-stream).  Once we can determine the state or ensure
   186  			// the producer is stopped before the stream ends, we can log an
   187  			// error when it's not a natural shutdown.
   188  			logger.Error("Received unexpected stream error:", err)
   189  		}
   190  		if resetBackoff {
   191  			return backoff.ErrResetBackoff
   192  		}
   193  		return nil
   194  	}
   195  	backoff.RunF(ctx, runStream, p.backoff)
   196  }
   197  
   198  // runStream runs a single stream on the subchannel and returns the resulting
   199  // error, if any, and whether or not the run loop should reset the backoff
   200  // timer to zero or advance it.
   201  func (p *producer) runStream(ctx context.Context, interval time.Duration) (resetBackoff bool, err error) {
   202  	streamCtx, cancel := context.WithCancel(ctx)
   203  	defer cancel()
   204  	stream, err := p.client.StreamCoreMetrics(streamCtx, &v3orcaservicepb.OrcaLoadReportRequest{
   205  		ReportInterval: durationpb.New(interval),
   206  	})
   207  	if err != nil {
   208  		return false, err
   209  	}
   210  
   211  	for {
   212  		report, err := stream.Recv()
   213  		if err != nil {
   214  			return resetBackoff, err
   215  		}
   216  		resetBackoff = true
   217  		p.mu.Lock()
   218  		for l := range p.listeners {
   219  			l.OnLoadReport(report)
   220  		}
   221  		p.mu.Unlock()
   222  	}
   223  }
   224  

View as plain text