...

Source file src/k8s.io/kubernetes/test/images/agnhost/guestbook/guestbook.go

Documentation: k8s.io/kubernetes/test/images/agnhost/guestbook

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package guestbook
    18  
    19  import (
    20  	"encoding/json"
    21  	"fmt"
    22  	"io"
    23  	"log"
    24  	"net"
    25  	"net/http"
    26  	"net/url"
    27  	"strings"
    28  	"time"
    29  
    30  	"github.com/spf13/cobra"
    31  
    32  	utilnet "k8s.io/apimachinery/pkg/util/net"
    33  )
    34  
    35  // CmdGuestbook is used by agnhost Cobra.
    36  var CmdGuestbook = &cobra.Command{
    37  	Use:   "guestbook",
    38  	Short: "Creates a HTTP server with various endpoints representing a guestbook app",
    39  	Long: `Starts a HTTP server on the given --http-port (default: 80), serving various endpoints representing a guestbook app. The endpoints and their purpose are:
    40  
    41  - /register: A guestbook replica will subscribe to a primary, to its given --replicaof endpoint. The primary will then push any updates it receives to its registered replicas through the --backend-port.
    42  - /get: Returns '{"data": value}', where the value is the stored value for the given key if non-empty, or the entire store.
    43  - /set: Will set the given key-value pair in its own store and propagate it to its replicas, if any. Will return '{"data": "Updated"}' to the caller on success.
    44  - /guestbook: Will proxy the request to agnhost-primary if the given cmd is 'set', or agnhost-replica if the given cmd is 'get'.`,
    45  	Args: cobra.MaximumNArgs(0),
    46  	Run:  main,
    47  }
    48  
    49  var (
    50  	httpPort    string
    51  	backendPort string
    52  	replicaOf   string
    53  	replicas    []string
    54  	store       map[string]interface{}
    55  )
    56  
    57  const (
    58  	timeout = time.Duration(15) * time.Second
    59  	sleep   = time.Duration(1) * time.Second
    60  )
    61  
    62  func init() {
    63  	CmdGuestbook.Flags().StringVar(&httpPort, "http-port", "80", "HTTP Listen Port")
    64  	CmdGuestbook.Flags().StringVar(&backendPort, "backend-port", "6379", "Backend's HTTP Listen Port")
    65  	CmdGuestbook.Flags().StringVar(&replicaOf, "replicaof", "", "The host's name to register to")
    66  	store = make(map[string]interface{})
    67  }
    68  
    69  func main(cmd *cobra.Command, args []string) {
    70  	go registerNode(replicaOf, backendPort)
    71  	startHTTPServer(httpPort)
    72  }
    73  
    74  func registerNode(registerTo, port string) {
    75  	if registerTo == "" {
    76  		return
    77  	}
    78  
    79  	hostPort := net.JoinHostPort(registerTo, backendPort)
    80  
    81  	start := time.Now()
    82  	for time.Since(start) < timeout {
    83  		host, err := getIP(hostPort)
    84  		if err != nil {
    85  			log.Printf("unable to get IP %s: %v. Retrying in %s.", hostPort, err, sleep)
    86  			time.Sleep(sleep)
    87  			continue
    88  		}
    89  
    90  		request := fmt.Sprintf("register?host=%s", host.String())
    91  		log.Printf("Registering to primary: %s/%s", hostPort, request)
    92  		_, err = net.ResolveTCPAddr("tcp", hostPort)
    93  		if err != nil {
    94  			log.Printf("unable to resolve %s, --replicaof param and/or --backend-port param are invalid: %v. Retrying in %s.", hostPort, err, sleep)
    95  			time.Sleep(sleep)
    96  			continue
    97  		}
    98  
    99  		response, err := dialHTTP(request, hostPort)
   100  		if err != nil {
   101  			log.Printf("encountered error while registering to primary: %v. Retrying in %s.", err, sleep)
   102  			time.Sleep(sleep)
   103  			continue
   104  		}
   105  
   106  		responseJSON := make(map[string]interface{})
   107  		err = json.Unmarshal([]byte(response), &responseJSON)
   108  		if err != nil {
   109  			log.Fatalf("Error while unmarshaling primary's response: %v", err)
   110  		}
   111  
   112  		var ok bool
   113  		store, ok = responseJSON["data"].(map[string]interface{})
   114  		if !ok {
   115  			log.Fatalf("Could not cast responseJSON: %s", responseJSON["data"])
   116  		}
   117  		log.Printf("Registered to node: %s", registerTo)
   118  		return
   119  	}
   120  
   121  	log.Fatal("Timed out while registering to primary.")
   122  }
   123  
   124  func startHTTPServer(port string) {
   125  	http.HandleFunc("/register", registerHandler)
   126  	http.HandleFunc("/get", getHandler)
   127  	http.HandleFunc("/set", setHandler)
   128  	http.HandleFunc("/guestbook", guestbookHandler)
   129  	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
   130  }
   131  
   132  // registerHandler will register the caller in this server's list of replicas.
   133  // /set requests will be propagated to replicas, if any.
   134  func registerHandler(w http.ResponseWriter, r *http.Request) {
   135  	values, err := url.Parse(r.URL.RequestURI())
   136  	if err != nil {
   137  		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
   138  		return
   139  	}
   140  
   141  	ip := values.Query().Get("host")
   142  	log.Printf("GET /register?host=%s", ip)
   143  
   144  	// send all the store to the replica as well.
   145  	output := make(map[string]interface{})
   146  	output["data"] = store
   147  	bytes, err := json.Marshal(output)
   148  	if err != nil {
   149  		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
   150  		return
   151  	}
   152  	fmt.Fprint(w, string(bytes))
   153  	replicas = append(replicas, ip)
   154  	log.Printf("Node '%s' registered.", ip)
   155  }
   156  
   157  // getHandler will return '{"data": value}', where value is the stored value for
   158  // the given key if non-empty, or entire store.
   159  func getHandler(w http.ResponseWriter, r *http.Request) {
   160  	values, err := url.Parse(r.URL.RequestURI())
   161  	if err != nil {
   162  		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
   163  		return
   164  	}
   165  
   166  	key := values.Query().Get("key")
   167  
   168  	log.Printf("GET /get?key=%s", key)
   169  
   170  	output := make(map[string]interface{})
   171  	if key == "" {
   172  		output["data"] = store
   173  	} else {
   174  		value, found := store[key]
   175  		if !found {
   176  			value = ""
   177  		}
   178  		output["data"] = value
   179  	}
   180  
   181  	bytes, err := json.Marshal(output)
   182  	if err == nil {
   183  		fmt.Fprint(w, string(bytes))
   184  	} else {
   185  		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
   186  	}
   187  }
   188  
   189  // setHandler will set the given key-value pair in its own store and propagate
   190  // it to its replicas, if any. Will return '{"message": "Updated"}' to the caller on success.
   191  func setHandler(w http.ResponseWriter, r *http.Request) {
   192  	values, err := url.Parse(r.URL.RequestURI())
   193  	if err != nil {
   194  		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
   195  		return
   196  	}
   197  
   198  	key := values.Query().Get("key")
   199  	value := values.Query().Get("value")
   200  
   201  	log.Printf("GET /set?key=%s&value=%s", key, value)
   202  
   203  	if key == "" {
   204  		http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
   205  		return
   206  	}
   207  
   208  	store[key] = value
   209  	request := fmt.Sprintf("set?key=%s&value=%s", key, value)
   210  	for _, replica := range replicas {
   211  		hostPort := net.JoinHostPort(replica, backendPort)
   212  		_, err = dialHTTP(request, hostPort)
   213  		if err != nil {
   214  			http.Error(w, fmt.Sprintf("encountered error while propagating to replica '%s': %v", replica, err), http.StatusExpectationFailed)
   215  			return
   216  		}
   217  	}
   218  
   219  	output := map[string]string{}
   220  	output["message"] = "Updated"
   221  	bytes, err := json.Marshal(output)
   222  	if err == nil {
   223  		fmt.Fprint(w, string(bytes))
   224  	} else {
   225  		http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
   226  	}
   227  }
   228  
   229  // guestbookHandler will proxy the request to agnhost-primary if the given cmd is
   230  // 'set' or agnhost-replica if the given cmd is 'get'.
   231  func guestbookHandler(w http.ResponseWriter, r *http.Request) {
   232  	values, err := url.Parse(r.URL.RequestURI())
   233  	if err != nil {
   234  		http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
   235  		return
   236  	}
   237  
   238  	cmd := strings.ToLower(values.Query().Get("cmd"))
   239  	key := values.Query().Get("key")
   240  	value := values.Query().Get("value")
   241  
   242  	log.Printf("GET /guestbook?cmd=%s&key=%s&value=%s", cmd, key, value)
   243  
   244  	if cmd != "get" && cmd != "set" {
   245  		http.Error(w, fmt.Sprintf("unsupported cmd: '%s'", cmd), http.StatusBadRequest)
   246  		return
   247  	}
   248  	if cmd == "set" && key == "" {
   249  		http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
   250  		return
   251  	}
   252  
   253  	host := "agnhost-primary"
   254  	if cmd == "get" {
   255  		host = "agnhost-replica"
   256  	}
   257  
   258  	hostPort := net.JoinHostPort(host, backendPort)
   259  	_, err = net.ResolveTCPAddr("tcp", hostPort)
   260  	if err != nil {
   261  		http.Error(w, fmt.Sprintf("host and/or port param are invalid. %v", err), http.StatusBadRequest)
   262  		return
   263  	}
   264  
   265  	request := fmt.Sprintf("%s?key=%s&value=%s", cmd, key, value)
   266  	response, err := dialHTTP(request, hostPort)
   267  	if err == nil {
   268  		fmt.Fprint(w, response)
   269  	} else {
   270  		http.Error(w, fmt.Sprintf("encountered error: %v", err), http.StatusExpectationFailed)
   271  	}
   272  }
   273  
   274  func dialHTTP(request, hostPort string) (string, error) {
   275  	transport := utilnet.SetTransportDefaults(&http.Transport{})
   276  	httpClient := createHTTPClient(transport)
   277  	resp, err := httpClient.Get(fmt.Sprintf("http://%s/%s", hostPort, request))
   278  	defer transport.CloseIdleConnections()
   279  	if err == nil {
   280  		defer resp.Body.Close()
   281  		body, err := io.ReadAll(resp.Body)
   282  		if err == nil {
   283  			return string(body), nil
   284  		}
   285  	}
   286  	return "", err
   287  }
   288  
   289  func createHTTPClient(transport *http.Transport) *http.Client {
   290  	client := &http.Client{
   291  		Transport: transport,
   292  		Timeout:   5 * time.Second,
   293  	}
   294  	return client
   295  }
   296  
   297  func getIP(hostPort string) (net.IP, error) {
   298  	conn, err := net.Dial("udp", hostPort)
   299  	if err != nil {
   300  		return []byte{}, err
   301  	}
   302  	defer conn.Close()
   303  
   304  	localAddr := conn.LocalAddr().(*net.UDPAddr)
   305  	return localAddr.IP, nil
   306  }
   307  

View as plain text