1 package couchctl
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "net/http"
9 "strings"
10
11 "github.com/go-logr/logr"
12 "github.com/gorilla/websocket"
13
14 "edge-infra.dev/pkg/lib/fog"
15
16 "k8s.io/client-go/util/workqueue"
17 )
18
19
20 type InterlockClient struct {
21 baseURL string
22 log logr.Logger
23 cb func(e HostState, queue workqueue.RateLimitingInterface)
24 }
25
26 type Event struct {
27 Topic string `json:"topic"`
28 HostState HostState `json:"data"`
29 }
30
31 type HostState struct {
32 Hostname string `json:"hostname"`
33 Network HostNetwork `json:"network"`
34 }
35
// HostNetwork carries the two LAN-outage flags reported for a host.
type HostNetwork struct {
	// LanOutageDetected is decoded from the "lan-outage-detected" key.
	LanOutageDetected bool `json:"lan-outage-detected"`
	// LanOutageMode is decoded from the "lan-outage-mode" key.
	LanOutageMode bool `json:"lan-outage-mode"`
}
40
41 func (h HostState) InLOM() bool {
42 return h.Network.LanOutageMode || h.Network.LanOutageDetected
43 }
44
45
46 func NewInterlockClient(baseURL string, cb func(e HostState, queue workqueue.RateLimitingInterface)) *InterlockClient {
47 return &InterlockClient{
48 baseURL: baseURL,
49 log: fog.New(),
50 cb: cb,
51 }
52 }
53
54 func (c *InterlockClient) GetHostState(ctx context.Context) (*HostState, error) {
55 hs := &HostState{}
56 ulr := c.baseURL
57 if !strings.HasPrefix(c.baseURL, "http") {
58 ulr = fmt.Sprintf("http://%s", c.baseURL)
59 }
60 return hs, c.getRequest(ctx, fmt.Sprintf("%s/v1/host", ulr), hs)
61 }
62
63 func (c *InterlockClient) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
64 if c.cb == nil {
65 return fmt.Errorf("can't subscribe, need a callback function")
66 }
67
68 wsURL := c.baseURL
69 if strings.HasPrefix(wsURL, "http") {
70 wsURL = strings.Replace(wsURL, "http", "ws", 1)
71 } else if !strings.Contains(wsURL, ":") {
72 wsURL = fmt.Sprintf("ws://%s", c.baseURL)
73 }
74 wsURL = fmt.Sprintf("%s/v1/subscribe", wsURL)
75
76 c.log.Info("INTERLOCK SUBSCRIBE URL", "URL", wsURL)
77
78 conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
79 if err != nil {
80 return err
81 }
82
83 go func() {
84 defer conn.Close()
85 for {
86 select {
87 case <-ctx.Done():
88 break
89 default:
90 _, msg, err := conn.ReadMessage()
91 if err != nil {
92 c.log.Error(err, "fail to read websocket message")
93 break
94 }
95 e := Event{}
96 err = json.Unmarshal(msg, &e)
97 if err != nil {
98 c.log.Error(err, "fail to unmarshal websocket message", "body", string(msg))
99 break
100 }
101
102 if e.Topic == "host" && e.HostState.Hostname != "" {
103 c.log.Info("host event received: ", "host", e.HostState.Hostname, "IN_LOM", e.HostState.InLOM())
104 c.cb(e.HostState, queue)
105 }
106 }
107 }
108 }()
109 return nil
110 }
111
112 func (c *InterlockClient) getRequest(ctx context.Context, path string, s interface{}) error {
113 req, err := http.NewRequestWithContext(ctx, "GET", path, nil)
114 if err != nil {
115 return err
116 }
117 resp, err := http.DefaultClient.Do(req)
118 if err != nil {
119 return err
120 }
121 defer resp.Body.Close()
122 body, err := io.ReadAll(resp.Body)
123 if err != nil {
124 return err
125 }
126 if err = json.Unmarshal(body, s); err != nil {
127 c.log.Error(err, "fail to unmarshal interlock api response", "body", string(body), "status", resp.Status)
128 return err
129 }
130 return nil
131 }
132
View as plain text