1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package server
16
17 import (
18 "context"
19 "encoding/json"
20 "fmt"
21 "net/http"
22 "strconv"
23 "strings"
24 "time"
25
26 "github.com/go-chi/chi/v5"
27 "gitlab.com/flimzy/httpe"
28
29 "github.com/go-kivik/kivik/v4"
30 "github.com/go-kivik/kivik/v4/driver"
31 internal "github.com/go-kivik/kivik/v4/int/errors"
32 )
33
34 func (s *Server) db() httpe.HandlerWithError {
35 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
36 db := chi.URLParam(r, "db")
37 stats, err := s.client.DB(db).Stats(r.Context())
38 if err != nil {
39 return err
40 }
41 return serveJSON(w, http.StatusOK, stats)
42 })
43 }
44
45 func (s *Server) dbExists() httpe.HandlerWithError {
46 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
47 db := chi.URLParam(r, "db")
48 exists, err := s.client.DBExists(r.Context(), db, options(r))
49 if err != nil {
50 return err
51 }
52 if !exists {
53 w.WriteHeader(http.StatusNotFound)
54 return nil
55 }
56 w.WriteHeader(http.StatusOK)
57 return nil
58 })
59 }
60
61 func (s *Server) createDB() httpe.HandlerWithError {
62 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
63 db := chi.URLParam(r, "db")
64 if err := s.client.CreateDB(r.Context(), db, options(r)); err != nil {
65 return err
66 }
67 return serveJSON(w, http.StatusCreated, map[string]interface{}{
68 "ok": true,
69 })
70 })
71 }
72
73 func (s *Server) deleteDB() httpe.HandlerWithError {
74 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
75 db := chi.URLParam(r, "db")
76 if err := s.client.DestroyDB(r.Context(), db, options(r)); err != nil {
77 return err
78 }
79 return serveJSON(w, http.StatusOK, map[string]interface{}{
80 "ok": true,
81 })
82 })
83 }
84
const (
	// defaultHeartbeat is the heartbeat interval used when a request does not
	// specify one, or specifies the literal value "true".
	defaultHeartbeat = heartbeat(60 * time.Second)
	// defaultTimeout is not referenced in this section of the file.
	// NOTE(review): presumably a timeout in milliseconds — confirm at the
	// point of use.
	defaultTimeout = 60000

	// Values accepted for the "feed" query parameter.
	feedTypeNormal     = "normal"
	feedTypeLongpoll   = "longpoll"
	feedTypeContinuous = "continuous"
)

// heartbeat is a time.Duration that can be decoded from the textual form of a
// request parameter; see UnmarshalText for the accepted syntax.
type heartbeat time.Duration

