...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/testutil_fake_k8s_store_test.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"encoding/json"
     5  	"fmt"
     6  	"io/ioutil"
     7  	"sort"
     8  	"strings"
     9  	"sync"
    10  
    11  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    12  )
    13  
    14  // A K8sStore is implement just enough data structures to mock the watch aspect of kubernetes for
    15  // testing purposes. It holds a map of kubernetes resources. Whenever any of these resources change
    16  // it computes a delta and adds it to the list of deltas. The store is also capable of creating
    17  // cursors that can be used to track multiple watches independently consuming the deltas at
    18  // different rates.
    19  type K8sStore struct {
    20  	// The mutex protects the entire struct, including any cursors that may have been created.
    21  	mutex     sync.Mutex
    22  	resources map[K8sKey]kates.Object
    23  	// This tracks every delta forever. That's ok because we only use this for tests, so we want to
    24  	// favor simplicity over efficiency. Also tests don't run that long, so it's not a big deal.
    25  	deltas []*kates.Delta
    26  }
    27  
    28  type K8sKey struct {
    29  	Kind      string
    30  	Namespace string
    31  	Name      string
    32  }
    33  
    34  func (k K8sKey) sortKey() string {
    35  	return fmt.Sprintf("%s:%s:%s", k.Kind, k.Namespace, k.Name)
    36  }
    37  
    38  // NewK8sStore creates a new and empty store.
    39  func NewK8sStore() *K8sStore {
    40  	return &K8sStore{resources: map[K8sKey]kates.Object{}}
    41  }
    42  
    43  // Upsert will either update or insert the given object depending on whether or not an object with
    44  // that key already exists. Note that this is currently done based solely on the key (namespace,
    45  // name) of the resource. Theoretically resources are assigned UUIDs and so in theory we could
    46  // detect changes to the name and namespace, however I'm not even sure how kubernetes handles this
    47  // or if it even permits that, so I am not going to attempt to consider those cases, and that may
    48  // well result in some very obscure edgecases around changing names/namespaces that behave
    49  // differently different from kubernetes.
    50  func (k *K8sStore) Upsert(resource kates.Object) error {
    51  	var un *kates.Unstructured
    52  	bytes, err := json.Marshal(resource)
    53  	if err != nil {
    54  		return err
    55  	}
    56  	err = json.Unmarshal(bytes, &un)
    57  	if err != nil {
    58  		return err
    59  	}
    60  
    61  	kind, apiVersion, err := canonGVK(un.GetKind())
    62  	if err != nil {
    63  		return err
    64  	}
    65  	un.SetKind(kind)
    66  	un.SetAPIVersion(apiVersion)
    67  	if un.GetNamespace() == "" {
    68  		un.SetNamespace("default")
    69  	}
    70  
    71  	k.mutex.Lock()
    72  	defer k.mutex.Unlock()
    73  
    74  	key := K8sKey{un.GetKind(), un.GetNamespace(), un.GetName()}
    75  	_, ok := k.resources[key]
    76  	if ok {
    77  		k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectUpdate, un))
    78  	} else {
    79  		k.deltas = append(k.deltas, kates.NewDelta(kates.ObjectAdd, un))
    80  	}
    81  	k.resources[key] = un
    82  	return nil
    83  }
    84  
    85  // Delete will remove the identified resource from the store.
    86  func (k *K8sStore) Delete(kind, namespace, name string) error {
    87  	k.mutex.Lock()
    88  	defer k.mutex.Unlock()
    89  
    90  	canonKind, err := canon(kind)
    91  	if err != nil {
    92  		return err
    93  	}
    94  	key := K8sKey{canonKind, namespace, name}
    95  	old, ok := k.resources[key]
    96  	if ok {
    97  		delta, err := kates.NewDeltaFromObject(kates.ObjectDelete, old)
    98  		if err != nil {
    99  			return err
   100  		}
   101  		k.deltas = append(k.deltas, delta)
   102  	}
   103  	delete(k.resources, key)
   104  	return nil
   105  }
   106  
   107  // UpsertFile will parse the yaml manifests in the referenced file and Upsert each resource from the
   108  // file.
   109  func (k *K8sStore) UpsertFile(filename string) error {
   110  	content, err := ioutil.ReadFile(filename)
   111  	if err != nil {
   112  		return err
   113  	}
   114  
   115  	return k.UpsertYAML(string(content))
   116  }
   117  
   118  // UpsertYAML will parse the provided YAML and feed the resources in it into the control plane,
   119  // creating or updating any overlapping resources that exist.
   120  func (k *K8sStore) UpsertYAML(yaml string) error {
   121  	objs, err := kates.ParseManifests(yaml)
   122  	if err != nil {
   123  		return err
   124  	}
   125  
   126  	for _, obj := range objs {
   127  		if err := k.Upsert(obj); err != nil {
   128  			return err
   129  		}
   130  	}
   131  	return nil
   132  }
   133  
   134  // A Cursor allows multiple views of the same stream of deltas. The cursors implement a bootstrap
   135  // semantic where they will generate synthetic Add deltas for every resource that currently exists,
   136  // and from that point on report the real deltas that actually occur on the store.
   137  func (k *K8sStore) Cursor() *K8sStoreCursor {
   138  	k.mutex.Lock()
   139  	defer k.mutex.Unlock()
   140  	return &K8sStoreCursor{store: k, offset: -1}
   141  }
   142  
   143  type K8sStoreCursor struct {
   144  	store *K8sStore
   145  	// Offset into the deltas slice, or negative one if the cursor is brand new.
   146  	offset int
   147  }
   148  
   149  // Get returns a map of resources plus all the deltas that lead to the map being in its current
   150  // state.
   151  func (kc *K8sStoreCursor) Get() (map[K8sKey]kates.Object, []*kates.Delta, error) {
   152  	kc.store.mutex.Lock()
   153  	defer kc.store.mutex.Unlock()
   154  
   155  	var deltas []*kates.Delta
   156  
   157  	resources := map[K8sKey]kates.Object{}
   158  	for _, key := range sortedKeys(kc.store.resources) {
   159  		resource := kc.store.resources[key]
   160  		resources[key] = resource
   161  		// This is the first time Get() has been called, so we shall create a synthetic ADD delta
   162  		// for every resource that currently exists.
   163  		if kc.offset < 0 {
   164  			delta, err := kates.NewDeltaFromObject(kates.ObjectAdd, resource)
   165  			if err != nil {
   166  				return nil, nil, err
   167  			}
   168  			deltas = append(deltas, delta)
   169  		}
   170  	}
   171  
   172  	if kc.offset >= 0 {
   173  		deltas = append(deltas, kc.store.deltas[kc.offset:len(kc.store.deltas)]...)
   174  	}
   175  	kc.offset = len(kc.store.deltas)
   176  
   177  	return resources, deltas, nil
   178  }
   179  
   180  func sortedKeys(resources map[K8sKey]kates.Object) []K8sKey {
   181  	var keys []K8sKey
   182  	for k := range resources {
   183  		keys = append(keys, k)
   184  	}
   185  
   186  	sort.Slice(keys, func(i, j int) bool {
   187  		return keys[i].sortKey() < keys[j].sortKey()
   188  	})
   189  
   190  	return keys
   191  }
   192  
   193  func canonGVK(rawString string) (canonKind string, canonGroupVersion string, err error) {
   194  	// XXX: there is probably a better way to do this, but this is good enough for now, we just need
   195  	// this to work well for ambassador and core types.
   196  
   197  	rawParts := strings.SplitN(rawString, ".", 2)
   198  	var rawKind, rawVG string
   199  	switch len(rawParts) {
   200  	case 1:
   201  		rawKind = rawParts[0]
   202  	case 2:
   203  		rawKind = rawParts[0]
   204  		rawVG = rawParts[1]
   205  	}
   206  
   207  	// Each case should be `case "singular", "plural":`
   208  	switch strings.ToLower(rawKind) {
   209  	// Native Kubernetes types
   210  	case "service", "services":
   211  		return "Service", "v1", nil
   212  	case "endpoints":
   213  		return "Endpoints", "v1", nil
   214  	case "secret", "secrets":
   215  		return "Secret", "v1", nil
   216  	case "configmap", "configmaps":
   217  		return "ConfigMap", "v1", nil
   218  	case "ingress", "ingresses":
   219  		if strings.HasSuffix(rawVG, ".knative.dev") {
   220  			return "Ingress", "networking.internal.knative.dev/v1alpha1", nil
   221  		}
   222  		return "Ingress", "networking.k8s.io/v1", nil
   223  	case "ingressclass", "ingressclasses":
   224  		return "IngressClass", "networking.k8s.io/v1", nil
   225  	// Gateway API
   226  	case "gatewayclass", "gatewayclasses":
   227  		return "GatewayClass", "networking.x-k8s.io/v1alpha1", nil
   228  	case "gateway", "gateways":
   229  		return "Gateway", "networking.x-k8s.io/v1alpha1", nil
   230  	case "httproute", "httproutes":
   231  		return "HTTPRoute", "networking.x-k8s.io/v1alpha1", nil
   232  	// Knative types
   233  	case "clusteringress", "clusteringresses":
   234  		return "ClusterIngress", "networking.internal.knative.dev/v1alpha1", nil
   235  	// Native Emissary types
   236  	case "authservice", "authservices":
   237  		return "AuthService", "getambassador.io/v3alpha1", nil
   238  	case "consulresolver", "consulresolvers":
   239  		return "ConsulResolver", "getambassador.io/v3alpha1", nil
   240  	case "devportal", "devportals":
   241  		return "DevPortal", "getambassador.io/v3alpha1", nil
   242  	case "host", "hosts":
   243  		return "Host", "getambassador.io/v3alpha1", nil
   244  	case "kubernetesendpointresolver", "kubernetesendpointresolvers":
   245  		return "KubernetesEndpointResolver", "getambassador.io/v3alpha1", nil
   246  	case "kubernetesserviceresolver", "kubernetesserviceresolvers":
   247  		return "KubernetesServiceResolver", "getambassador.io/v3alpha1", nil
   248  	case "listener", "listeners":
   249  		return "Listener", "getambassador.io/v3alpha1", nil
   250  	case "logservice", "logservices":
   251  		return "LogService", "getambassador.io/v3alpha1", nil
   252  	case "mapping", "mappings":
   253  		return "Mapping", "getambassador.io/v3alpha1", nil
   254  	case "module", "modules":
   255  		return "Module", "getambassador.io/v3alpha1", nil
   256  	case "ratelimitservice", "ratelimitservices":
   257  		return "RateLimitService", "getambassador.io/v3alpha1", nil
   258  	case "tcpmapping", "tcpmappings":
   259  		return "TCPMapping", "getambassador.io/v3alpha1", nil
   260  	case "tlscontext", "tlscontexts":
   261  		return "TLSContext", "getambassador.io/v3alpha1", nil
   262  	case "tracingservice", "tracingservices":
   263  		return "TracingService", "getambassador.io/v3alpha1", nil
   264  	case "filter", "filters":
   265  		return "Filter", "getambassador.io/v3alpha1", nil
   266  	case "filterpolicy", "filterpolicies":
   267  		return "Filterpolicy", "getambassador.io/v3alpha1", nil
   268  	default:
   269  		return "", "", fmt.Errorf("I don't know how to canonicalize kind: %q", rawString)
   270  	}
   271  }
   272  
   273  func canon(kind string) (string, error) {
   274  	canonKind, _, err := canonGVK(kind)
   275  	if err != nil {
   276  		return "", err
   277  	}
   278  	return canonKind, nil
   279  }
   280  

View as plain text