...

Source file src/k8s.io/kubernetes/test/images/resource-consumer/resource_consumer_handler.go

Documentation: k8s.io/kubernetes/test/images/resource-consumer

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package main
    18  
    19  import (
    20  	"fmt"
    21  	"net/http"
    22  	"net/url"
    23  	"strconv"
    24  	"sync"
    25  	"time"
    26  
    27  	"k8s.io/kubernetes/test/images/resource-consumer/common"
    28  )
    29  
    30  // ResourceConsumerHandler holds metrics for a resource consumer.
    31  type ResourceConsumerHandler struct {
    32  	metrics     map[string]float64
    33  	metricsLock sync.Mutex
    34  }
    35  
    36  // NewResourceConsumerHandler creates and initializes a ResourceConsumerHandler to defaults.
    37  func NewResourceConsumerHandler() *ResourceConsumerHandler {
    38  	return &ResourceConsumerHandler{metrics: map[string]float64{}}
    39  }
    40  
    41  func (handler *ResourceConsumerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
    42  	// handle exposing metrics in Prometheus format (both GET & POST)
    43  	if req.URL.Path == common.MetricsAddress {
    44  		handler.handleMetrics(w)
    45  		return
    46  	}
    47  	if req.Method != "POST" {
    48  		http.Error(w, common.BadRequest, http.StatusBadRequest)
    49  		return
    50  	}
    51  	// parsing POST request data and URL data
    52  	if err := req.ParseForm(); err != nil {
    53  		http.Error(w, err.Error(), http.StatusBadRequest)
    54  		return
    55  	}
    56  	// handle consumeCPU
    57  	if req.URL.Path == common.ConsumeCPUAddress {
    58  		handler.handleConsumeCPU(w, req.Form)
    59  		return
    60  	}
    61  	// handle consumeMem
    62  	if req.URL.Path == common.ConsumeMemAddress {
    63  		handler.handleConsumeMem(w, req.Form)
    64  		return
    65  	}
    66  	// handle getCurrentStatus
    67  	if req.URL.Path == common.GetCurrentStatusAddress {
    68  		handler.handleGetCurrentStatus(w)
    69  		return
    70  	}
    71  	// handle bumpMetric
    72  	if req.URL.Path == common.BumpMetricAddress {
    73  		handler.handleBumpMetric(w, req.Form)
    74  		return
    75  	}
    76  	http.Error(w, fmt.Sprintf("%s: %s", common.UnknownFunction, req.URL.Path), http.StatusNotFound)
    77  }
    78  
    79  func (handler *ResourceConsumerHandler) handleConsumeCPU(w http.ResponseWriter, query url.Values) {
    80  	// getting string data for consumeCPU
    81  	durationSecString := query.Get(common.DurationSecQuery)
    82  	millicoresString := query.Get(common.MillicoresQuery)
    83  	if durationSecString == "" || millicoresString == "" {
    84  		http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
    85  		return
    86  	}
    87  
    88  	// convert data (strings to ints) for consumeCPU
    89  	durationSec, durationSecError := strconv.Atoi(durationSecString)
    90  	millicores, millicoresError := strconv.Atoi(millicoresString)
    91  	if durationSecError != nil || millicoresError != nil {
    92  		http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
    93  		return
    94  	}
    95  
    96  	go ConsumeCPU(millicores, durationSec)
    97  	fmt.Fprintln(w, common.ConsumeCPUAddress[1:])
    98  	fmt.Fprintln(w, millicores, common.MillicoresQuery)
    99  	fmt.Fprintln(w, durationSec, common.DurationSecQuery)
   100  }
   101  
   102  func (handler *ResourceConsumerHandler) handleConsumeMem(w http.ResponseWriter, query url.Values) {
   103  	// getting string data for consumeMem
   104  	durationSecString := query.Get(common.DurationSecQuery)
   105  	megabytesString := query.Get(common.MegabytesQuery)
   106  	if durationSecString == "" || megabytesString == "" {
   107  		http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
   108  		return
   109  	}
   110  
   111  	// convert data (strings to ints) for consumeMem
   112  	durationSec, durationSecError := strconv.Atoi(durationSecString)
   113  	megabytes, megabytesError := strconv.Atoi(megabytesString)
   114  	if durationSecError != nil || megabytesError != nil {
   115  		http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
   116  		return
   117  	}
   118  
   119  	go ConsumeMem(megabytes, durationSec)
   120  	fmt.Fprintln(w, common.ConsumeMemAddress[1:])
   121  	fmt.Fprintln(w, megabytes, common.MegabytesQuery)
   122  	fmt.Fprintln(w, durationSec, common.DurationSecQuery)
   123  }
   124  
   125  func (handler *ResourceConsumerHandler) handleGetCurrentStatus(w http.ResponseWriter) {
   126  	GetCurrentStatus()
   127  	fmt.Fprintln(w, "Warning: not implemented!")
   128  	fmt.Fprint(w, common.GetCurrentStatusAddress[1:])
   129  }
   130  
   131  func (handler *ResourceConsumerHandler) handleMetrics(w http.ResponseWriter) {
   132  	handler.metricsLock.Lock()
   133  	defer handler.metricsLock.Unlock()
   134  	for k, v := range handler.metrics {
   135  		fmt.Fprintf(w, "# HELP %s info message.\n", k)
   136  		fmt.Fprintf(w, "# TYPE %s gauge\n", k)
   137  		fmt.Fprintf(w, "%s %f\n", k, v)
   138  	}
   139  }
   140  
   141  func (handler *ResourceConsumerHandler) bumpMetric(metric string, delta float64, duration time.Duration) {
   142  	handler.metricsLock.Lock()
   143  	if _, ok := handler.metrics[metric]; ok {
   144  		handler.metrics[metric] += delta
   145  	} else {
   146  		handler.metrics[metric] = delta
   147  	}
   148  	handler.metricsLock.Unlock()
   149  
   150  	time.Sleep(duration)
   151  
   152  	handler.metricsLock.Lock()
   153  	handler.metrics[metric] -= delta
   154  	handler.metricsLock.Unlock()
   155  }
   156  
   157  func (handler *ResourceConsumerHandler) handleBumpMetric(w http.ResponseWriter, query url.Values) {
   158  	// getting string data for handleBumpMetric
   159  	metric := query.Get(common.MetricNameQuery)
   160  	deltaString := query.Get(common.DeltaQuery)
   161  	durationSecString := query.Get(common.DurationSecQuery)
   162  	if durationSecString == "" || metric == "" || deltaString == "" {
   163  		http.Error(w, common.NotGivenFunctionArgument, http.StatusBadRequest)
   164  		return
   165  	}
   166  
   167  	// convert data (strings to ints/floats) for handleBumpMetric
   168  	durationSec, durationSecError := strconv.Atoi(durationSecString)
   169  	delta, deltaError := strconv.ParseFloat(deltaString, 64)
   170  	if durationSecError != nil || deltaError != nil {
   171  		http.Error(w, common.IncorrectFunctionArgument, http.StatusBadRequest)
   172  		return
   173  	}
   174  
   175  	go handler.bumpMetric(metric, delta, time.Duration(durationSec)*time.Second)
   176  	fmt.Fprintln(w, common.BumpMetricAddress[1:])
   177  	fmt.Fprintln(w, metric, common.MetricNameQuery)
   178  	fmt.Fprintln(w, delta, common.DeltaQuery)
   179  	fmt.Fprintln(w, durationSec, common.DurationSecQuery)
   180  }
   181  

View as plain text