...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3/linear.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3

     1  // Code generated by create_version. DO NOT EDIT.
     2  // Copyright 2020 Envoyproxy Authors
     3  //
     4  //   Licensed under the Apache License, Version 2.0 (the "License");
     5  //   you may not use this file except in compliance with the License.
     6  //   You may obtain a copy of the License at
     7  //
     8  //       http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  //   Unless required by applicable law or agreed to in writing, software
    11  //   distributed under the License is distributed on an "AS IS" BASIS,
    12  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  //   See the License for the specific language governing permissions and
    14  //   limitations under the License.
    15  
    16  package cache
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"strconv"
    22  	"strings"
    23  	"sync"
    24  
    25  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    26  )
    27  
    28  type watches = map[chan Response]struct{}
    29  
    30  // LinearCache supports collectons of opaque resources. This cache has a
    31  // single collection indexed by resource names and manages resource versions
    32  // internally. It implements the cache interface for a single type URL and
    33  // should be combined with other caches via type URL muxing. It can be used to
    34  // supply EDS entries, for example, uniformly across a fleet of proxies.
    35  type LinearCache struct {
    36  	// Type URL specific to the cache.
    37  	typeURL string
    38  	// Collection of resources indexed by name.
    39  	resources map[string]types.Resource
    40  	// Watches open by clients, indexed by resource name. Whenever resources
    41  	// are changed, the watch is triggered.
    42  	watches map[string]watches
    43  	// Set of watches for all resources in the collection
    44  	watchAll watches
    45  	// Continously incremented version
    46  	version uint64
    47  	// Version prefix to be sent to the clients
    48  	versionPrefix string
    49  	// Versions for each resource by name.
    50  	versionVector map[string]uint64
    51  	mu            sync.Mutex
    52  }
    53  
    54  var _ Cache = &LinearCache{}
    55  
    56  // Options for modifying the behavior of the linear cache.
    57  type LinearCacheOption func(*LinearCache)
    58  
    59  // WithVersionPrefix sets a version prefix of the form "prefixN" in the version info.
    60  // Version prefix can be used to distinguish replicated instances of the cache, in case
    61  // a client re-connects to another instance.
    62  func WithVersionPrefix(prefix string) LinearCacheOption {
    63  	return func(cache *LinearCache) {
    64  		cache.versionPrefix = prefix
    65  	}
    66  }
    67  
    68  // WithInitialResources initializes the initial set of resources.
    69  func WithInitialResources(resources map[string]types.Resource) LinearCacheOption {
    70  	return func(cache *LinearCache) {
    71  		cache.resources = resources
    72  		for name := range resources {
    73  			cache.versionVector[name] = 0
    74  		}
    75  	}
    76  }
    77  
    78  // NewLinearCache creates a new cache. See the comments on the struct definition.
    79  func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
    80  	out := &LinearCache{
    81  		typeURL:       typeURL,
    82  		resources:     make(map[string]types.Resource),
    83  		watches:       make(map[string]watches),
    84  		watchAll:      make(watches),
    85  		version:       0,
    86  		versionVector: make(map[string]uint64),
    87  	}
    88  	for _, opt := range opts {
    89  		opt(out)
    90  	}
    91  	return out
    92  }
    93  
    94  func (cache *LinearCache) respond(value chan Response, staleResources []string) {
    95  	var resources []types.ResourceWithTtl
    96  	// TODO: optimize the resources slice creations across different clients
    97  	if len(staleResources) == 0 {
    98  		resources = make([]types.ResourceWithTtl, 0, len(cache.resources))
    99  		for _, resource := range cache.resources {
   100  			resources = append(resources, types.ResourceWithTtl{Resource: resource})
   101  		}
   102  	} else {
   103  		resources = make([]types.ResourceWithTtl, 0, len(staleResources))
   104  		for _, name := range staleResources {
   105  			resource := cache.resources[name]
   106  			if resource != nil {
   107  				resources = append(resources, types.ResourceWithTtl{Resource: resource})
   108  			}
   109  		}
   110  	}
   111  	value <- &RawResponse{
   112  		Request:   &Request{TypeUrl: cache.typeURL},
   113  		Resources: resources,
   114  		Version:   cache.versionPrefix + strconv.FormatUint(cache.version, 10),
   115  	}
   116  }
   117  
   118  func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
   119  	// de-duplicate watches that need to be responded
   120  	notifyList := make(map[chan Response][]string)
   121  	for name := range modified {
   122  		for watch := range cache.watches[name] {
   123  			notifyList[watch] = append(notifyList[watch], name)
   124  		}
   125  		delete(cache.watches, name)
   126  	}
   127  	for value, stale := range notifyList {
   128  		cache.respond(value, stale)
   129  	}
   130  	for value := range cache.watchAll {
   131  		cache.respond(value, nil)
   132  	}
   133  	cache.watchAll = make(watches)
   134  }
   135  
   136  // UpdateResource updates a resource in the collection.
   137  func (cache *LinearCache) UpdateResource(name string, res types.Resource) error {
   138  	if res == nil {
   139  		return errors.New("nil resource")
   140  	}
   141  	cache.mu.Lock()
   142  	defer cache.mu.Unlock()
   143  
   144  	cache.version += 1
   145  	cache.versionVector[name] = cache.version
   146  	cache.resources[name] = res
   147  
   148  	// TODO: batch watch closures to prevent rapid updates
   149  	cache.notifyAll(map[string]struct{}{name: {}})
   150  
   151  	return nil
   152  }
   153  
   154  // DeleteResource removes a resource in the collection.
   155  func (cache *LinearCache) DeleteResource(name string) error {
   156  	cache.mu.Lock()
   157  	defer cache.mu.Unlock()
   158  
   159  	cache.version += 1
   160  	delete(cache.versionVector, name)
   161  	delete(cache.resources, name)
   162  
   163  	// TODO: batch watch closures to prevent rapid updates
   164  	cache.notifyAll(map[string]struct{}{name: {}})
   165  	return nil
   166  }
   167  
   168  func (cache *LinearCache) CreateWatch(request *Request) (chan Response, func()) {
   169  	value := make(chan Response, 1)
   170  	if request.TypeUrl != cache.typeURL {
   171  		close(value)
   172  		return value, nil
   173  	}
   174  	// If the version is not up to date, check whether any requested resource has
   175  	// been updated between the last version and the current version. This avoids the problem
   176  	// of sending empty updates whenever an irrelevant resource changes.
   177  	stale := false
   178  	staleResources := []string{} // empty means all
   179  
   180  	// strip version prefix if it is present
   181  	var lastVersion uint64
   182  	var err error
   183  	if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) {
   184  		lastVersion, err = strconv.ParseUint(request.VersionInfo[len(cache.versionPrefix):], 0, 64)
   185  	} else {
   186  		err = errors.New("mis-matched version prefix")
   187  	}
   188  
   189  	cache.mu.Lock()
   190  	defer cache.mu.Unlock()
   191  
   192  	if err != nil {
   193  		stale = true
   194  		staleResources = request.ResourceNames
   195  	} else if len(request.ResourceNames) == 0 {
   196  		stale = lastVersion != cache.version
   197  	} else {
   198  		for _, name := range request.ResourceNames {
   199  			// When a resource is removed, its version defaults 0 and it is not considered stale.
   200  			if lastVersion < cache.versionVector[name] {
   201  				stale = true
   202  				staleResources = append(staleResources, name)
   203  			}
   204  		}
   205  	}
   206  	if stale {
   207  		cache.respond(value, staleResources)
   208  		return value, nil
   209  	}
   210  	// Create open watches since versions are up to date.
   211  	if len(request.ResourceNames) == 0 {
   212  		cache.watchAll[value] = struct{}{}
   213  		return value, func() {
   214  			cache.mu.Lock()
   215  			defer cache.mu.Unlock()
   216  			delete(cache.watchAll, value)
   217  		}
   218  	}
   219  	for _, name := range request.ResourceNames {
   220  		set, exists := cache.watches[name]
   221  		if !exists {
   222  			set = make(watches)
   223  			cache.watches[name] = set
   224  		}
   225  		set[value] = struct{}{}
   226  	}
   227  	return value, func() {
   228  		cache.mu.Lock()
   229  		defer cache.mu.Unlock()
   230  		for _, name := range request.ResourceNames {
   231  			set, exists := cache.watches[name]
   232  			if exists {
   233  				delete(set, value)
   234  			}
   235  			if len(set) == 0 {
   236  				delete(cache.watches, name)
   237  			}
   238  		}
   239  	}
   240  }
   241  
   242  func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
   243  	return nil, errors.New("not implemented")
   244  }
   245  
   246  // Number of active watches for a resource name.
   247  func (cache *LinearCache) NumWatches(name string) int {
   248  	cache.mu.Lock()
   249  	defer cache.mu.Unlock()
   250  	return len(cache.watches[name]) + len(cache.watchAll)
   251  }
   252  

View as plain text