...

Source file src/github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3/simple.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3

     1  // Code generated by create_version. DO NOT EDIT.
     2  // Copyright 2018 Envoyproxy Authors
     3  //
     4  //   Licensed under the Apache License, Version 2.0 (the "License");
     5  //   you may not use this file except in compliance with the License.
     6  //   You may obtain a copy of the License at
     7  //
     8  //       http://www.apache.org/licenses/LICENSE-2.0
     9  //
    10  //   Unless required by applicable law or agreed to in writing, software
    11  //   distributed under the License is distributed on an "AS IS" BASIS,
    12  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  //   See the License for the specific language governing permissions and
    14  //   limitations under the License.
    15  
    16  package cache
    17  
    18  import (
    19  	"context"
    20  	"fmt"
    21  	"sync"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    26  	"github.com/datawire/ambassador/v2/pkg/envoy-control-plane/log"
    27  )
    28  
    29  // SnapshotCache is a snapshot-based cache that maintains a single versioned
    30  // snapshot of responses per node. SnapshotCache consistently replies with the
    31  // latest snapshot. For the protocol to work correctly in ADS mode, EDS/RDS
    32  // requests are responded only when all resources in the snapshot xDS response
    33  // are named as part of the request. It is expected that the CDS response names
    34  // all EDS clusters, and the LDS response names all RDS routes in a snapshot,
    35  // to ensure that Envoy makes the request for all EDS clusters or RDS routes
    36  // eventually.
    37  //
    38  // SnapshotCache can operate as a REST or regular xDS backend. The snapshot
    39  // can be partial, e.g. only include RDS or EDS resources.
    40  type SnapshotCache interface {
    41  	Cache
    42  
    43  	// SetSnapshot sets a response snapshot for a node. For ADS, the snapshots
    44  	// should have distinct versions and be internally consistent (e.g. all
    45  	// referenced resources must be included in the snapshot).
    46  	//
    47  	// This method will cause the server to respond to all open watches, for which
    48  	// the version differs from the snapshot version.
    49  	SetSnapshot(node string, snapshot Snapshot) error
    50  
    51  	// GetSnapshots gets the snapshot for a node.
    52  	GetSnapshot(node string) (Snapshot, error)
    53  
    54  	// ClearSnapshot removes all status and snapshot information associated with a node.
    55  	ClearSnapshot(node string)
    56  
    57  	// GetStatusInfo retrieves status information for a node ID.
    58  	GetStatusInfo(string) StatusInfo
    59  
    60  	// GetStatusKeys retrieves node IDs for all statuses.
    61  	GetStatusKeys() []string
    62  }
    63  
// heartbeatHandle wraps a single cancel function, presumably used to stop a
// heartbeat routine.
// NOTE(review): this type is not referenced anywhere in this file; check the
// rest of the package before relying on or removing it.
type heartbeatHandle struct {
	// cancel is the function stored by whoever creates the handle; its exact
	// semantics are not visible here.
	cancel func()
}
    67  
