...

Source file src/github.com/sassoftware/relic/internal/activation/activatecmd/notify_unix.go

Documentation: github.com/sassoftware/relic/internal/activation/activatecmd

     1  // Copyright © SAS Institute Inc.
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  //     http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  //
    14  // +build darwin dragonfly freebsd linux netbsd openbsd solaris
    15  
    16  package activatecmd
    17  
    18  import (
    19  	"context"
    20  	"encoding/json"
    21  	"errors"
    22  	"fmt"
    23  	"log"
    24  	"net"
    25  	"os"
    26  	"os/exec"
    27  	"strconv"
    28  	"sync"
    29  	"syscall"
    30  	"time"
    31  
    32  	"github.com/sassoftware/relic/internal/closeonce"
    33  	"golang.org/x/sync/errgroup"
    34  	"golang.org/x/sys/unix"
    35  )
    36  
// Listener implements a channel that receives ready notifications from spawned processes.
//
// The zero value is usable: the exported methods (Ready, Stopping, Close and
// Attach) each run initialize through once before touching any other field.
type Listener struct {
	// once guards the single call to initialize.
	once     sync.Once
	// closed is consulted by Attach to reject new sockets and is used by
	// Close to run the shutdown routine.
	closed   closeonce.Closed
	// ready carries PIDs taken from "worker:ack" messages.
	ready    chan int
	// stopping carries PIDs taken from "worker:stopping" messages.
	stopping chan int
	// eg runs one listen goroutine per attached socket; Close waits on it.
	eg       *errgroup.Group
	// ctx is the errgroup-derived context that every listen goroutine polls.
	ctx      context.Context
	// cancel cancels the parent of ctx; Close calls it to stop all listeners.
	cancel   context.CancelFunc
}
    47  
    48  func (l *Listener) initialize() {
    49  	l.ctx, l.cancel = context.WithCancel(context.Background())
    50  	l.eg, l.ctx = errgroup.WithContext(l.ctx)
    51  	l.ready = make(chan int, 10)
    52  	l.stopping = make(chan int, 10)
    53  }
    54  
    55  // Ready returns a channel that receives PIDs that are ready
    56  func (l *Listener) Ready() <-chan int {
    57  	l.once.Do(l.initialize)
    58  	return l.ready
    59  }
    60  
    61  // Stopping returns a channel that receives PIDs that are stopping
    62  func (l *Listener) Stopping() <-chan int {
    63  	l.once.Do(l.initialize)
    64  	return l.stopping
    65  }
    66  
    67  // Close shuts down all notification sockets
    68  func (l *Listener) Close() error {
    69  	l.once.Do(l.initialize)
    70  	return l.closed.Close(func() error {
    71  		l.cancel()
    72  		err := l.eg.Wait()
    73  		close(l.ready)
    74  		close(l.stopping)
    75  		return err
    76  	})
    77  }
    78  
    79  // Attach a notification socket to a new child process. The returned "detach"
    80  // function must be invoked on cleanup, and should be invoked after Start()
    81  // returns.
    82  func (l *Listener) Attach(cmd *exec.Cmd) (detach func(), err error) {
    83  	l.once.Do(l.initialize)
    84  	if l.closed.Closed() {
    85  		return nil, errors.New("listener is closed")
    86  	}
    87  	if cmd.Env == nil {
    88  		cmd.Env = ClearEnv(os.Environ())
    89  	}
    90  	parentEnd, childEnd, err := socketpair()
    91  	if err != nil {
    92  		return nil, err
    93  	}
    94  	cmd.Env = append(cmd.Env, fmt.Sprintf("EINHORN_SOCK_FD=%d", 3+len(cmd.ExtraFiles)))
    95  	cmd.ExtraFiles = append(cmd.ExtraFiles, childEnd)
    96  	go func() {
    97  		<-l.ctx.Done()
    98  		parentEnd.Close()
    99  	}()
   100  	l.eg.Go(func() error { return l.listen(parentEnd) })
   101  	detach = func() { childEnd.Close() }
   102  	return detach, nil
   103  }
   104  
   105  // Consume packets from a socket and forward them to the main channel until ctx is cancelled
   106  func (l *Listener) listen(sock net.PacketConn) error {
   107  	buf := make([]byte, 4096)
   108  	failures := 0
   109  	var payload struct {
   110  		Command string `json:"command"`
   111  		PID     int    `json:"pid"`
   112  	}
   113  	for l.ctx.Err() == nil {
   114  		n, _, err := sock.ReadFrom(buf)
   115  		if err != nil {
   116  			if l.ctx.Err() != nil {
   117  				return nil
   118  			}
   119  			log.Printf("error: failed to read from notify socket: %s", err)
   120  			failures++
   121  			if failures > 100 {
   122  				return err
   123  			}
   124  			time.Sleep(100 * time.Millisecond)
   125  		}
   126  		failures = 0
   127  		if err := json.Unmarshal(buf[:n], &payload); err != nil {
   128  			log.Printf("error: failed to decode notification: %s", err)
   129  			continue
   130  		}
   131  		switch payload.Command {
   132  		case "worker:ack":
   133  			l.ready <- payload.PID
   134  		case "worker:stopping":
   135  			l.stopping <- payload.PID
   136  		}
   137  	}
   138  	return nil
   139  }
   140  
   141  // Create a socketpair with the parent end wrapped in a PacketConn and the child end as a plain *os.File
   142  func socketpair() (parentEnd net.PacketConn, childEnd *os.File, err error) {
   143  	files, err := socketpairFiles()
   144  	if err != nil {
   145  		return nil, nil, err
   146  	}
   147  	defer files[0].Close() // FilePacketConn will dup this so always close it
   148  	childEnd = files[1]
   149  	if err = unix.SetNonblock(int(files[0].Fd()), true); err == nil {
   150  		parentEnd, err = net.FilePacketConn(files[0])
   151  		if err == nil {
   152  			return parentEnd, childEnd, nil
   153  		}
   154  	}
   155  	files[1].Close()
   156  	return nil, nil, err
   157  }
   158  
   159  // Create a socketpair as *os.File objects. must hold the fork lock to ensure that no file descriptors are leaked to a child process before CloseOnExec can be set.
   160  func socketpairFiles() (files [2]*os.File, err error) {
   161  	syscall.ForkLock.RLock()
   162  	defer syscall.ForkLock.RUnlock()
   163  	fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_DGRAM, 0)
   164  	if err == nil {
   165  		unix.CloseOnExec(fds[0])
   166  		unix.CloseOnExec(fds[1])
   167  		files[0] = os.NewFile(uintptr(fds[0]), "<socketpair>")
   168  		files[1] = os.NewFile(uintptr(fds[1]), "<socketpair>")
   169  	}
   170  	return
   171  }
   172  
   173  // DaemonStopping is used by the child process to indicate it is no longer
   174  // serving requests and will exit soon.
   175  func DaemonStopping() error {
   176  	fd, err := strconv.Atoi(os.Getenv("EINHORN_SOCK_FD"))
   177  	if err != nil {
   178  		return err
   179  	}
   180  	message := fmt.Sprintf(`{"command":"worker:stopping", "pid":%d}`+"\n", os.Getpid())
   181  	_, err = unix.Write(fd, []byte(message))
   182  	return err
   183  }
   184  

View as plain text