package couchctl import ( "context" "encoding/json" "fmt" "io" "net/http" "strings" "github.com/go-logr/logr" "github.com/gorilla/websocket" "edge-infra.dev/pkg/lib/fog" "k8s.io/client-go/util/workqueue" ) // InterlockClient https://docs.edge-infra.dev/edge/sds/interlock/api/ type InterlockClient struct { baseURL string log logr.Logger cb func(e HostState, queue workqueue.RateLimitingInterface) } type Event struct { Topic string `json:"topic"` // must be `host` for HostState data HostState HostState `json:"data"` } type HostState struct { Hostname string `json:"hostname"` Network HostNetwork `json:"network"` } type HostNetwork struct { LanOutageDetected bool `json:"lan-outage-detected"` LanOutageMode bool `json:"lan-outage-mode"` } func (h HostState) InLOM() bool { return h.Network.LanOutageMode || h.Network.LanOutageDetected } // NewInterlockClient retrieves state of the cluster func NewInterlockClient(baseURL string, cb func(e HostState, queue workqueue.RateLimitingInterface)) *InterlockClient { return &InterlockClient{ baseURL: baseURL, log: fog.New(), cb: cb, } } func (c *InterlockClient) GetHostState(ctx context.Context) (*HostState, error) { hs := &HostState{} ulr := c.baseURL if !strings.HasPrefix(c.baseURL, "http") { ulr = fmt.Sprintf("http://%s", c.baseURL) } return hs, c.getRequest(ctx, fmt.Sprintf("%s/v1/host", ulr), hs) } func (c *InterlockClient) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { if c.cb == nil { return fmt.Errorf("can't subscribe, need a callback function") } wsURL := c.baseURL if strings.HasPrefix(wsURL, "http") { wsURL = strings.Replace(wsURL, "http", "ws", 1) } else if !strings.Contains(wsURL, ":") { wsURL = fmt.Sprintf("ws://%s", c.baseURL) } wsURL = fmt.Sprintf("%s/v1/subscribe", wsURL) c.log.Info("INTERLOCK SUBSCRIBE URL", "URL", wsURL) conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil) if err != nil { return err } go func() { defer conn.Close() for { select { case <-ctx.Done(): break default: _, msg, err := conn.ReadMessage() if err != nil { c.log.Error(err, "fail to read websocket message") break } e := Event{} err = json.Unmarshal(msg, &e) if err != nil { c.log.Error(err, "fail to unmarshal websocket message", "body", string(msg)) break } // host state topic == host if e.Topic == "host" && e.HostState.Hostname != "" { c.log.Info("host event received: ", "host", e.HostState.Hostname, "IN_LOM", e.HostState.InLOM()) c.cb(e.HostState, queue) } } } }() return nil } func (c *InterlockClient) getRequest(ctx context.Context, path string, s interface{}) error { req, err := http.NewRequestWithContext(ctx, "GET", path, nil) if err != nil { return err } resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return err } if err = json.Unmarshal(body, s); err != nil { c.log.Error(err, "fail to unmarshal interlock api response", "body", string(body), "status", resp.Status) return err } return nil }