package socket import ( "context" "fmt" "net" "time" "github.com/google/uuid" "github.com/spf13/afero" "edge-infra.dev/pkg/lib/fog" ) const ( ResetMessage = "reset" SuccessResponse = "success" LockedResponse = "locked" InvalidResponse = "invalid" ) type Reseter interface { WithTryLock(ctx context.Context, busy chan<- bool, fn func(context.Context) error) (bool, error) ResetCluster(ctx context.Context) error } type Socket struct { Fs afero.Fs Path string Listener net.Listener Connections chan *Connection } type Connection struct { net.Conn uuid.UUID } // NewSocket returns a new Socket func NewSocket(fs afero.Fs, path string) *Socket { connections := make(chan *Connection, 5) return &Socket{ Fs: fs, Path: path, Connections: connections, } } // NewConnection returns a new Connection func NewConnection(conn net.Conn) *Connection { return &Connection{ conn, uuid.New(), } } // Listen cleans up old connections and creates a new net.Listener for the // Socket func (s *Socket) Listen() error { if err := s.Clear(); err != nil { return err } // announce a new listener for the socket listener, err := net.Listen("unix", s.Path) if err != nil { return fmt.Errorf("failed to create Unix socket listener: %w", err) } s.Listener = listener return nil } // Accept waits for connections to the Socket. When a connection is found, // it will be sent to the Connections channel to be handled by the Handle // method. // // Accept will loop continuously until the ctx.Done() is closed (usually via // WithContext, WithTimeout or WithCancel). func (s *Socket) Accept(ctx context.Context) { var lastLoggedErrorAt *time.Time log := fog.FromContext(ctx).WithValues("routine", "socket") ctx = fog.IntoContext(ctx, log) for { select { case <-ctx.Done(): // if ctx.Done() is closed, close the Connections channel to // indirectly stop Socket.Handle and then return close(s.Connections) return default: s.acceptConnection(ctx, lastLoggedErrorAt) } } } func (s *Socket) acceptConnection(ctx context.Context, lastLoggedErrorAt *time.Time) { log := fog.FromContext(ctx) conn, err := s.Listener.Accept() if err != nil { if time.Since(*lastLoggedErrorAt) > 5*time.Minute { log.Error(err, "failed to accept socket connections", "socket", s.Path) *lastLoggedErrorAt = time.Now() } // sleep for 20 seconds here to prevent continuous retries time.Sleep(20 * time.Second) return } log.V(0).Info("socket connection accepted", "emaudit", "") // send the new connection to the Socket Connections channel s.Connections <- NewConnection(conn) } // Handle watches the Connections channel for new connections. When it finds a // connection it will handle it and decide on further action to take. func (s *Socket) Handle(ctx context.Context, reseter Reseter) { log := fog.FromContext(ctx).WithValues("routine", "socket") ctx = fog.IntoContext(ctx, log) for c := range s.Connections { if err := c.handle(ctx, reseter); err != nil { log.Error(err, "failed to handle socket connection", "socket", s.Path) } } } // Clear removes the old socket file to close any lingering connections func (s *Socket) Clear() error { return s.Fs.RemoveAll(s.Path) } // handleConnection will attempt to read the input to the socket. If the // message "reset" is sent, then a cluster reset will be attempted. // // To eliminate the risk of race conditions, the reset will only be // carried out if the mutex is not locked. // // One of the following responses will then be written back to the socket // for the client to consume: "invalid" - this will be returned when the // message received by the socket was not "reset", "locked" - this will be // returned when the mutex lock could not be acquired due to being in use, // "success" - this will be returned when the lock was successfully acquired. // // A "success" response does not mean that the cluster was reset successfully, // it just means that the lock to do so was acquired and the process was // successfully initiated. func (c *Connection) handle(ctx context.Context, reseter Reseter) error { log := fog.FromContext(ctx).WithValues("connection", c.UUID) ctx = fog.IntoContext(ctx, log) defer c.Close() if err := c.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { return err } msg, err := c.read() if err != nil { return err } // if the message is not "reset", this is an invalid request if msg != ResetMessage { log.V(0).Info("received invalid reset request") return c.write(InvalidResponse) } return handleReset(ctx, reseter, c) } // handleReset will attempt to reset the etcd cluster func handleReset(ctx context.Context, reseter Reseter, conn *Connection) error { log := fog.FromContext(ctx) log.V(0).Info("received instant reset request, resetting cluster...") locked := make(chan bool) go func() { if _, err := reseter.WithTryLock(ctx, locked, reseter.ResetCluster); err != nil { log.Error(err, "failed to reset the cluster") } }() // wait to receive a response from the ResetCluster method to confirm whether // the lock was successfully acquired if isLocked := <-locked; isLocked { log.V(0).Info("reset already in progress") return conn.write(LockedResponse) } return conn.write(SuccessResponse) } // read reads the request from the socket connection func (c *Connection) read() (string, error) { buffer := make([]byte, 512) n, err := c.Read(buffer) if err != nil { return "", fmt.Errorf("failed to read data from socket connection: %w", err) } return string(buffer[:n]), nil } // write writes the response to the socket connection func (c *Connection) write(response string) error { _, err := c.Write([]byte(response)) return err }