...

Source file src/google.golang.org/grpc/xds/internal/xdsclient/requests_counter.go

Documentation: google.golang.org/grpc/xds/internal/xdsclient

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xdsclient
    20  
    21  import (
    22  	"fmt"
    23  	"sync"
    24  	"sync/atomic"
    25  )
    26  
    27  type clusterNameAndServiceName struct {
    28  	clusterName, edsServiceName string
    29  }
    30  
    31  type clusterRequestsCounter struct {
    32  	mu       sync.Mutex
    33  	clusters map[clusterNameAndServiceName]*ClusterRequestsCounter
    34  }
    35  
    36  var src = &clusterRequestsCounter{
    37  	clusters: make(map[clusterNameAndServiceName]*ClusterRequestsCounter),
    38  }
    39  
    40  // ClusterRequestsCounter is used to track the total inflight requests for a
    41  // service with the provided name.
    42  type ClusterRequestsCounter struct {
    43  	ClusterName    string
    44  	EDSServiceName string
    45  	numRequests    uint32
    46  }
    47  
    48  // GetClusterRequestsCounter returns the ClusterRequestsCounter with the
    49  // provided serviceName. If one does not exist, it creates it.
    50  func GetClusterRequestsCounter(clusterName, edsServiceName string) *ClusterRequestsCounter {
    51  	src.mu.Lock()
    52  	defer src.mu.Unlock()
    53  	k := clusterNameAndServiceName{
    54  		clusterName:    clusterName,
    55  		edsServiceName: edsServiceName,
    56  	}
    57  	c, ok := src.clusters[k]
    58  	if !ok {
    59  		c = &ClusterRequestsCounter{ClusterName: clusterName}
    60  		src.clusters[k] = c
    61  	}
    62  	return c
    63  }
    64  
    65  // StartRequest starts a request for a cluster, incrementing its number of
    66  // requests by 1. Returns an error if the max number of requests is exceeded.
    67  func (c *ClusterRequestsCounter) StartRequest(max uint32) error {
    68  	// Note that during race, the limits could be exceeded. This is allowed:
    69  	// "Since the implementation is eventually consistent, races between threads
    70  	// may allow limits to be potentially exceeded."
    71  	// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/circuit_breaking#arch-overview-circuit-break.
    72  	if atomic.LoadUint32(&c.numRequests) >= max {
    73  		return fmt.Errorf("max requests %v exceeded on service %v", max, c.ClusterName)
    74  	}
    75  	atomic.AddUint32(&c.numRequests, 1)
    76  	return nil
    77  }
    78  
    79  // EndRequest ends a request for a service, decrementing its number of requests
    80  // by 1.
    81  func (c *ClusterRequestsCounter) EndRequest() {
    82  	atomic.AddUint32(&c.numRequests, ^uint32(0))
    83  }
    84  
    85  // ClearCounterForTesting clears the counter for the service. Should be only
    86  // used in tests.
    87  func ClearCounterForTesting(clusterName, edsServiceName string) {
    88  	src.mu.Lock()
    89  	defer src.mu.Unlock()
    90  	k := clusterNameAndServiceName{
    91  		clusterName:    clusterName,
    92  		edsServiceName: edsServiceName,
    93  	}
    94  	c, ok := src.clusters[k]
    95  	if !ok {
    96  		return
    97  	}
    98  	c.numRequests = 0
    99  }
   100  
   101  // ClearAllCountersForTesting clears all the counters. Should be only used in
   102  // tests.
   103  func ClearAllCountersForTesting() {
   104  	src.mu.Lock()
   105  	defer src.mu.Unlock()
   106  	src.clusters = make(map[clusterNameAndServiceName]*ClusterRequestsCounter)
   107  }
   108  

View as plain text