1 package entrypoint
2
3 import (
4 "context"
5 "fmt"
6
7 "github.com/datawire/dlib/dlog"
8 "github.com/emissary-ingress/emissary/v3/pkg/api/getambassador.io/v3alpha1"
9 "github.com/emissary-ingress/emissary/v3/pkg/kates"
10 )
11
12 func iterateOverRateLimitServices(sh *SnapshotHolder, cb func(
13 rateLimitService *v3alpha1.RateLimitService,
14 name string,
15 parentName string,
16 idx int,
17 )) {
18 envAmbID := GetAmbassadorID()
19
20 for i, rateLimitService := range sh.k8sSnapshot.RateLimitServices {
21 if rateLimitService.Spec.AmbassadorID.Matches(envAmbID) {
22 name := rateLimitService.TypeMeta.Kind + "/" + rateLimitService.ObjectMeta.Name + "." + rateLimitService.ObjectMeta.Namespace
23 cb(rateLimitService, name, "", i)
24 }
25 }
26
27 for parentName, list := range sh.k8sSnapshot.Annotations {
28 for i, obj := range list {
29 if rateLimitService, ok := obj.(*v3alpha1.RateLimitService); ok && rateLimitService.Spec.AmbassadorID.Matches(envAmbID) {
30 name := fmt.Sprintf("%s#%d", parentName, i)
31 cb(rateLimitService, name, parentName, i)
32 }
33 }
34 }
35 }
36
37
38
39
40 func ReconcileRateLimit(ctx context.Context, sh *SnapshotHolder, deltas *[]*kates.Delta) error {
41
42 if isEdgeStack, err := IsEdgeStack(); err != nil {
43 return fmt.Errorf("ReconcileRateLimitServices: %w", err)
44 } else if !isEdgeStack {
45 return nil
46 }
47
48
49
50 const syntheticRateLimitServiceName = "synthetic_edge_stack_rate_limit"
51
52 var (
53 numRateLimitServices uint64
54 syntheticRateLimit *v3alpha1.RateLimitService
55 syntheticRateLimitIdx int
56 )
57
58 iterateOverRateLimitServices(sh, func(rateLimitService *v3alpha1.RateLimitService, name, parentName string, i int) {
59 numRateLimitServices++
60 if IsLocalhost8500(rateLimitService.Spec.Service) {
61 if parentName == "" && rateLimitService.ObjectMeta.Name == syntheticRateLimitServiceName {
62 syntheticRateLimit = rateLimitService
63 syntheticRateLimitIdx = i
64 }
65 if rateLimitService.Spec.ProtocolVersion != "v3" {
66
67
68
69
70 dlog.Debugf(ctx, "ReconcileRateLimitServices: Forcing protocol_version=v3 on %s", name)
71 rateLimitService.Spec.ProtocolVersion = "v3"
72 }
73 }
74 })
75
76 switch {
77 case numRateLimitServices == 0:
78 dlog.Debug(ctx, "ReconcileRateLimitServices: No user-provided RateLimitServices detected; injecting synthetic RateLimitService")
79 syntheticRateLimit = &v3alpha1.RateLimitService{
80 TypeMeta: kates.TypeMeta{
81 Kind: "RateLimitService",
82 APIVersion: "getambassador.io/v3alpha1",
83 },
84 ObjectMeta: kates.ObjectMeta{
85 Name: syntheticRateLimitServiceName,
86 Namespace: GetAmbassadorNamespace(),
87 },
88 Spec: v3alpha1.RateLimitServiceSpec{
89 AmbassadorID: []string{GetAmbassadorID()},
90 Service: "127.0.0.1:8500",
91 ProtocolVersion: "v3",
92 },
93 }
94 sh.k8sSnapshot.RateLimitServices = append(sh.k8sSnapshot.RateLimitServices, syntheticRateLimit)
95 *deltas = append(*deltas, &kates.Delta{
96 TypeMeta: syntheticRateLimit.TypeMeta,
97 ObjectMeta: syntheticRateLimit.ObjectMeta,
98 DeltaType: kates.ObjectAdd,
99 })
100 case numRateLimitServices > 1 && syntheticRateLimit != nil:
101 dlog.Debugf(ctx, "ReconcileRateLimitServices: %d user-provided RateLimitServices detected; removing synthetic RateLimitServices", numRateLimitServices-1)
102 sh.k8sSnapshot.RateLimitServices = append(
103 sh.k8sSnapshot.RateLimitServices[:syntheticRateLimitIdx],
104 sh.k8sSnapshot.RateLimitServices[syntheticRateLimitIdx+1:]...)
105 *deltas = append(*deltas, &kates.Delta{
106 TypeMeta: syntheticRateLimit.TypeMeta,
107 ObjectMeta: syntheticRateLimit.ObjectMeta,
108 DeltaType: kates.ObjectDelete,
109 })
110 }
111
112 return nil
113 }
114
View as plain text