...
1 package ambex
2
3 import (
4 "context"
5 "os"
6 "strconv"
7 "time"
8
9 "github.com/datawire/ambassador/v2/pkg/debug"
10 "github.com/datawire/dlib/dlog"
11 )
12
13
14
15 type Update struct {
16 Version string
17 Update func() error
18 }
19
20
21 type MemoryGetter func() int
22
23
24
25
26
27
28 func Updater(ctx context.Context, updates <-chan Update, getUsage MemoryGetter) error {
29 drainTime := GetAmbassadorDrainTime(ctx)
30 ticker := time.NewTicker(drainTime)
31 defer ticker.Stop()
32 return updaterWithTicker(ctx, updates, getUsage, drainTime, ticker, time.Now)
33 }
34
35 type debugInfo struct {
36 Times []time.Time `json:"times"`
37 StaleCount int `json:"staleCount"`
38 StaleMax int `json:"staleMax"`
39 Synced bool `json:"synced"`
40 DisableRatelimiter bool `json:"disableRatelimiter"`
41 }
42
43 func updaterWithTicker(ctx context.Context, updates <-chan Update, getUsage MemoryGetter,
44 drainTime time.Duration, ticker *time.Ticker, clock func() time.Time) error {
45
46 dbg := debug.FromContext(ctx)
47 info := dbg.Value("envoyReconfigs")
48
49
50 disableRatelimiter, err := strconv.ParseBool(os.Getenv("AMBASSADOR_AMBEX_NO_RATELIMIT"))
51
52 if err != nil {
53 disableRatelimiter = false
54 }
55
56 if disableRatelimiter {
57 dlog.Info(ctx, "snapshot ratelimiter DISABLED")
58 }
59
60
61
62
63
64 updateTimes := []time.Time{}
65
66
67 var latest Update
68 gotFirst := false
69 pushed := false
70 for {
71
72
73
74
75
76
77
78 var now time.Time
79 tick := false
80 select {
81 case up := <-updates:
82 latest = up
83 pushed = false
84 gotFirst = true
85 now = clock()
86 case now = <-ticker.C:
87 if pushed {
88 continue
89 }
90 tick = true
91 case <-ctx.Done():
92 return nil
93 }
94
95
96 updateTimes = gcUpdateTimes(updateTimes, now, drainTime)
97
98 usagePercent := getUsage()
99
100 if disableRatelimiter {
101 usagePercent = 0
102 }
103
104 var maxStaleReconfigs int
105 switch {
106 case usagePercent >= 90:
107
108
109
110 maxStaleReconfigs = 1
111 case usagePercent >= 80:
112
113
114 maxStaleReconfigs = 15
115 case usagePercent >= 70:
116
117
118 maxStaleReconfigs = 30
119 case usagePercent >= 60:
120
121
122 maxStaleReconfigs = 60
123 case usagePercent >= 50:
124
125
126 maxStaleReconfigs = 120
127 default:
128
129
130 maxStaleReconfigs = 0
131 }
132
133 staleReconfigs := len(updateTimes)
134
135 info.Store(debugInfo{updateTimes, staleReconfigs, maxStaleReconfigs, pushed, disableRatelimiter})
136
137
138 if maxStaleReconfigs > 0 && staleReconfigs >= maxStaleReconfigs {
139 if !tick {
140 dlog.Warnf(ctx, "Memory Usage: throttling reconfig %+v due to constrained memory with %d stale reconfigs (%d max)",
141 latest.Version, staleReconfigs, maxStaleReconfigs)
142 }
143 continue
144 }
145
146
147 if !gotFirst {
148 continue
149 }
150
151
152 err := latest.Update()
153 if err != nil {
154 return err
155 }
156
157
158 updateTimes = append(updateTimes, now)
159 dlog.Infof(ctx, "Pushing snapshot %+v", latest.Version)
160 pushed = true
161
162 info.Store(debugInfo{updateTimes, staleReconfigs, maxStaleReconfigs, pushed, disableRatelimiter})
163 }
164 }
165
166
167 func gcUpdateTimes(updateTimes []time.Time, now time.Time, drainTime time.Duration) []time.Time {
168 result := []time.Time{}
169 for _, ut := range updateTimes {
170 if ut.Add(drainTime).After(now) {
171 result = append(result, ut)
172 }
173 }
174 return result
175 }
176
177
178 func GetAmbassadorDrainTime(ctx context.Context) time.Duration {
179 s := os.Getenv("AMBASSADOR_DRAIN_TIME")
180 if s == "" {
181 s = "600"
182 }
183 i, err := strconv.Atoi(s)
184 if err != nil {
185 dlog.Printf(ctx, "Error parsing AMBASSADOR_DRAIN_TIME: %v", err)
186 i = 600
187 }
188
189 return time.Duration(i) * time.Second
190 }
191
View as plain text