...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/interlock_client.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"io"
     8  	"net/http"
     9  	"strings"
    10  
    11  	"github.com/go-logr/logr"
    12  	"github.com/gorilla/websocket"
    13  
    14  	"edge-infra.dev/pkg/lib/fog"
    15  
    16  	"k8s.io/client-go/util/workqueue"
    17  )
    18  
// InterlockClient is a client for the Interlock API.
//
// It supports a one-shot HTTP read of the host state (GetHostState) and a
// websocket subscription that forwards host events to a callback (Start).
//
// API reference: https://docs.edge-infra.dev/edge/sds/interlock/api/
type InterlockClient struct {
	// baseURL is the address of the Interlock API; the request paths
	// /v1/host and /v1/subscribe are appended to it.
	baseURL string
	// log receives diagnostic and error messages emitted by the client.
	log logr.Logger
	// cb is invoked by Start for every host event received on the
	// subscription; Start refuses to run when it is nil.
	cb func(e HostState, queue workqueue.RateLimitingInterface)
}
    25  
// Event is the envelope decoded from each message read on the Interlock
// websocket subscription.
type Event struct {
	// Topic identifies the kind of payload carried by the event.
	Topic string `json:"topic"` // must be `host` for HostState data
	// HostState is decoded from the "data" field of the message.
	HostState HostState `json:"data"`
}
    30  
// HostState is the host information decoded from the Interlock /v1/host
// response and from the "data" field of host events.
type HostState struct {
	// Hostname names the host; events with an empty hostname are ignored by
	// the subscription loop.
	Hostname string `json:"hostname"`
	// Network carries the LAN outage flags reported for the host.
	Network HostNetwork `json:"network"`
}
    35  
// HostNetwork holds the LAN outage flags reported by Interlock for a host.
type HostNetwork struct {
	// LanOutageDetected mirrors the "lan-outage-detected" JSON field.
	LanOutageDetected bool `json:"lan-outage-detected"`
	// LanOutageMode mirrors the "lan-outage-mode" JSON field.
	LanOutageMode bool `json:"lan-outage-mode"`
}
    40  
    41  func (h HostState) InLOM() bool {
    42  	return h.Network.LanOutageMode || h.Network.LanOutageDetected
    43  }
    44  
    45  // NewInterlockClient retrieves state of the cluster
    46  func NewInterlockClient(baseURL string, cb func(e HostState, queue workqueue.RateLimitingInterface)) *InterlockClient {
    47  	return &InterlockClient{
    48  		baseURL: baseURL,
    49  		log:     fog.New(),
    50  		cb:      cb,
    51  	}
    52  }
    53  
    54  func (c *InterlockClient) GetHostState(ctx context.Context) (*HostState, error) {
    55  	hs := &HostState{}
    56  	ulr := c.baseURL
    57  	if !strings.HasPrefix(c.baseURL, "http") {
    58  		ulr = fmt.Sprintf("http://%s", c.baseURL)
    59  	}
    60  	return hs, c.getRequest(ctx, fmt.Sprintf("%s/v1/host", ulr), hs)
    61  }
    62  
    63  func (c *InterlockClient) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
    64  	if c.cb == nil {
    65  		return fmt.Errorf("can't subscribe, need a callback function")
    66  	}
    67  
    68  	wsURL := c.baseURL
    69  	if strings.HasPrefix(wsURL, "http") {
    70  		wsURL = strings.Replace(wsURL, "http", "ws", 1)
    71  	} else if !strings.Contains(wsURL, ":") {
    72  		wsURL = fmt.Sprintf("ws://%s", c.baseURL)
    73  	}
    74  	wsURL = fmt.Sprintf("%s/v1/subscribe", wsURL)
    75  
    76  	c.log.Info("INTERLOCK SUBSCRIBE URL", "URL", wsURL)
    77  
    78  	conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    79  	if err != nil {
    80  		return err
    81  	}
    82  
    83  	go func() {
    84  		defer conn.Close()
    85  		for {
    86  			select {
    87  			case <-ctx.Done():
    88  				break
    89  			default:
    90  				_, msg, err := conn.ReadMessage()
    91  				if err != nil {
    92  					c.log.Error(err, "fail to read websocket message")
    93  					break
    94  				}
    95  				e := Event{}
    96  				err = json.Unmarshal(msg, &e)
    97  				if err != nil {
    98  					c.log.Error(err, "fail to unmarshal websocket message", "body", string(msg))
    99  					break
   100  				}
   101  				// host state topic == host
   102  				if e.Topic == "host" && e.HostState.Hostname != "" {
   103  					c.log.Info("host event received: ", "host", e.HostState.Hostname, "IN_LOM", e.HostState.InLOM())
   104  					c.cb(e.HostState, queue)
   105  				}
   106  			}
   107  		}
   108  	}()
   109  	return nil
   110  }
   111  
   112  func (c *InterlockClient) getRequest(ctx context.Context, path string, s interface{}) error {
   113  	req, err := http.NewRequestWithContext(ctx, "GET", path, nil)
   114  	if err != nil {
   115  		return err
   116  	}
   117  	resp, err := http.DefaultClient.Do(req)
   118  	if err != nil {
   119  		return err
   120  	}
   121  	defer resp.Body.Close()
   122  	body, err := io.ReadAll(resp.Body)
   123  	if err != nil {
   124  		return err
   125  	}
   126  	if err = json.Unmarshal(body, s); err != nil {
   127  		c.log.Error(err, "fail to unmarshal interlock api response", "body", string(body), "status", resp.Status)
   128  		return err
   129  	}
   130  	return nil
   131  }
   132  

View as plain text