// UnmarshalText decodes text into h. The literal "true" selects
// defaultHeartbeat; anything else must parse as a base-10 integer, which is
// interpreted as a number of milliseconds. When parsing fails the strconv
// error is returned and h is left unmodified.
func (h *heartbeat) UnmarshalText(text []byte) error {
	if raw := string(text); raw != "true" {
		ms, err := strconv.Atoi(raw)
		if err != nil {
			return err
		}
		*h = heartbeat(time.Duration(ms) * time.Millisecond)
		return nil
	}
	*h = defaultHeartbeat
	return nil
}
110
111 func (s *Server) dbUpdates() httpe.HandlerWithError {
112 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
113 switch feed := r.URL.Query().Get("feed"); feed {
114 case "", feedTypeNormal:
115 return s.serveNormalDBUpdates(w, r)
116 case feedTypeContinuous, feedTypeLongpoll:
117 return s.serveContinuousDBUpdates(w, r)
118 default:
119 return &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("kivik: feed type %q not supported", feed)}
120 }
121 })
122 }
123
124 func (s *Server) serveNormalDBUpdates(w http.ResponseWriter, r *http.Request) error {
125 updates := s.client.DBUpdates(r.Context(), options(r))
126 if err := updates.Err(); err != nil {
127 return err
128 }
129
130 defer updates.Close()
131
132 w.Header().Set("Content-Type", "application/json; charset=utf-8")
133 w.WriteHeader(http.StatusOK)
134
135 if _, err := w.Write([]byte(`{"results":[`)); err != nil {
136 return err
137 }
138
139 enc := json.NewEncoder(w)
140
141 var update driver.DBUpdate
142 for updates.Next() {
143 if update.DBName != "" {
144 if _, err := w.Write([]byte(",")); err != nil {
145 return err
146 }
147 }
148 update.DBName = updates.DBName()
149 update.Type = updates.Type()
150 update.Seq = updates.Seq()
151 if err := enc.Encode(&update); err != nil {
152 return err
153 }
154 }
155 if err := updates.Err(); err != nil {
156 return err
157 }
158
159 lastSeq, err := updates.LastSeq()
160 if err != nil {
161 return err
162 }
163
164 if _, err := w.Write([]byte(`],"last_seq":"` + lastSeq + "\"}")); err != nil {
165 return err
166 }
167
168 return nil
169 }
170
171 func (s *Server) serveContinuousDBUpdates(w http.ResponseWriter, r *http.Request) error {
172 req := struct {
173 Heartbeat heartbeat `form:"heartbeat"`
174 }{
175 Heartbeat: defaultHeartbeat,
176 }
177 if err := s.bind(r, &req); err != nil {
178 return err
179 }
180
181 ticker := time.NewTicker(time.Duration(req.Heartbeat))
182 updates := s.client.DBUpdates(r.Context(), options(r))
183 if err := updates.Err(); err != nil {
184 return err
185 }
186
187 defer updates.Close()
188
189 w.Header().Set("Content-Type", "application/json; charset=utf-8")
190 w.WriteHeader(http.StatusOK)
191
192 nextUpdate := make(chan *driver.DBUpdate)
193 go func() {
194 for updates.Next() {
195 nextUpdate <- &driver.DBUpdate{
196 DBName: updates.DBName(),
197 Type: updates.Type(),
198 Seq: updates.Seq(),
199 }
200 }
201 close(nextUpdate)
202 }()
203
204 enc := json.NewEncoder(w)
205
206 loop:
207 for {
208 select {
209 case <-ticker.C:
210 if _, err := w.Write([]byte("\n")); err != nil {
211 return err
212 }
213 case update, ok := <-nextUpdate:
214 if !ok {
215 break loop
216 }
217 ticker.Reset(time.Duration(req.Heartbeat))
218 if err := enc.Encode(update); err != nil {
219 return err
220 }
221 if _, err := w.Write([]byte("\n")); err != nil {
222 return err
223 }
224 }
225 }
226
227 return updates.Err()
228 }
229
230
231
// whichView inspects the request path and reports which view it addresses.
//
// The path is split on "/" after trimming leading and trailing slashes. A
// trailing "queries" segment sets isQueries and is removed before further
// inspection. If the second segment is "_design" and the path has at least
// five segments, the third segment is returned as ddoc and the fifth as view.
// Otherwise ddoc is empty and view is the last remaining segment.
//
// Paths too short to match either shape never panic: they fall through to the
// "last segment" rule, and view is empty when no segment remains.
func whichView(r *http.Request) (ddoc, view string, isQueries bool) {
	parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
	// strings.Split with a non-empty separator always yields at least one
	// element, so indexing the last element here is safe.
	if parts[len(parts)-1] == "queries" {
		isQueries = true
		parts = parts[:len(parts)-1]
	}
	if len(parts) == 0 {
		return "", "", isQueries
	}
	if len(parts) >= 5 && parts[1] == "_design" {
		return parts[2], parts[4], isQueries
	}
	return "", parts[len(parts)-1], isQueries
}
244
245 func (s *Server) query() httpe.HandlerWithError {
246 return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
247 ddoc, view, isQueries := whichView(r)
248 req := map[string]interface{}{}
249 if isQueries {
250 var jsonReq struct {
251 Queries []map[string]interface{} `json:"queries"`
252 }
253 if err := s.bind(r, &jsonReq); err != nil {
254 return err
255 }
256 req["queries"] = jsonReq.Queries
257 } else {
258 if err := s.bind(r, &req); err != nil {
259 return err
260 }
261 }
262 db := chi.URLParam(r, "db")
263 var viewFunc func(context.Context, ...kivik.Option) *kivik.ResultSet
264 if ddoc == "" {
265 switch view {
266 case "_all_docs":
267 viewFunc = s.client.DB(db).AllDocs
268 case "_local_docs":
269 viewFunc = s.client.DB(db).LocalDocs
270 case "_design_docs":
271 viewFunc = s.client.DB(db).DesignDocs
272 default:
273 return &internal.Error{Status: http.StatusNotFound, Message: fmt.Sprintf("kivik: view %q not found", view)}
274 }
275 } else {
276 viewFunc = func(ctx context.Context, opts ...kivik.Option) *kivik.ResultSet {
277 return s.client.DB(db).Query(ctx, ddoc, view, opts...)
278 }
279 }
280 rows := viewFunc(r.Context(), options(r))
281 defer rows.Close()
282
283 if err := rows.Err(); err != nil {
284 return err
285 }
286
287 if _, err := fmt.Fprint(w, `{"rows":[`); err != nil {
288 return err
289 }
290
291 var row struct {
292 ID string `json:"id"`
293 Key json.RawMessage `json:"key"`
294 Value json.RawMessage `json:"value"`
295 }
296 var err error
297 enc := json.NewEncoder(w)
298 for rows.Next() {
299 if row.ID != "" {
300 if _, err = w.Write([]byte(",")); err != nil {
301 return err
302 }
303 }
304 row.ID, err = rows.ID()
305 if err != nil {
306 return err
307 }
308 if err := rows.ScanKey(&row.Key); err != nil {
309 return err
310 }
311 if err := rows.ScanValue(&row.Value); err != nil {
312 return err
313 }
314 if err := enc.Encode(&row); err != nil {
315 return err
316 }
317 }
318 meta, err := rows.Metadata()
319 if err != nil {
320 return err
321 }
322 _, err = fmt.Fprintf(w, `],"offset":%d,"total_rows":%d}`, meta.Offset, meta.TotalRows)
323 return err
324 })
325 }
326
View as plain text