1
16
17 package main
18
19 import (
20 "fmt"
21 "net/http"
22 "net/url"
23 "strconv"
24 "sync"
25 "time"
26
27 "k8s.io/kubernetes/test/images/resource-consumer/common"
28 )
29
30
31 type ResourceConsumerHandler struct {
32 metrics map[string]float64
33 metricsLock sync.Mutex
34 }
35
36
37 func NewResourceConsumerHandler() *ResourceConsumerHandler {
38 return &ResourceConsumerHandler{metrics: map[string]float64{}}
39 }
40
41 func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
42
43 if req.URL.Path == common.MetricsAddress {
44 handler.handleMetrics(w)
45 return
46 }
47 if req.Method != "POST" {
48 http.Error(w, common.BadRequest, http.StatusBadRequest)
49 return
50 }
51
52 if err := req.ParseForm(); err != nil {
53 http.Error(w, err.Error(), http.StatusBadRequest)
54 return
55 }
56
57 if req.URL.Path == common.ConsumeCPUAddress {
58 handler.handleConsumeCPU(w, req.Form)
59 return
60 }
61
62 if req.URL.Path == common.ConsumeMemAddress {
63 handler.handleConsumeMem(w, req.Form)
64 return
65 }
66
67 if req.URL.Path == common.GetCurrentStatusAddress {
68 handler.handleGetCurrentStatus(w)
69 return
70 }
71
72 if req.URL.Path == common.BumpMetricAddress {
73 handler.handleBumpMetric(w, req.Form)
74 return
75 }
76 http.Error(w, fmt.Sprintf("%s: %s", common.UnknownFunction, req.URL.Path), http.StatusNotFound)
77 }
78
79 func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
80
81 durationSecString := query.Get(common.DurationSecQuery)
82 millicoresString := query.Get(common.MillicoresQuery)
83 if durationSecString == "" || millicoresString == "" {
84 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
85 return
86 }
87
88
89 durationSec, durationSecError := strconv.Atoi(durationSecString)
90 millicores, millicoresError := strconv.Atoi(millicoresString)
91 if durationSecError != nil || millicoresError != nil {
92 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
93 return
94 }
95
96 go ConsumeCPU(millicores, durationSec)
97 fmt.Fprintln(w, common.ConsumeCPUAddress[1:])
98 fmt.Fprintln(w, millicores, common.MillicoresQuery)
99 fmt.Fprintln(w, durationSec, common.DurationSecQuery)
100 }
101
102 func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
103
104 durationSecString := query.Get(common.DurationSecQuery)
105 megabytesString := query.Get(common.MegabytesQuery)
106 if durationSecString == "" || megabytesString == "" {
107 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
108 return
109 }
110
111
112 durationSec, durationSecError := strconv.Atoi(durationSecString)
113 megabytes, megabytesError := strconv.Atoi(megabytesString)
114 if durationSecError != nil || megabytesError != nil {
115 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
116 return
117 }
118
119 go ConsumeMem(megabytes, durationSec)
120 fmt.Fprintln(w, common.ConsumeMemAddress[1:])
121 fmt.Fprintln(w, megabytes, common.MegabytesQuery)
122 fmt.Fprintln(w, durationSec, common.DurationSecQuery)
123 }
124
125 func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
126 GetCurrentStatus()
127 fmt.Fprintln(w, "Warning: not implemented!")
128 fmt.Fprint(w, common.GetCurrentStatusAddress[1:])
129 }
130
131 func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
132 handler.metricsLock.Lock()
133 defer handler.metricsLock.Unlock()
134 for k, v := range handler.metrics {
135 fmt.Fprintf(w, "# HELP %s info message.\n", k)
136 fmt.Fprintf(w, "# TYPE %s gauge\n", k)
137 fmt.Fprintf(w, "%s %f\n", k, v)
138 }
139 }
140
141 func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
142 handler.metricsLock.Lock()
143 if _, ok := handler.metrics[metric]; ok {
144 handler.metrics[metric] += delta
145 } else {
146 handler.metrics[metric] = delta
147 }
148 handler.metricsLock.Unlock()
149
150 time.Sleep(duration)
151
152 handler.metricsLock.Lock()
153 handler.metrics[metric] -= delta
154 handler.metricsLock.Unlock()
155 }
156
157 func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
158
159 metric := query.Get(common.MetricNameQuery)
160 deltaString := query.Get(common.DeltaQuery)
161 durationSecString := query.Get(common.DurationSecQuery)
162 if durationSecString == "" || metric == "" || deltaString == "" {
163 http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
164 return
165 }
166
167
168 durationSec, durationSecError := strconv.Atoi(durationSecString)
169 delta, deltaError := strconv.ParseFloat(deltaString, 64)
170 if durationSecError != nil || deltaError != nil {
171 http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
172 return
173 }
174
175 go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
176 fmt.Fprintln(w, common.BumpMetricAddress[1:])
177 fmt.Fprintln(w, metric, common.MetricNameQuery)
178 fmt.Fprintln(w, delta, common.DeltaQuery)
179 fmt.Fprintln(w, durationSec, common.DurationSecQuery)
180 }
181
View as plain text