1
16
17 package guestbook
18
19 import (
20 "encoding/json"
21 "fmt"
22 "io"
23 "log"
24 "net"
25 "net/http"
26 "net/url"
27 "strings"
28 "time"
29
30 "github.com/spf13/cobra"
31
32 utilnet "k8s.io/apimachinery/pkg/util/net"
33 )
34
35
36 var CmdGuestbook = &cobra.Command{
37 Use: "guestbook",
38 Short: "Creates a HTTP server with various endpoints representing a guestbook app",
39 Long: `Starts a HTTP server on the given --http-port (default: 80), serving various endpoints representing a guestbook app. The endpoints and their purpose are:
40
41 - /register: A guestbook replica will subscribe to a primary, to its given --replicaof endpoint. The primary will then push any updates it receives to its registered replicas through the --backend-port.
42 - /get: Returns '{"data": value}', where the value is the stored value for the given key if non-empty, or the entire store.
43 - /set: Will set the given key-value pair in its own store and propagate it to its replicas, if any. Will return '{"data": "Updated"}' to the caller on success.
44 - /guestbook: Will proxy the request to agnhost-primary if the given cmd is 'set', or agnhost-replica if the given cmd is 'get'.`,
45 Args: cobra.MaximumNArgs(0),
46 Run: main,
47 }
48
// Values bound to command-line flags (see init).
var (
	// httpPort is the port the HTTP server listens on (--http-port).
	httpPort string
	// backendPort is the port used to reach other guestbook nodes (--backend-port).
	backendPort string
	// replicaOf is the host to register to (--replicaof); when empty,
	// registerNode returns without doing anything.
	replicaOf string
)

