1
16
17 package resconsumerctrl
18
19 import (
20 "fmt"
21 "log"
22 "net/http"
23 "net/url"
24 "regexp"
25 "strconv"
26 "sync"
27
28 "github.com/spf13/cobra"
29
30 "k8s.io/kubernetes/test/images/agnhost/dns"
31 "k8s.io/kubernetes/test/images/resource-consumer/common"
32 )
33
34
35 var CmdResourceConsumerController = &cobra.Command{
36 Use: "resource-consumer-controller",
37 Short: "Starts a HTTP server that spreads requests around resource consumers",
38 Long: "Starts a HTTP server that spreads requests around resource consumers. The HTTP server has the same endpoints and usage as the one spawned by the \"resource-consumer\" subcommand.",
39 Args: cobra.MaximumNArgs(0),
40 Run: main,
41 }
42
43 var (
44 port int
45 consumerPort int
46 consumerServiceName string
47 consumerServiceNamespace string
48 dnsDomain string
49 )
50
51
52
53
54 func getDNSDomain() string {
55 if dnsDomain != "" {
56 return dnsDomain
57 }
58 dnsSuffixList := dns.GetDNSSuffixList()
59 r, _ := regexp.Compile("^svc.")
60 for _, currentDNSSuffix := range dnsSuffixList {
61 if r.MatchString(currentDNSSuffix) {
62
63 dnsDomain = currentDNSSuffix[4:]
64 break
65 }
66 }
67 if dnsDomain == "" {
68 panic("Could not find DNS suffix starting with 'svc.' substring")
69 }
70 return dnsDomain
71 }
72
73 func init() {
74 CmdResourceConsumerController.Flags().IntVar(&port, "port", 8080, "Port number.")
75 CmdResourceConsumerController.Flags().IntVar(&consumerPort, "consumer-port", 8080, "Port number of consumers.")
76 CmdResourceConsumerController.Flags().StringVar(&consumerServiceName, "consumer-service-name", "resource-consumer", "Name of service containing resource consumers.")
77 CmdResourceConsumerController.Flags().StringVar(&consumerServiceNamespace, "consumer-service-namespace", "default", "Namespace of service containing resource consumers.")
78 }
79
80 func main(cmd *cobra.Command, args []string) {
81 mgr := newController()
82 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), mgr))
83 }
84
85 type controller struct {
86 responseWriterLock sync.Mutex
87 waitGroup sync.WaitGroup
88 }
89
90 func newController() *controller {
91 c := &controller{}
92 return c
93 }
94
95 func (c *controller) ServeHTTP(w http.ResponseWriter, req *http.Request) {
96 if req.Method != "POST" {
97 http.Error(w, common.BadRequest, http.StatusBadRequest)
98 return
99 }
100
101 if err := req.ParseForm(); err != nil {
102 http.Error(w, err.Error(), http.StatusBadRequest)
103 return
104 }
105
106 if req.URL.Path == common.ConsumeCPUAddress {
107 c.handleConsumeCPU(w, req.Form)
108 return
109 }
110
111 if req.URL.Path == common.ConsumeMemAddress {
112 c.handleConsumeMem(w, req.Form)
113 return
114 }
115
116 if req.URL.Path == common.BumpMetricAddress {
117 c.handleBumpMetric(w, req.Form)
118 return
119 }
120 http.Error(w, common.UnknownFunction, http.StatusNotFound)
121 }
122
123 func (c *controller) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
124
125 durationSecString := query.Get(common.DurationSecQuery)
126 millicoresString := query.Get(common.MillicoresQuery)
127 requestSizeInMillicoresString := query.Get(common.RequestSizeInMillicoresQuery)
128 if durationSecString == "" || millicoresString == "" || requestSizeInMillicoresString == "" {
129 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
130 return
131 }
132
133
134 durationSec, durationSecError := strconv.Atoi(durationSecString)
135 millicores, millicoresError := strconv.Atoi(millicoresString)
136 requestSizeInMillicores, requestSizeInMillicoresError := strconv.Atoi(requestSizeInMillicoresString)
137 if durationSecError != nil || millicoresError != nil || requestSizeInMillicoresError != nil || requestSizeInMillicores <= 0 {
138 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
139 return
140 }
141
142 count := millicores / requestSizeInMillicores
143 rest := millicores - count*requestSizeInMillicores
144 fmt.Fprintf(w, "RC manager: sending %v requests to consume %v millicores each and 1 request to consume %v millicores\n", count, requestSizeInMillicores, rest)
145 if count > 0 {
146 c.waitGroup.Add(count)
147 c.sendConsumeCPURequests(w, count, requestSizeInMillicores, durationSec)
148 }
149 if rest > 0 {
150 c.waitGroup.Add(1)
151 go c.sendOneConsumeCPURequest(w, rest, durationSec)
152 }
153 c.waitGroup.Wait()
154 }
155
156 func (c *controller) handleConsumeMem(w http.ResponseWriter, query url.Values) {
157
158 durationSecString := query.Get(common.DurationSecQuery)
159 megabytesString := query.Get(common.MegabytesQuery)
160 requestSizeInMegabytesString := query.Get(common.RequestSizeInMegabytesQuery)
161 if durationSecString == "" || megabytesString == "" || requestSizeInMegabytesString == "" {
162 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
163 return
164 }
165
166
167 durationSec, durationSecError := strconv.Atoi(durationSecString)
168 megabytes, megabytesError := strconv.Atoi(megabytesString)
169 requestSizeInMegabytes, requestSizeInMegabytesError := strconv.Atoi(requestSizeInMegabytesString)
170 if durationSecError != nil || megabytesError != nil || requestSizeInMegabytesError != nil || requestSizeInMegabytes <= 0 {
171 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
172 return
173 }
174
175 count := megabytes / requestSizeInMegabytes
176 rest := megabytes - count*requestSizeInMegabytes
177 fmt.Fprintf(w, "RC manager: sending %v requests to consume %v MB each and 1 request to consume %v MB\n", count, requestSizeInMegabytes, rest)
178 if count > 0 {
179 c.waitGroup.Add(count)
180 c.sendConsumeMemRequests(w, count, requestSizeInMegabytes, durationSec)
181 }
182 if rest > 0 {
183 c.waitGroup.Add(1)
184 go c.sendOneConsumeMemRequest(w, rest, durationSec)
185 }
186 c.waitGroup.Wait()
187 }
188
189 func (c *controller) handleBumpMetric(w http.ResponseWriter, query url.Values) {
190
191 metric := query.Get(common.MetricNameQuery)
192 deltaString := query.Get(common.DeltaQuery)
193 durationSecString := query.Get(common.DurationSecQuery)
194 requestSizeCustomMetricString := query.Get(common.RequestSizeCustomMetricQuery)
195 if durationSecString == "" || metric == "" || deltaString == "" || requestSizeCustomMetricString == "" {
196 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
197 return
198 }
199
200
201 durationSec, durationSecError := strconv.Atoi(durationSecString)
202 delta, deltaError := strconv.Atoi(deltaString)
203 requestSizeCustomMetric, requestSizeCustomMetricError := strconv.Atoi(requestSizeCustomMetricString)
204 if durationSecError != nil || deltaError != nil || requestSizeCustomMetricError != nil || requestSizeCustomMetric <= 0 {
205 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
206 return
207 }
208
209 count := delta / requestSizeCustomMetric
210 rest := delta - count*requestSizeCustomMetric
211 fmt.Fprintf(w, "RC manager: sending %v requests to bump custom metric by %v each and 1 request to bump by %v\n", count, requestSizeCustomMetric, rest)
212 if count > 0 {
213 c.waitGroup.Add(count)
214 c.sendConsumeCustomMetric(w, metric, count, requestSizeCustomMetric, durationSec)
215 }
216 if rest > 0 {
217 c.waitGroup.Add(1)
218 go c.sendOneConsumeCustomMetric(w, metric, rest, durationSec)
219 }
220 c.waitGroup.Wait()
221 }
222
223 func (c *controller) sendConsumeCPURequests(w http.ResponseWriter, requests, millicores, durationSec int) {
224 for i := 0; i < requests; i++ {
225 go c.sendOneConsumeCPURequest(w, millicores, durationSec)
226 }
227 }
228
229 func (c *controller) sendConsumeMemRequests(w http.ResponseWriter, requests, megabytes, durationSec int) {
230 for i := 0; i < requests; i++ {
231 go c.sendOneConsumeMemRequest(w, megabytes, durationSec)
232 }
233 }
234
235 func (c *controller) sendConsumeCustomMetric(w http.ResponseWriter, metric string, requests, delta, durationSec int) {
236 for i := 0; i < requests; i++ {
237 go c.sendOneConsumeCustomMetric(w, metric, delta, durationSec)
238 }
239 }
240
241 func createConsumerURL(suffix string) string {
242
243 return fmt.Sprintf("http://%s.%s.svc.%s:%d%s", consumerServiceName, consumerServiceNamespace, getDNSDomain(), consumerPort, suffix)
244 }
245
246
247 func (c *controller) sendOneConsumeCPURequest(w http.ResponseWriter, millicores int, durationSec int) {
248 defer c.waitGroup.Done()
249 query := createConsumerURL(common.ConsumeCPUAddress)
250 _, err := http.PostForm(query, url.Values{common.MillicoresQuery: {strconv.Itoa(millicores)}, common.DurationSecQuery: {strconv.Itoa(durationSec)}})
251 c.responseWriterLock.Lock()
252 defer c.responseWriterLock.Unlock()
253 if err != nil {
254 fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
255 return
256 }
257 fmt.Fprintf(w, "Consumed %d millicores\n", millicores)
258 }
259
260
261 func (c *controller) sendOneConsumeMemRequest(w http.ResponseWriter, megabytes int, durationSec int) {
262 defer c.waitGroup.Done()
263 query := createConsumerURL(common.ConsumeMemAddress)
264 _, err := http.PostForm(query, url.Values{common.MegabytesQuery: {strconv.Itoa(megabytes)}, common.DurationSecQuery: {strconv.Itoa(durationSec)}})
265 c.responseWriterLock.Lock()
266 defer c.responseWriterLock.Unlock()
267 if err != nil {
268 fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
269 return
270 }
271 fmt.Fprintf(w, "Consumed %d megabytes\n", megabytes)
272 }
273
274
275 func (c *controller) sendOneConsumeCustomMetric(w http.ResponseWriter, customMetricName string, delta int, durationSec int) {
276 defer c.waitGroup.Done()
277 query := createConsumerURL(common.BumpMetricAddress)
278 _, err := http.PostForm(query,
279 url.Values{common.MetricNameQuery: {customMetricName}, common.DurationSecQuery: {strconv.Itoa(durationSec)}, common.DeltaQuery: {strconv.Itoa(delta)}})
280 c.responseWriterLock.Lock()
281 defer c.responseWriterLock.Unlock()
282 if err != nil {
283 fmt.Fprintf(w, "Failed to connect to consumer: %v\n", err)
284 return
285 }
286 fmt.Fprintf(w, "Bumped metric %s by %d\n", customMetricName, delta)
287 }
288
View as plain text