...

Source file src/github.com/go-kivik/kivik/v4/x/server/db.go

Documentation: github.com/go-kivik/kivik/v4/x/server

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  //go:build !js
    14  
    15  package server
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"fmt"
    21  	"net/http"
    22  	"strconv"
    23  	"strings"
    24  	"time"
    25  
    26  	"github.com/go-chi/chi/v5"
    27  	"gitlab.com/flimzy/httpe"
    28  
    29  	"github.com/go-kivik/kivik/v4"
    30  	"github.com/go-kivik/kivik/v4/driver"
    31  	internal "github.com/go-kivik/kivik/v4/int/errors"
    32  )
    33  
    34  func (s *Server) db() httpe.HandlerWithError {
    35  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
    36  		db := chi.URLParam(r, "db")
    37  		stats, err := s.client.DB(db).Stats(r.Context())
    38  		if err != nil {
    39  			return err
    40  		}
    41  		return serveJSON(w, http.StatusOK, stats)
    42  	})
    43  }
    44  
    45  func (s *Server) dbExists() httpe.HandlerWithError {
    46  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
    47  		db := chi.URLParam(r, "db")
    48  		exists, err := s.client.DBExists(r.Context(), db, options(r))
    49  		if err != nil {
    50  			return err
    51  		}
    52  		if !exists {
    53  			w.WriteHeader(http.StatusNotFound)
    54  			return nil
    55  		}
    56  		w.WriteHeader(http.StatusOK)
    57  		return nil
    58  	})
    59  }
    60  
    61  func (s *Server) createDB() httpe.HandlerWithError {
    62  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
    63  		db := chi.URLParam(r, "db")
    64  		if err := s.client.CreateDB(r.Context(), db, options(r)); err != nil {
    65  			return err
    66  		}
    67  		return serveJSON(w, http.StatusCreated, map[string]interface{}{
    68  			"ok": true,
    69  		})
    70  	})
    71  }
    72  
    73  func (s *Server) deleteDB() httpe.HandlerWithError {
    74  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
    75  		db := chi.URLParam(r, "db")
    76  		if err := s.client.DestroyDB(r.Context(), db, options(r)); err != nil {
    77  			return err
    78  		}
    79  		return serveJSON(w, http.StatusOK, map[string]interface{}{
    80  			"ok": true,
    81  		})
    82  	})
    83  }
    84  
    85  const (
    86  	defaultHeartbeat = heartbeat(60 * time.Second)
    87  	defaultTimeout   = 60000 // milliseconds
    88  
    89  	feedTypeNormal     = "normal"
    90  	feedTypeLongpoll   = "longpoll"
    91  	feedTypeContinuous = "continuous"
    92  )
    93  
    94  type heartbeat time.Duration
    95  
    96  func (h *heartbeat) UnmarshalText(text []byte) error {
    97  	var value heartbeat
    98  	if string(text) == "true" {
    99  		value = defaultHeartbeat
   100  	} else {
   101  		ms, err := strconv.Atoi(string(text))
   102  		if err != nil {
   103  			return err
   104  		}
   105  		value = heartbeat(ms) * heartbeat(time.Millisecond)
   106  	}
   107  	*h = value
   108  	return nil
   109  }
   110  
   111  func (s *Server) dbUpdates() httpe.HandlerWithError {
   112  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
   113  		switch feed := r.URL.Query().Get("feed"); feed {
   114  		case "", feedTypeNormal:
   115  			return s.serveNormalDBUpdates(w, r)
   116  		case feedTypeContinuous, feedTypeLongpoll:
   117  			return s.serveContinuousDBUpdates(w, r)
   118  		default:
   119  			return &internal.Error{Status: http.StatusBadRequest, Message: fmt.Sprintf("kivik: feed type %q not supported", feed)}
   120  		}
   121  	})
   122  }
   123  
   124  func (s *Server) serveNormalDBUpdates(w http.ResponseWriter, r *http.Request) error {
   125  	updates := s.client.DBUpdates(r.Context(), options(r))
   126  	if err := updates.Err(); err != nil {
   127  		return err
   128  	}
   129  
   130  	defer updates.Close()
   131  
   132  	w.Header().Set("Content-Type", "application/json; charset=utf-8")
   133  	w.WriteHeader(http.StatusOK)
   134  
   135  	if _, err := w.Write([]byte(`{"results":[`)); err != nil {
   136  		return err
   137  	}
   138  
   139  	enc := json.NewEncoder(w)
   140  
   141  	var update driver.DBUpdate
   142  	for updates.Next() {
   143  		if update.DBName != "" { // Easy way to tell if this is the first result
   144  			if _, err := w.Write([]byte(",")); err != nil {
   145  				return err
   146  			}
   147  		}
   148  		update.DBName = updates.DBName()
   149  		update.Type = updates.Type()
   150  		update.Seq = updates.Seq()
   151  		if err := enc.Encode(&update); err != nil {
   152  			return err
   153  		}
   154  	}
   155  	if err := updates.Err(); err != nil {
   156  		return err
   157  	}
   158  
   159  	lastSeq, err := updates.LastSeq()
   160  	if err != nil {
   161  		return err
   162  	}
   163  
   164  	if _, err := w.Write([]byte(`],"last_seq":"` + lastSeq + "\"}")); err != nil {
   165  		return err
   166  	}
   167  
   168  	return nil
   169  }
   170  
   171  func (s *Server) serveContinuousDBUpdates(w http.ResponseWriter, r *http.Request) error {
   172  	req := struct {
   173  		Heartbeat heartbeat `form:"heartbeat"`
   174  	}{
   175  		Heartbeat: defaultHeartbeat,
   176  	}
   177  	if err := s.bind(r, &req); err != nil {
   178  		return err
   179  	}
   180  
   181  	ticker := time.NewTicker(time.Duration(req.Heartbeat))
   182  	updates := s.client.DBUpdates(r.Context(), options(r))
   183  	if err := updates.Err(); err != nil {
   184  		return err
   185  	}
   186  
   187  	defer updates.Close()
   188  
   189  	w.Header().Set("Content-Type", "application/json; charset=utf-8")
   190  	w.WriteHeader(http.StatusOK)
   191  
   192  	nextUpdate := make(chan *driver.DBUpdate)
   193  	go func() {
   194  		for updates.Next() {
   195  			nextUpdate <- &driver.DBUpdate{
   196  				DBName: updates.DBName(),
   197  				Type:   updates.Type(),
   198  				Seq:    updates.Seq(),
   199  			}
   200  		}
   201  		close(nextUpdate)
   202  	}()
   203  
   204  	enc := json.NewEncoder(w)
   205  
   206  loop:
   207  	for {
   208  		select {
   209  		case <-ticker.C:
   210  			if _, err := w.Write([]byte("\n")); err != nil {
   211  				return err
   212  			}
   213  		case update, ok := <-nextUpdate:
   214  			if !ok {
   215  				break loop
   216  			}
   217  			ticker.Reset(time.Duration(req.Heartbeat))
   218  			if err := enc.Encode(update); err != nil {
   219  				return err
   220  			}
   221  			if _, err := w.Write([]byte("\n")); err != nil {
   222  				return err
   223  			}
   224  		}
   225  	}
   226  
   227  	return updates.Err()
   228  }
   229  
   230  // whichView returns `_all_docs`, `_local_docs`, of `_design_docs`, and whether
   231  // the path ends with /queries.
   232  func whichView(r *http.Request) (ddoc, view string, isQueries bool) {
   233  	parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
   234  	var isQuery bool
   235  	if parts[len(parts)-1] == "queries" {
   236  		isQuery = true
   237  		parts = parts[:len(parts)-1]
   238  	}
   239  	if parts[1] == "_design" {
   240  		return parts[2], parts[4], isQuery
   241  	}
   242  	return "", parts[len(parts)-1], isQuery
   243  }
   244  
   245  func (s *Server) query() httpe.HandlerWithError {
   246  	return httpe.HandlerWithErrorFunc(func(w http.ResponseWriter, r *http.Request) error {
   247  		ddoc, view, isQueries := whichView(r)
   248  		req := map[string]interface{}{}
   249  		if isQueries {
   250  			var jsonReq struct {
   251  				Queries []map[string]interface{} `json:"queries"`
   252  			}
   253  			if err := s.bind(r, &jsonReq); err != nil {
   254  				return err
   255  			}
   256  			req["queries"] = jsonReq.Queries
   257  		} else {
   258  			if err := s.bind(r, &req); err != nil {
   259  				return err
   260  			}
   261  		}
   262  		db := chi.URLParam(r, "db")
   263  		var viewFunc func(context.Context, ...kivik.Option) *kivik.ResultSet
   264  		if ddoc == "" {
   265  			switch view {
   266  			case "_all_docs":
   267  				viewFunc = s.client.DB(db).AllDocs
   268  			case "_local_docs":
   269  				viewFunc = s.client.DB(db).LocalDocs
   270  			case "_design_docs":
   271  				viewFunc = s.client.DB(db).DesignDocs
   272  			default:
   273  				return &internal.Error{Status: http.StatusNotFound, Message: fmt.Sprintf("kivik: view %q not found", view)}
   274  			}
   275  		} else {
   276  			viewFunc = func(ctx context.Context, opts ...kivik.Option) *kivik.ResultSet {
   277  				return s.client.DB(db).Query(ctx, ddoc, view, opts...)
   278  			}
   279  		}
   280  		rows := viewFunc(r.Context(), options(r))
   281  		defer rows.Close()
   282  
   283  		if err := rows.Err(); err != nil {
   284  			return err
   285  		}
   286  
   287  		if _, err := fmt.Fprint(w, `{"rows":[`); err != nil {
   288  			return err
   289  		}
   290  
   291  		var row struct {
   292  			ID    string          `json:"id"`
   293  			Key   json.RawMessage `json:"key"`
   294  			Value json.RawMessage `json:"value"`
   295  		}
   296  		var err error
   297  		enc := json.NewEncoder(w)
   298  		for rows.Next() {
   299  			if row.ID != "" { // Easy way to tell if this is the first row
   300  				if _, err = w.Write([]byte(",")); err != nil {
   301  					return err
   302  				}
   303  			}
   304  			row.ID, err = rows.ID()
   305  			if err != nil {
   306  				return err
   307  			}
   308  			if err := rows.ScanKey(&row.Key); err != nil {
   309  				return err
   310  			}
   311  			if err := rows.ScanValue(&row.Value); err != nil {
   312  				return err
   313  			}
   314  			if err := enc.Encode(&row); err != nil {
   315  				return err
   316  			}
   317  		}
   318  		meta, err := rows.Metadata()
   319  		if err != nil {
   320  			return err
   321  		}
   322  		_, err = fmt.Fprintf(w, `],"offset":%d,"total_rows":%d}`, meta.Offset, meta.TotalRows)
   323  		return err
   324  	})
   325  }
   326  

View as plain text