// snapshotCache is the SnapshotCache implementation returned by
// NewSnapshotCache and NewSnapshotCacheWithHeartbeating.
type snapshotCache struct {
	// watchCount is an atomic counter incremented for each watch. This needs to
	// be the first field in the struct to guarantee that it is 64-bit aligned,
	// which is a requirement for atomic operations on 64-bit operands to work on
	// 32-bit machines.
	watchCount int64

	// log is optional; every use in this file is guarded by a nil check.
	log log.Logger

	// ads flag to hold responses until all resources are named
	ads bool

	// snapshots are cached resources indexed by node IDs
	snapshots map[string]Snapshot

	// status information for all nodes indexed by node IDs
	status map[string]*statusInfo

	// hash is the hashing function for Envoy nodes
	hash NodeHash

	// mu guards the snapshots and status maps. Each node's watch map is
	// additionally guarded by that node's statusInfo.mu, which this file always
	// acquires while already holding mu (never the other way around).
	mu sync.RWMutex
}
    91  
    92  // NewSnapshotCache initializes a simple cache.
    93  //
    94  // ADS flag forces a delay in responding to streaming requests until all
    95  // resources are explicitly named in the request. This avoids the problem of a
    96  // partial request over a single stream for a subset of resources which would
    97  // require generating a fresh version for acknowledgement. ADS flag requires
    98  // snapshot consistency. For non-ADS case (and fetch), multiple partial
    99  // requests are sent across multiple streams and re-using the snapshot version
   100  // is OK.
   101  //
   102  // Logger is optional.
   103  func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache {
   104  	return newSnapshotCache(ads, hash, logger)
   105  }
   106  
   107  func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache {
   108  	cache := &snapshotCache{
   109  		log:       logger,
   110  		ads:       ads,
   111  		snapshots: make(map[string]Snapshot),
   112  		status:    make(map[string]*statusInfo),
   113  		hash:      hash,
   114  	}
   115  
   116  	return cache
   117  }
   118  
   119  // NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat
   120  // responses for resources with a TTL.
   121  //
   122  // ADS flag forces a delay in responding to streaming requests until all
   123  // resources are explicitly named in the request. This avoids the problem of a
   124  // partial request over a single stream for a subset of resources which would
   125  // require generating a fresh version for acknowledgement. ADS flag requires
   126  // snapshot consistency. For non-ADS case (and fetch), multiple partial
   127  // requests are sent across multiple streams and re-using the snapshot version
   128  // is OK.
   129  //
   130  // Logger is optional.
   131  //
   132  // The context provides a way to cancel the heartbeating routine, while the heartbeatInterval
   133  // parameter controls how often heartbeating occurs.
   134  func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache {
   135  	cache := newSnapshotCache(ads, hash, logger)
   136  	go func() {
   137  		t := time.NewTicker(heartbeatInterval)
   138  
   139  		for {
   140  			select {
   141  			case <-t.C:
   142  				cache.mu.Lock()
   143  				for node := range cache.status {
   144  					// TODO(snowp): Omit heartbeats if a real response has been sent recently.
   145  					cache.sendHeartbeats(ctx, node)
   146  				}
   147  				cache.mu.Unlock()
   148  			case <-ctx.Done():
   149  				return
   150  			}
   151  		}
   152  	}()
   153  	return cache
   154  }
   155  
   156  func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
   157  	snapshot := cache.snapshots[node]
   158  	if info, ok := cache.status[node]; ok {
   159  		info.mu.Lock()
   160  		for id, watch := range info.watches {
   161  			// Respond with the current version regardless of whether the version has changed.
   162  			version := snapshot.GetVersion(watch.Request.TypeUrl)
   163  			resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
   164  
   165  			// TODO(snowp): Construct this once per type instead of once per watch.
   166  			resourcesWithTtl := map[string]types.ResourceWithTtl{}
   167  			for k, v := range resources {
   168  				if v.Ttl != nil {
   169  					resourcesWithTtl[k] = v
   170  				}
   171  			}
   172  
   173  			if len(resourcesWithTtl) == 0 {
   174  				continue
   175  			}
   176  			if cache.log != nil {
   177  				cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
   178  			}
   179  
   180  			cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true)
   181  
   182  			// The watch must be deleted and we must rely on the client to ack this response to create a new watch.
   183  			delete(info.watches, id)
   184  		}
   185  		info.mu.Unlock()
   186  	}
   187  }
   188  
   189  // SetSnapshotCache updates a snapshot for a node.
   190  func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error {
   191  	cache.mu.Lock()
   192  	defer cache.mu.Unlock()
   193  
   194  	// update the existing entry
   195  	cache.snapshots[node] = snapshot
   196  
   197  	// trigger existing watches for which version changed
   198  	if info, ok := cache.status[node]; ok {
   199  		info.mu.Lock()
   200  		for id, watch := range info.watches {
   201  			version := snapshot.GetVersion(watch.Request.TypeUrl)
   202  			if version != watch.Request.VersionInfo {
   203  				if cache.log != nil {
   204  					cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
   205  				}
   206  				resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
   207  				cache.respond(watch.Request, watch.Response, resources, version, false)
   208  
   209  				// discard the watch
   210  				delete(info.watches, id)
   211  			}
   212  		}
   213  		info.mu.Unlock()
   214  	}
   215  
   216  	return nil
   217  }
   218  
   219  // GetSnapshots gets the snapshot for a node, and returns an error if not found.
   220  func (cache *snapshotCache) GetSnapshot(node string) (Snapshot, error) {
   221  	cache.mu.RLock()
   222  	defer cache.mu.RUnlock()
   223  
   224  	snap, ok := cache.snapshots[node]
   225  	if !ok {
   226  		return Snapshot{}, fmt.Errorf("no snapshot found for node %s", node)
   227  	}
   228  	return snap, nil
   229  }
   230  
   231  // ClearSnapshot clears snapshot and info for a node.
   232  func (cache *snapshotCache) ClearSnapshot(node string) {
   233  	cache.mu.Lock()
   234  	defer cache.mu.Unlock()
   235  
   236  	delete(cache.snapshots, node)
   237  	delete(cache.status, node)
   238  }
   239  
   240  // nameSet creates a map from a string slice to value true.
   241  func nameSet(names []string) map[string]bool {
   242  	set := make(map[string]bool)
   243  	for _, name := range names {
   244  		set[name] = true
   245  	}
   246  	return set
   247  }
   248  
   249  // superset checks that all resources are listed in the names set.
   250  func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error {
   251  	for resourceName := range resources {
   252  		if _, exists := names[resourceName]; !exists {
   253  			return fmt.Errorf("%q not listed", resourceName)
   254  		}
   255  	}
   256  	return nil
   257  }
   258  
   259  // CreateWatch returns a watch for an xDS request.
   260  func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()) {
   261  	nodeID := cache.hash.ID(request.Node)
   262  
   263  	cache.mu.Lock()
   264  	defer cache.mu.Unlock()
   265  
   266  	info, ok := cache.status[nodeID]
   267  	if !ok {
   268  		info = newStatusInfo(request.Node)
   269  		cache.status[nodeID] = info
   270  	}
   271  
   272  	// update last watch request time
   273  	info.mu.Lock()
   274  	info.lastWatchRequestTime = time.Now()
   275  	info.mu.Unlock()
   276  
   277  	// allocate capacity 1 to allow one-time non-blocking use
   278  	value := make(chan Response, 1)
   279  
   280  	snapshot, exists := cache.snapshots[nodeID]
   281  	version := snapshot.GetVersion(request.TypeUrl)
   282  
   283  	// if the requested version is up-to-date or missing a response, leave an open watch
   284  	if !exists || request.VersionInfo == version {
   285  		watchID := cache.nextWatchID()
   286  		if cache.log != nil {
   287  			cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID,
   288  				request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo)
   289  		}
   290  		info.mu.Lock()
   291  		info.watches[watchID] = ResponseWatch{Request: request, Response: value}
   292  		info.mu.Unlock()
   293  		return value, cache.cancelWatch(nodeID, watchID)
   294  	}
   295  
   296  	// otherwise, the watch may be responded immediately
   297  	resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
   298  	cache.respond(request, value, resources, version, false)
   299  
   300  	return value, nil
   301  }
   302  
   303  func (cache *snapshotCache) nextWatchID() int64 {
   304  	return atomic.AddInt64(&cache.watchCount, 1)
   305  }
   306  
   307  // cancellation function for cleaning stale watches
   308  func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {
   309  	return func() {
   310  		// uses the cache mutex
   311  		cache.mu.Lock()
   312  		defer cache.mu.Unlock()
   313  		if info, ok := cache.status[nodeID]; ok {
   314  			info.mu.Lock()
   315  			delete(info.watches, watchID)
   316  			info.mu.Unlock()
   317  		}
   318  	}
   319  }
   320  
   321  // Respond to a watch with the snapshot value. The value channel should have capacity not to block.
   322  // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46
   323  func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) {
   324  	// for ADS, the request names must match the snapshot names
   325  	// if they do not, then the watch is never responded, and it is expected that envoy makes another request
   326  	if len(request.ResourceNames) != 0 && cache.ads {
   327  		if err := superset(nameSet(request.ResourceNames), resources); err != nil {
   328  			if cache.log != nil {
   329  				cache.log.Debugf("ADS mode: not responding to request: %v", err)
   330  			}
   331  			return
   332  		}
   333  	}
   334  	if cache.log != nil {
   335  		cache.log.Debugf("respond %s%v version %q with version %q",
   336  			request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
   337  	}
   338  
   339  	value <- createResponse(request, resources, version, heartbeat)
   340  }
   341  
   342  func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response {
   343  	filtered := make([]types.ResourceWithTtl, 0, len(resources))
   344  
   345  	// Reply only with the requested resources. Envoy may ask each resource
   346  	// individually in a separate stream. It is ok to reply with the same version
   347  	// on separate streams since requests do not share their response versions.
   348  	if len(request.ResourceNames) != 0 {
   349  		set := nameSet(request.ResourceNames)
   350  		for name, resource := range resources {
   351  			if set[name] {
   352  				filtered = append(filtered, resource)
   353  			}
   354  		}
   355  	} else {
   356  		for _, resource := range resources {
   357  			filtered = append(filtered, resource)
   358  		}
   359  	}
   360  
   361  	return &RawResponse{
   362  		Request:   request,
   363  		Version:   version,
   364  		Resources: filtered,
   365  		Heartbeat: heartbeat,
   366  	}
   367  }
   368  
   369  // Fetch implements the cache fetch function.
   370  // Fetch is called on multiple streams, so responding to individual names with the same version works.
   371  func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Response, error) {
   372  	nodeID := cache.hash.ID(request.Node)
   373  
   374  	cache.mu.RLock()
   375  	defer cache.mu.RUnlock()
   376  
   377  	if snapshot, exists := cache.snapshots[nodeID]; exists {
   378  		// Respond only if the request version is distinct from the current snapshot state.
   379  		// It might be beneficial to hold the request since Envoy will re-attempt the refresh.
   380  		version := snapshot.GetVersion(request.TypeUrl)
   381  		if request.VersionInfo == version {
   382  			if cache.log != nil {
   383  				cache.log.Warnf("skip fetch: version up to date")
   384  			}
   385  			return nil, &types.SkipFetchError{}
   386  		}
   387  
   388  		resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
   389  		out := createResponse(request, resources, version, false)
   390  		return out, nil
   391  	}
   392  
   393  	return nil, fmt.Errorf("missing snapshot for %q", nodeID)
   394  }
   395  
   396  // GetStatusInfo retrieves the status info for the node.
   397  func (cache *snapshotCache) GetStatusInfo(node string) StatusInfo {
   398  	cache.mu.RLock()
   399  	defer cache.mu.RUnlock()
   400  
   401  	info, exists := cache.status[node]
   402  	if !exists {
   403  		if cache.log != nil {
   404  			cache.log.Warnf("node does not exist")
   405  		}
   406  		return nil
   407  	}
   408  
   409  	return info
   410  }
   411  
   412  // GetStatusKeys retrieves all node IDs in the status map.
   413  func (cache *snapshotCache) GetStatusKeys() []string {
   414  	cache.mu.RLock()
   415  	defer cache.mu.RUnlock()
   416  
   417  	out := make([]string, 0, len(cache.status))
   418  	for id := range cache.status {
   419  		out = append(out, id)
   420  	}
   421  
   422  	return out
   423  }
   424  

View as plain text