1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package assuan
18
19 import (
20 "bufio"
21 "errors"
22 "fmt"
23 "net"
24 "net/url"
25 "strings"
26 "sync"
27
28 "github.com/sassoftware/relic/lib/dlog"
29 )
30
31
32
33
34
35 const (
36 StatusOk = "OK"
37 StatusErr = "ERR"
38 StatusInquire = "INQUIRE"
39 StatusData = "D"
40 StatusLines = "S"
41 StatusComment = "#"
42 )
43
44 type Conn struct {
45 conn net.Conn
46 r *bufio.Reader
47 mu sync.Mutex
48 }
49
50 type Response struct {
51 Status string
52 StatusMessage string
53 Lines []string
54 Blob []byte
55 }
56
57 func (r Response) Error() string {
58 return fmt.Sprintf("response error: %s", r.StatusMessage)
59 }
60
61 type InquireFunc func(inquireLine string, msgLines []string) (string, error)
62
63 var InquireCancel = errors.New("inquiry cancelled")
64
65 func Dial(path string) (*Conn, error) {
66 conn, err := net.Dial("unix", path)
67 if err != nil {
68 return nil, err
69 }
70 s := &Conn{
71 conn: conn,
72 r: bufio.NewReader(conn),
73 }
74 status, msg, err := s.readLine()
75 if err != nil {
76 conn.Close()
77 return nil, err
78 } else if status != StatusOk {
79 conn.Close()
80 return nil, fmt.Errorf("failed to connect to %s: %s", path, msg)
81 }
82 return s, nil
83 }
84
85 func (c *Conn) write(cmd string) error {
86 dlog.Printf(7, "> %#v", cmd)
87 _, err := c.conn.Write([]byte(cmd))
88 return err
89 }
90
91 func (c *Conn) data(data string) error {
92 data = url.PathEscape(data)
93 for len(data) > 0 {
94 n := 512
95 if n > len(data) {
96 n = len(data)
97 }
98 chunk := data[:n]
99 data = data[n:]
100 if err := c.write(fmt.Sprintf("D %s\n", chunk)); err != nil {
101 return err
102 }
103 }
104 return c.write("END\n")
105 }
106
107 func (c *Conn) readLine() (string, string, error) {
108 line, err := c.r.ReadString('\n')
109 if err != nil {
110 return "", "", err
111 }
112 line = line[:len(line)-1]
113 dlog.Printf(7, "< %#v", line)
114 parts := strings.SplitN(line, " ", 2)
115 status := parts[0]
116 if len(parts) > 1 {
117 return status, parts[1], nil
118 }
119 return status, "", nil
120 }
121
122 func (c *Conn) read(inquire InquireFunc) (res Response, err error) {
123 var quotedBlob string
124 var saved error
125 readloop:
126 for {
127 status, msg, err := c.readLine()
128 if err != nil {
129 return res, err
130 }
131 switch status {
132 case StatusData:
133 quotedBlob += msg
134 case StatusLines:
135 msg, err := url.PathUnescape(msg)
136 if err != nil {
137 return res, err
138 }
139 res.Lines = append(res.Lines, msg)
140 case StatusInquire:
141 if inquire != nil {
142 d, err := inquire(msg, res.Lines)
143 if err != nil {
144 c.write("CANCEL\n")
145 if err != InquireCancel {
146
147 saved = err
148 }
149 } else {
150 if err := c.data(d); err != nil {
151 return res, err
152 }
153 }
154 } else {
155 c.write("CANCEL\n")
156 }
157 case StatusComment:
158
159 default:
160 res.Status = status
161 res.StatusMessage = msg
162 break readloop
163 }
164 }
165 if len(quotedBlob) > 0 {
166 blob, err := url.PathUnescape(quotedBlob)
167 if err != nil {
168 return res, err
169 }
170 res.Blob = []byte(blob)
171 }
172 err = saved
173 return
174 }
175
176
177
178
179
180
181
182
183 func (c *Conn) Transact(command string, inquire InquireFunc) (res Response, err error) {
184 c.mu.Lock()
185 defer c.mu.Unlock()
186 if c.conn == nil {
187 return res, errors.New("connection is closed")
188 }
189 if err := c.write(command + "\n"); err != nil {
190 return res, err
191 }
192 res, err = c.read(inquire)
193 if err == nil && res.Status != StatusOk {
194 err = res
195 }
196 return
197 }
198
199 func (c *Conn) Close() error {
200 c.mu.Lock()
201 defer c.mu.Unlock()
202 if c.conn != nil {
203 c.conn.Close()
204 c.conn = nil
205 c.r = nil
206 }
207 return nil
208 }
209
View as plain text