...

Source file src/github.com/emissary-ingress/emissary/v3/cmd/entrypoint/syntheticratelimit.go

Documentation: github.com/emissary-ingress/emissary/v3/cmd/entrypoint

     1  package entrypoint
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"github.com/datawire/dlib/dlog"
     8  	"github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
     9  	"github.com/emissary-ingress/emissary/v3/pkg/kates"
    10  )
    11  
    12  func iterateOverRateLimitServices(sh *SnapshotHolder, cb func(
    13  	rateLimitService *v3alpha1.RateLimitService, // rateLimitService
    14  	name string, // name to unambiguously refer to the rateLimitServices by; might be more complex than "name.namespace" if it's an annotation
    15  	parentName string, // name of the thing that the annotation is on (or empty if not an annotation)
    16  	idx int, // index of the rateLimitService; either in sh.k8sSnapshot.RateLimitServices or in sh.k8sSnapshot.Annotations[parentName]
    17  )) {
    18  	envAmbID := GetAmbassadorID()
    19  
    20  	for i, rateLimitService := range sh.k8sSnapshot.RateLimitServices {
    21  		if rateLimitService.Spec.AmbassadorID.Matches(envAmbID) {
    22  			name := rateLimitService.TypeMeta.Kind + "/" + rateLimitService.ObjectMeta.Name + "." + rateLimitService.ObjectMeta.Namespace
    23  			cb(rateLimitService, name, "", i)
    24  		}
    25  	}
    26  
    27  	for parentName, list := range sh.k8sSnapshot.Annotations {
    28  		for i, obj := range list {
    29  			if rateLimitService, ok := obj.(*v3alpha1.RateLimitService); ok && rateLimitService.Spec.AmbassadorID.Matches(envAmbID) {
    30  				name := fmt.Sprintf("%s#%d", parentName, i)
    31  				cb(rateLimitService, name, parentName, i)
    32  			}
    33  		}
    34  	}
    35  }
    36  
    37  // ReconcileRateLimit is a hack to remove all RateLimitService using protocol_version: v2 only when running Edge-Stack and then inject an
    38  // RateLimitService with protocol_version: v3 if needed. The purpose of this hack is to prevent Edge-Stack 2.3 from
    39  // using any other RateLimitService than the default one running as part of amb-sidecar and force the protocol version to v3.
    40  func ReconcileRateLimit(ctx context.Context, sh *SnapshotHolder, deltas *[]*kates.Delta) error {
    41  	// We only want to remove RateLimitServices if this is an instance of Edge-Stack
    42  	if isEdgeStack, err := IsEdgeStack(); err != nil {
    43  		return fmt.Errorf("ReconcileRateLimitServices: %w", err)
    44  	} else if !isEdgeStack {
    45  		return nil
    46  	}
    47  
    48  	// using a name with underscores prevents it from colliding with anything real in the
    49  	// cluster--Kubernetes resources can't have underscores in their name.
    50  	const syntheticRateLimitServiceName = "synthetic_edge_stack_rate_limit"
    51  
    52  	var (
    53  		numRateLimitServices  uint64
    54  		syntheticRateLimit    *v3alpha1.RateLimitService
    55  		syntheticRateLimitIdx int
    56  	)
    57  
    58  	iterateOverRateLimitServices(sh, func(rateLimitService *v3alpha1.RateLimitService, name, parentName string, i int) {
    59  		numRateLimitServices++
    60  		if IsLocalhost8500(rateLimitService.Spec.Service) {
    61  			if parentName == "" && rateLimitService.ObjectMeta.Name == syntheticRateLimitServiceName {
    62  				syntheticRateLimit = rateLimitService
    63  				syntheticRateLimitIdx = i
    64  			}
    65  			if rateLimitService.Spec.ProtocolVersion != "v3" {
    66  				// Force the Edge Stack RateLimitService to be protocol_version=v3.  This
    67  				// is important so that <2.3 and >=2.3 installations can coexist.
    68  				// This is important, because for zero-downtime upgrades, they must
    69  				// coexist briefly while the new Deployment is getting rolled out.
    70  				dlog.Debugf(ctx, "ReconcileRateLimitServices: Forcing protocol_version=v3 on %s", name)
    71  				rateLimitService.Spec.ProtocolVersion = "v3"
    72  			}
    73  		}
    74  	})
    75  
    76  	switch {
    77  	case numRateLimitServices == 0: // add the synthetic rate limit service
    78  		dlog.Debug(ctx, "ReconcileRateLimitServices: No user-provided RateLimitServices detected; injecting synthetic RateLimitService")
    79  		syntheticRateLimit = &v3alpha1.RateLimitService{
    80  			TypeMeta: kates.TypeMeta{
    81  				Kind:       "RateLimitService",
    82  				APIVersion: "getambassador.io/v3alpha1",
    83  			},
    84  			ObjectMeta: kates.ObjectMeta{
    85  				Name:      syntheticRateLimitServiceName,
    86  				Namespace: GetAmbassadorNamespace(),
    87  			},
    88  			Spec: v3alpha1.RateLimitServiceSpec{
    89  				AmbassadorID:    []string{GetAmbassadorID()},
    90  				Service:         "127.0.0.1:8500",
    91  				ProtocolVersion: "v3",
    92  			},
    93  		}
    94  		sh.k8sSnapshot.RateLimitServices = append(sh.k8sSnapshot.RateLimitServices, syntheticRateLimit)
    95  		*deltas = append(*deltas, &kates.Delta{
    96  			TypeMeta:   syntheticRateLimit.TypeMeta,
    97  			ObjectMeta: syntheticRateLimit.ObjectMeta,
    98  			DeltaType:  kates.ObjectAdd,
    99  		})
   100  	case numRateLimitServices > 1 && syntheticRateLimit != nil: // remove the synthetic rate limit service
   101  		dlog.Debugf(ctx, "ReconcileRateLimitServices: %d user-provided RateLimitServices detected; removing synthetic RateLimitServices", numRateLimitServices-1)
   102  		sh.k8sSnapshot.RateLimitServices = append(
   103  			sh.k8sSnapshot.RateLimitServices[:syntheticRateLimitIdx],
   104  			sh.k8sSnapshot.RateLimitServices[syntheticRateLimitIdx+1:]...)
   105  		*deltas = append(*deltas, &kates.Delta{
   106  			TypeMeta:   syntheticRateLimit.TypeMeta,
   107  			ObjectMeta: syntheticRateLimit.ObjectMeta,
   108  			DeltaType:  kates.ObjectDelete,
   109  		})
   110  	}
   111  
   112  	return nil
   113  }
   114  

View as plain text