...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3/cache.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3

     1  // Code generated by create_version. DO NOT EDIT.
     2  // Copyright 2018 Envoyproxy Authors
     3  //
     4  //   Licensed under the Apache License, Version 2.0 (the "License");
     5  //   you may not use this file except in compliance with the License.
     6  //   You may obtain a copy of the License at
     7  //
     8  //       http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  //   Unless required by applicable law or agreed to in writing, software
    11  //   distributed under the License is distributed on an "AS IS" BASIS,
    12  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  //   See the License for the specific language governing permissions and
    14  //   limitations under the License.
    15  
    16  // Package cache defines a configuration cache for the server.
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync/atomic"
    23  
    24  	discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
    25  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    26  	ttl "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/ttl/v3"
    27  	"github.com/golang/protobuf/ptypes/any"
    28  )
    29  
    30  // Request is a package-local alias for discovery.DiscoveryRequest.
    31  type Request = discovery.DiscoveryRequest
    32  
    33  // ConfigWatcher opens watches for configuration resources, keyed by a node,
    34  // its last applied version identifier, and a hint of resource names. A watch
    35  // should send its responses once they are ready. The consumer can cancel a
    36  // watch, which in effect terminates the watch for that request.
    37  // Implementations of ConfigWatcher must be thread-safe.
    38  type ConfigWatcher interface {
    39  	// CreateWatch returns a new open watch from a non-empty request.
    40  	// An individual consumer normally issues a single open watch per type URL.
    41  	//
    42  	// The value channel produces the requested resources once they are available.
    43  	// If the channel is closed before the watch is cancelled, an unrecoverable
    44  	// error has occurred in the producer, and the consumer should close the
    45  	// corresponding stream.
    46  	//
    47  	// The cancel function is optional and releases resources in the producer. If
    48  	// provided, the consumer may call it multiple times.
    49  	CreateWatch(*Request) (value chan Response, cancel func())
    50  }
    51  
    52  // ConfigFetcher fetches configuration resources from the cache.
    53  type ConfigFetcher interface {
    54  	// Fetch implements the polling method of the config cache; the request must be non-empty.
    55  	Fetch(context.Context, *Request) (Response, error)
    56  }
    57  
    58  // Cache is a generic config cache that combines ConfigWatcher and ConfigFetcher.
    59  type Cache interface {
    60  	ConfigWatcher
    61  	ConfigFetcher
    62  }
    63  
    64  // Response is a wrapper around Envoy's DiscoveryResponse.
    65  type Response interface {
    66  	// GetDiscoveryResponse returns the constructed DiscoveryResponse.
    67  	GetDiscoveryResponse() (*discovery.DiscoveryResponse, error)
    68  
    69  	// GetRequest returns the original request for the Response.
    70  	GetRequest() *discovery.DiscoveryRequest
    71  
    72  	// GetVersion returns the version in the Response.
    73  	GetVersion() (string, error)
    74  }
    75  
    76  // RawResponse is a pre-serialized xDS response that holds the raw resources
    77  // to be included in the final discovery response.
    78  type RawResponse struct {
    79  	// Request is the original request.
    80  	Request *discovery.DiscoveryRequest
    81  
    82  	// Version of the resources as tracked by the cache for the given type.
    83  	// The proxy responds with this version as an acknowledgement.
    84  	Version string
    85  
    86  	// Resources to be included in the response.
    87  	Resources []types.ResourceWithTtl
    88  
    89  	// Heartbeat reports whether this is a heartbeat response. For xDS versions that
    90  	// support TTL, it is converted into a response that omits the actual resource protobuf.
    91  	// This allows for more lightweight updates that serve only to update the TTL timer.
    92  	Heartbeat bool
    93  
    94  	// marshaledResponse holds an atomic reference to the serialized discovery response.
    95  	marshaledResponse atomic.Value
    96  }
    97  
    98  var _ Response = &RawResponse{}
    99  
   100  // PassthroughResponse is a pre-constructed xDS response that need not go through marshalling transformations.
   101  type PassthroughResponse struct {
   102  	// Request is the original request.
   103  	Request *discovery.DiscoveryRequest
   104  
   105  	// DiscoveryResponse is the discovery response to send as is, without any marshalling transformations.
   106  	DiscoveryResponse *discovery.DiscoveryResponse
   107  }
   108  
   109  var _ Response = &PassthroughResponse{}
   110  
   111  // GetDiscoveryResponse performs the marshalling the first time its called and uses the cached response subsequently.
   112  // This is necessary because the marshalled response does not change across the calls.
   113  // This caching behavior is important in high throughput scenarios because grpc marshalling has a cost and it drives the cpu utilization under load.
   114  func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
   115  
   116  	marshaledResponse := r.marshaledResponse.Load()
   117  
   118  	if marshaledResponse == nil {
   119  
   120  		marshaledResources := make([]*any.Any, len(r.Resources))
   121  
   122  		for i, resource := range r.Resources {
   123  			maybeTtldResource, resourceType, err := ttl.MaybeCreateTtlResourceIfSupported(resource, GetResourceName(resource.Resource), r.Request.TypeUrl, r.Heartbeat)
   124  			if err != nil {
   125  				return nil, err
   126  			}
   127  			marshaledResource, err := MarshalResource(maybeTtldResource)
   128  			if err != nil {
   129  				return nil, err
   130  			}
   131  			marshaledResources[i] = &any.Any{
   132  				TypeUrl: resourceType,
   133  				Value:   marshaledResource,
   134  			}
   135  		}
   136  
   137  		marshaledResponse = &discovery.DiscoveryResponse{
   138  			VersionInfo: r.Version,
   139  			Resources:   marshaledResources,
   140  			TypeUrl:     r.Request.TypeUrl,
   141  		}
   142  
   143  		r.marshaledResponse.Store(marshaledResponse)
   144  	}
   145  
   146  	return marshaledResponse.(*discovery.DiscoveryResponse), nil
   147  }
   148  
   149  // GetRequest returns the original Discovery Request.
   150  func (r *RawResponse) GetRequest() *discovery.DiscoveryRequest {
   151  	return r.Request
   152  }
   153  
   154  // GetVersion returns the response version.
   155  func (r *RawResponse) GetVersion() (string, error) {
   156  	return r.Version, nil
   157  }
   158  
   159  // GetDiscoveryResponse returns the final passthrough Discovery Response.
   160  func (r *PassthroughResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, error) {
   161  	return r.DiscoveryResponse, nil
   162  }
   163  
   164  // GetRequest returns the original Discovery Request
   165  func (r *PassthroughResponse) GetRequest() *discovery.DiscoveryRequest {
   166  	return r.Request
   167  }
   168  
   169  // GetVersion returns the response version.
   170  func (r *PassthroughResponse) GetVersion() (string, error) {
   171  	if r.DiscoveryResponse != nil {
   172  		return r.DiscoveryResponse.VersionInfo, nil
   173  	}
   174  	return "", fmt.Errorf("DiscoveryResponse is nil")
   175  }
   176  

View as plain text