...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package activatecmd
17
18 import (
19 "context"
20 "encoding/json"
21 "errors"
22 "fmt"
23 "log"
24 "net"
25 "os"
26 "os/exec"
27 "strconv"
28 "sync"
29 "syscall"
30 "time"
31
32 "github.com/sassoftware/relic/internal/closeonce"
33 "golang.org/x/sync/errgroup"
34 "golang.org/x/sys/unix"
35 )
36
37
38 type Listener struct {
39 once sync.Once
40 closed closeonce.Closed
41 ready chan int
42 stopping chan int
43 eg *errgroup.Group
44 ctx context.Context
45 cancel context.CancelFunc
46 }
47
48 func (l *Listener) initialize() {
49 l.ctx, l.cancel = context.WithCancel(context.Background())
50 l.eg, l.ctx = errgroup.WithContext(l.ctx)
51 l.ready = make(chan int, 10)
52 l.stopping = make(chan int, 10)
53 }
54
55
56 func (l *Listener) Ready() <-chan int {
57 l.once.Do(l.initialize)
58 return l.ready
59 }
60
61
62 func (l *Listener) Stopping() <-chan int {
63 l.once.Do(l.initialize)
64 return l.stopping
65 }
66
67
68 func (l *Listener) Close() error {
69 l.once.Do(l.initialize)
70 return l.closed.Close(func() error {
71 l.cancel()
72 err := l.eg.Wait()
73 close(l.ready)
74 close(l.stopping)
75 return err
76 })
77 }
78
79
80
81
82 func (l *Listener) Attach(cmd *exec.Cmd) (detach func(), err error) {
83 l.once.Do(l.initialize)
84 if l.closed.Closed() {
85 return nil, errors.New("listener is closed")
86 }
87 if cmd.Env == nil {
88 cmd.Env = ClearEnv(os.Environ())
89 }
90 parentEnd, childEnd, err := socketpair()
91 if err != nil {
92 return nil, err
93 }
94 cmd.Env = append(cmd.Env, fmt.Sprintf("EINHORN_SOCK_FD=%d", 3+len(cmd.ExtraFiles)))
95 cmd.ExtraFiles = append(cmd.ExtraFiles, childEnd)
96 go func() {
97 <-l.ctx.Done()
98 parentEnd.Close()
99 }()
100 l.eg.Go(func() error { return l.listen(parentEnd) })
101 detach = func() { childEnd.Close() }
102 return detach, nil
103 }
104
105
106 func (l *Listener) listen(sock net.PacketConn) error {
107 buf := make([]byte, 4096)
108 failures := 0
109 var payload struct {
110 Command string `json:"command"`
111 PID int `json:"pid"`
112 }
113 for l.ctx.Err() == nil {
114 n, _, err := sock.ReadFrom(buf)
115 if err != nil {
116 if l.ctx.Err() != nil {
117 return nil
118 }
119 log.Printf("error: failed to read from notify socket: %s", err)
120 failures++
121 if failures > 100 {
122 return err
123 }
124 time.Sleep(100 * time.Millisecond)
125 }
126 failures = 0
127 if err := json.Unmarshal(buf[:n], &payload); err != nil {
128 log.Printf("error: failed to decode notification: %s", err)
129 continue
130 }
131 switch payload.Command {
132 case "worker:ack":
133 l.ready <- payload.PID
134 case "worker:stopping":
135 l.stopping <- payload.PID
136 }
137 }
138 return nil
139 }
140
141
142 func socketpair() (parentEnd net.PacketConn, childEnd *os.File, err error) {
143 files, err := socketpairFiles()
144 if err != nil {
145 return nil, nil, err
146 }
147 defer files[0].Close()
148 childEnd = files[1]
149 if err = unix.SetNonblock(int(files[0].Fd()), true); err == nil {
150 parentEnd, err = net.FilePacketConn(files[0])
151 if err == nil {
152 return parentEnd, childEnd, nil
153 }
154 }
155 files[1].Close()
156 return nil, nil, err
157 }
158
159
160 func socketpairFiles() (files [2]*os.File, err error) {
161 syscall.ForkLock.RLock()
162 defer syscall.ForkLock.RUnlock()
163 fds, err := unix.Socketpair(unix.AF_UNIX, unix.SOCK_DGRAM, 0)
164 if err == nil {
165 unix.CloseOnExec(fds[0])
166 unix.CloseOnExec(fds[1])
167 files[0] = os.NewFile(uintptr(fds[0]), "<socketpair>")
168 files[1] = os.NewFile(uintptr(fds[1]), "<socketpair>")
169 }
170 return
171 }
172
173
174
175 func DaemonStopping() error {
176 fd, err := strconv.Atoi(os.Getenv("EINHORN_SOCK_FD"))
177 if err != nil {
178 return err
179 }
180 message := fmt.Sprintf(`{"command":"worker:stopping", "pid":%d}`+"\n", os.Getpid())
181 _, err = unix.Write(fd, []byte(message))
182 return err
183 }
184
View as plain text