// Process-wide mutable state shared by the HTTP handlers.
// NOTE(review): no locking around these is visible in this file, although
// net/http serves each request on its own goroutine — confirm whether
// concurrent /set and /register calls are expected.
var (
	// replicas lists the hosts that registered through /register.
	replicas []string
	// store is the in-memory key/value data served by /get and updated by /set.
	store map[string]interface{}
)
56
const (
	// timeout is the total time registerNode keeps retrying before giving up.
	timeout = 15 * time.Second
	// sleep is the pause between two registration attempts.
	sleep = time.Second
)
61
62 func init() {
63 CmdGuestbook.Flags().StringVar(&httpPort, "http-port", "80", "HTTP Listen Port")
64 CmdGuestbook.Flags().StringVar(&backendPort, "backend-port", "6379", "Backend's HTTP Listen Port")
65 CmdGuestbook.Flags().StringVar(&replicaOf, "replicaof", "", "The host's name to register to")
66 store = make(map[string]interface{})
67 }
68
69 func main(cmd *cobra.Command, args []string) {
70 go registerNode(replicaOf, backendPort)
71 startHTTPServer(httpPort)
72 }
73
74 func registerNode(registerTo, port string) {
75 if registerTo == "" {
76 return
77 }
78
79 hostPort := net.JoinHostPort(registerTo, backendPort)
80
81 start := time.Now()
82 for time.Since(start) < timeout {
83 host, err := getIP(hostPort)
84 if err != nil {
85 log.Printf("unable to get IP %s: %v. Retrying in %s.", hostPort, err, sleep)
86 time.Sleep(sleep)
87 continue
88 }
89
90 request := fmt.Sprintf("register?host=%s", host.String())
91 log.Printf("Registering to primary: %s/%s", hostPort, request)
92 _, err = net.ResolveTCPAddr("tcp", hostPort)
93 if err != nil {
94 log.Printf("unable to resolve %s, --replicaof param and/or --backend-port param are invalid: %v. Retrying in %s.", hostPort, err, sleep)
95 time.Sleep(sleep)
96 continue
97 }
98
99 response, err := dialHTTP(request, hostPort)
100 if err != nil {
101 log.Printf("encountered error while registering to primary: %v. Retrying in %s.", err, sleep)
102 time.Sleep(sleep)
103 continue
104 }
105
106 responseJSON := make(map[string]interface{})
107 err = json.Unmarshal([]byte(response), &responseJSON)
108 if err != nil {
109 log.Fatalf("Error while unmarshaling primary's response: %v", err)
110 }
111
112 var ok bool
113 store, ok = responseJSON["data"].(map[string]interface{})
114 if !ok {
115 log.Fatalf("Could not cast responseJSON: %s", responseJSON["data"])
116 }
117 log.Printf("Registered to node: %s", registerTo)
118 return
119 }
120
121 log.Fatal("Timed out while registering to primary.")
122 }
123
124 func startHTTPServer(port string) {
125 http.HandleFunc("/register", registerHandler)
126 http.HandleFunc("/get", getHandler)
127 http.HandleFunc("/set", setHandler)
128 http.HandleFunc("/guestbook", guestbookHandler)
129 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), nil))
130 }
131
132
133
134 func registerHandler(w http.ResponseWriter, r *http.Request) {
135 values, err := url.Parse(r.URL.RequestURI())
136 if err != nil {
137 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
138 return
139 }
140
141 ip := values.Query().Get("host")
142 log.Printf("GET /register?host=%s", ip)
143
144
145 output := make(map[string]interface{})
146 output["data"] = store
147 bytes, err := json.Marshal(output)
148 if err != nil {
149 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
150 return
151 }
152 fmt.Fprint(w, string(bytes))
153 replicas = append(replicas, ip)
154 log.Printf("Node '%s' registered.", ip)
155 }
156
157
158
159 func getHandler(w http.ResponseWriter, r *http.Request) {
160 values, err := url.Parse(r.URL.RequestURI())
161 if err != nil {
162 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
163 return
164 }
165
166 key := values.Query().Get("key")
167
168 log.Printf("GET /get?key=%s", key)
169
170 output := make(map[string]interface{})
171 if key == "" {
172 output["data"] = store
173 } else {
174 value, found := store[key]
175 if !found {
176 value = ""
177 }
178 output["data"] = value
179 }
180
181 bytes, err := json.Marshal(output)
182 if err == nil {
183 fmt.Fprint(w, string(bytes))
184 } else {
185 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
186 }
187 }
188
189
190
191 func setHandler(w http.ResponseWriter, r *http.Request) {
192 values, err := url.Parse(r.URL.RequestURI())
193 if err != nil {
194 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
195 return
196 }
197
198 key := values.Query().Get("key")
199 value := values.Query().Get("value")
200
201 log.Printf("GET /set?key=%s&value=%s", key, value)
202
203 if key == "" {
204 http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
205 return
206 }
207
208 store[key] = value
209 request := fmt.Sprintf("set?key=%s&value=%s", key, value)
210 for _, replica := range replicas {
211 hostPort := net.JoinHostPort(replica, backendPort)
212 _, err = dialHTTP(request, hostPort)
213 if err != nil {
214 http.Error(w, fmt.Sprintf("encountered error while propagating to replica '%s': %v", replica, err), http.StatusExpectationFailed)
215 return
216 }
217 }
218
219 output := map[string]string{}
220 output["message"] = "Updated"
221 bytes, err := json.Marshal(output)
222 if err == nil {
223 fmt.Fprint(w, string(bytes))
224 } else {
225 http.Error(w, fmt.Sprintf("response could not be serialized. %v", err), http.StatusExpectationFailed)
226 }
227 }
228
229
230
231 func guestbookHandler(w http.ResponseWriter, r *http.Request) {
232 values, err := url.Parse(r.URL.RequestURI())
233 if err != nil {
234 http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
235 return
236 }
237
238 cmd := strings.ToLower(values.Query().Get("cmd"))
239 key := values.Query().Get("key")
240 value := values.Query().Get("value")
241
242 log.Printf("GET /guestbook?cmd=%s&key=%s&value=%s", cmd, key, value)
243
244 if cmd != "get" && cmd != "set" {
245 http.Error(w, fmt.Sprintf("unsupported cmd: '%s'", cmd), http.StatusBadRequest)
246 return
247 }
248 if cmd == "set" && key == "" {
249 http.Error(w, "cannot set with empty key.", http.StatusBadRequest)
250 return
251 }
252
253 host := "agnhost-primary"
254 if cmd == "get" {
255 host = "agnhost-replica"
256 }
257
258 hostPort := net.JoinHostPort(host, backendPort)
259 _, err = net.ResolveTCPAddr("tcp", hostPort)
260 if err != nil {
261 http.Error(w, fmt.Sprintf("host and/or port param are invalid. %v", err), http.StatusBadRequest)
262 return
263 }
264
265 request := fmt.Sprintf("%s?key=%s&value=%s", cmd, key, value)
266 response, err := dialHTTP(request, hostPort)
267 if err == nil {
268 fmt.Fprint(w, response)
269 } else {
270 http.Error(w, fmt.Sprintf("encountered error: %v", err), http.StatusExpectationFailed)
271 }
272 }
273
274 func dialHTTP(request, hostPort string) (string, error) {
275 transport := utilnet.SetTransportDefaults(&http.Transport{})
276 httpClient := createHTTPClient(transport)
277 resp, err := httpClient.Get(fmt.Sprintf("http://%s/%s", hostPort, request))
278 defer transport.CloseIdleConnections()
279 if err == nil {
280 defer resp.Body.Close()
281 body, err := io.ReadAll(resp.Body)
282 if err == nil {
283 return string(body), nil
284 }
285 }
286 return "", err
287 }
288
// createHTTPClient returns an HTTP client that sends requests through the
// given transport and gives up on any request after five seconds.
func createHTTPClient(transport *http.Transport) *http.Client {
	return &http.Client{
		Timeout:   5 * time.Second,
		Transport: transport,
	}
}
296
// getIP returns the local IP address chosen for reaching hostPort. It opens
// a UDP socket towards hostPort and reads the socket's local address.
//
// On failure it returns a nil IP together with the error.
func getIP(hostPort string) (net.IP, error) {
	conn, err := net.Dial("udp", hostPort)
	if err != nil {
		return nil, err
	}
	defer conn.Close()

	// Checked assertion: report an unexpected address type instead of panicking.
	localAddr, ok := conn.LocalAddr().(*net.UDPAddr)
	if !ok {
		return nil, fmt.Errorf("unexpected local address type %T for %s", conn.LocalAddr(), hostPort)
	}
	return localAddr.IP, nil
}
307
View as plain text