1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "encoding/json"
18 "errors"
19 "io"
20 "net/http"
21 "strings"
22 "sync"
23
24 "github.com/google/uuid"
25
26 "github.com/go-kivik/kivik/v4/driver"
27 internal "github.com/go-kivik/kivik/v4/int/errors"
28 )
29
30
// DB is a handle to a single database. Most of its methods delegate to the
// underlying driver.DB after registering the call with startQuery.
type DB struct {
	client   *Client   // owning client; returned by Client and consulted by startQuery
	name     string    // database name; returned by Name
	driverDB driver.DB // driver implementation that the methods delegate to
	err      error     // when non-nil, most methods return it without calling the driver

	closed bool           // set by Close; startQuery then fails with ErrDatabaseClosed
	mu     sync.Mutex     // guards closed and the start/end query bookkeeping
	wg     sync.WaitGroup // counts in-flight queries so that Close can wait for them
}
41
42 func (db *DB) startQuery() (end func(), _ error) {
43 db.mu.Lock()
44 defer db.mu.Unlock()
45 if db.closed {
46 return nil, ErrDatabaseClosed
47 }
48 endQuery, err := db.client.startQuery()
49 if err != nil {
50 return nil, err
51 }
52 var once sync.Once
53 db.wg.Add(1)
54 return func() {
55 once.Do(func() {
56 db.mu.Lock()
57 db.wg.Done()
58 endQuery()
59 db.mu.Unlock()
60 })
61 }, nil
62 }
63
64
// Client returns the Client stored in this DB handle.
func (db *DB) Client() *Client {
	return db.client
}
68
69
// Name returns the database name stored in this DB handle.
func (db *DB) Name() string {
	return db.name
}
73
74
75
76
77
// Err returns the error stored in this DB handle, if any. When it is non-nil,
// the other query methods on DB return that same error instead of contacting
// the driver.
func (db *DB) Err() error {
	return db.err
}
81
82
83 func (db *DB) AllDocs(ctx context.Context, options ...Option) *ResultSet {
84 if db.err != nil {
85 return &ResultSet{iter: errIterator(db.err)}
86 }
87 endQuery, err := db.startQuery()
88 if err != nil {
89 return &ResultSet{iter: errIterator(err)}
90 }
91 rowsi, err := db.driverDB.AllDocs(ctx, multiOptions(options))
92 if err != nil {
93 endQuery()
94 return &ResultSet{iter: errIterator(err)}
95 }
96 return newResultSet(ctx, endQuery, rowsi)
97 }
98
99
100 func (db *DB) DesignDocs(ctx context.Context, options ...Option) *ResultSet {
101 if db.err != nil {
102 return &ResultSet{iter: errIterator(db.err)}
103 }
104 ddocer, ok := db.driverDB.(driver.DesignDocer)
105 if !ok {
106 return &ResultSet{iter: errIterator(&internal.Error{Status: http.StatusNotImplemented, Err: errors.New("kivik: design doc view not supported by driver")})}
107 }
108
109 endQuery, err := db.startQuery()
110 if err != nil {
111 return &ResultSet{iter: errIterator(err)}
112 }
113 rowsi, err := ddocer.DesignDocs(ctx, multiOptions(options))
114 if err != nil {
115 endQuery()
116 return &ResultSet{iter: errIterator(err)}
117 }
118 return newResultSet(ctx, endQuery, rowsi)
119 }
120
121
122 func (db *DB) LocalDocs(ctx context.Context, options ...Option) *ResultSet {
123 if db.err != nil {
124 return &ResultSet{iter: errIterator(db.err)}
125 }
126 ldocer, ok := db.driverDB.(driver.LocalDocer)
127 if !ok {
128 return &ResultSet{iter: errIterator(&internal.Error{Status: http.StatusNotImplemented, Err: errors.New("kivik: local doc view not supported by driver")})}
129 }
130 endQuery, err := db.startQuery()
131 if err != nil {
132 return &ResultSet{iter: errIterator(err)}
133 }
134 rowsi, err := ldocer.LocalDocs(ctx, multiOptions(options))
135 if err != nil {
136 endQuery()
137 return &ResultSet{iter: errIterator(err)}
138 }
139 return newResultSet(ctx, endQuery, rowsi)
140 }
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156 func (db *DB) Query(ctx context.Context, ddoc, view string, options ...Option) *ResultSet {
157 if db.err != nil {
158 return &ResultSet{iter: errIterator(db.err)}
159 }
160 endQuery, err := db.startQuery()
161 if err != nil {
162 return &ResultSet{iter: errIterator(err)}
163 }
164 ddoc = strings.TrimPrefix(ddoc, "_design/")
165 view = strings.TrimPrefix(view, "_view/")
166 rowsi, err := db.driverDB.Query(ctx, ddoc, view, multiOptions(options))
167 if err != nil {
168 endQuery()
169 return &ResultSet{iter: errIterator(err)}
170 }
171 return newResultSet(ctx, endQuery, rowsi)
172 }
173
174
// Document is the result of DB.Get: either an error, or a revision together
// with a readable body and optional attachments.
type Document struct {
	err         error              // when non-nil, returned by the accessor methods
	rev         string             // revision reported by the driver
	body        io.Reader          // document body; decoded as JSON by ScanDoc
	attachments driver.Attachments // attachments reported by the driver; may be nil

	mu sync.Mutex // held while an AttachmentsIterator is open, and during Close
}
183
184
// Err returns the error stored in the document, if any.
func (r *Document) Err() error {
	return r.err
}
188
189
// Rev returns the document's stored revision together with its stored error.
func (r *Document) Rev() (string, error) {
	return r.rev, r.err
}
193
194
195 func (r *Document) ScanDoc(i interface{}) error {
196 if r.err != nil {
197 return r.err
198 }
199 return json.NewDecoder(r.body).Decode(i)
200 }
201
202
203
204 func (r *Document) Attachments() (*AttachmentsIterator, error) {
205 if r.err != nil {
206 return nil, r.err
207 }
208 if r.attachments == nil {
209 return nil, errNoAttachments
210 }
211 r.mu.Lock()
212 return &AttachmentsIterator{
213 atti: r.attachments,
214 onClose: r.mu.Unlock,
215 }, nil
216 }
217
218
219 func (r *Document) Close() error {
220 r.mu.Lock()
221 defer r.mu.Unlock()
222 if atts := r.attachments; atts != nil {
223 _ = atts.Close()
224 }
225 if c, ok := r.body.(io.Closer); ok {
226 return c.Close()
227 }
228 return nil
229 }
230
231
232
233 func (db *DB) Get(ctx context.Context, docID string, options ...Option) *Document {
234 if db.err != nil {
235 return &Document{err: db.err}
236 }
237 endQuery, err := db.startQuery()
238 if err != nil {
239 return &Document{err: err}
240 }
241 defer endQuery()
242 result, err := db.driverDB.Get(ctx, docID, multiOptions(options))
243 if err != nil {
244 return &Document{err: err}
245 }
246 return &Document{
247 rev: result.Rev,
248 body: result.Body,
249 attachments: result.Attachments,
250 }
251 }
252
253
254
255
256
257 func (db *DB) OpenRevs(ctx context.Context, docID string, revs []string, options ...Option) *ResultSet {
258 if db.err != nil {
259 return &ResultSet{iter: errIterator(db.err)}
260 }
261 if openRever, ok := db.driverDB.(driver.OpenRever); ok {
262 endQuery, err := db.startQuery()
263 if err != nil {
264 return &ResultSet{iter: errIterator(err)}
265 }
266 rowsi, err := openRever.OpenRevs(ctx, docID, revs, multiOptions(options))
267 if err != nil {
268 endQuery()
269 return &ResultSet{iter: errIterator(err)}
270 }
271 return newResultSet(ctx, endQuery, rowsi)
272 }
273 return &ResultSet{iter: errIterator(errOpenRevsNotImplemented)}
274 }
275
276
277
278 func (db *DB) GetRev(ctx context.Context, docID string, options ...Option) (rev string, err error) {
279 if db.err != nil {
280 return "", db.err
281 }
282 opts := multiOptions(options)
283 if r, ok := db.driverDB.(driver.RevGetter); ok {
284 endQuery, err := db.startQuery()
285 if err != nil {
286 return "", err
287 }
288 defer endQuery()
289 return r.GetRev(ctx, docID, opts)
290 }
291 row := db.Get(ctx, docID, opts)
292 var doc struct {
293 Rev string `json:"_rev"`
294 }
295
296
297 err = row.ScanDoc(&doc)
298 return doc.Rev, err
299 }
300
301
302
303 func (db *DB) CreateDoc(ctx context.Context, doc interface{}, options ...Option) (docID, rev string, err error) {
304 if db.err != nil {
305 return "", "", db.err
306 }
307 if docCreator, ok := db.driverDB.(driver.DocCreator); ok {
308 endQuery, err := db.startQuery()
309 if err != nil {
310 return "", "", err
311 }
312 defer endQuery()
313 return docCreator.CreateDoc(ctx, doc, multiOptions(options))
314 }
315 docID, ok := extractDocID(doc)
316 if !ok {
317
318 docID = uuid.NewString()
319 }
320 rev, err = db.Put(ctx, docID, doc, options...)
321 return docID, rev, err
322 }
323
324
325
326 func normalizeFromJSON(i interface{}) (interface{}, error) {
327 switch t := i.(type) {
328 case json.Marshaler:
329 return t, nil
330 case io.Reader:
331 body, err := io.ReadAll(t)
332 if err != nil {
333 return nil, &internal.Error{Status: http.StatusBadRequest, Err: err}
334 }
335 return json.RawMessage(body), nil
336 default:
337 return i, nil
338 }
339 }
340
// extractDocID looks for a document ID in i and reports whether one was found.
//
// For map[string]interface{} the "_id" entry must be a string; for
// map[string]string it must merely be present. In both map cases an empty
// string counts as found. Any other non-nil value is marshaled to JSON and its
// "_id" member is decoded; there an empty or missing ID, or any JSON error,
// counts as not found.
func extractDocID(i interface{}) (string, bool) {
	switch doc := i.(type) {
	case nil:
		return "", false
	case map[string]interface{}:
		if id, isString := doc["_id"].(string); isString {
			return id, true
		}
		return "", false
	case map[string]string:
		if id, present := doc["_id"]; present {
			return id, true
		}
		return "", false
	}

	// Arbitrary value: round-trip through JSON to find its "_id" member.
	raw, err := json.Marshal(i)
	if err != nil {
		return "", false
	}
	var probe struct {
		ID string `json:"_id"`
	}
	if json.Unmarshal(raw, &probe) != nil || probe.ID == "" {
		return "", false
	}
	return probe.ID, true
}
371
372
373
374
375
376
377
378
379
380
381
382
383 func (db *DB) Put(ctx context.Context, docID string, doc interface{}, options ...Option) (rev string, err error) {
384 if db.err != nil {
385 return "", db.err
386 }
387 if docID == "" {
388 return "", missingArg("docID")
389 }
390 endQuery, err := db.startQuery()
391 if err != nil {
392 return "", err
393 }
394 defer endQuery()
395 i, err := normalizeFromJSON(doc)
396 if err != nil {
397 return "", err
398 }
399 return db.driverDB.Put(ctx, docID, i, multiOptions(options))
400 }
401
402
403
404 func (db *DB) Delete(ctx context.Context, docID, rev string, options ...Option) (newRev string, err error) {
405 if db.err != nil {
406 return "", db.err
407 }
408 endQuery, err := db.startQuery()
409 if err != nil {
410 return "", err
411 }
412 defer endQuery()
413 if docID == "" {
414 return "", missingArg("docID")
415 }
416 opts := append(multiOptions{Rev(rev)}, options...)
417 return db.driverDB.Delete(ctx, docID, opts)
418 }
419
420
421
422
423
424
425 func (db *DB) Flush(ctx context.Context) error {
426 if db.err != nil {
427 return db.err
428 }
429 endQuery, err := db.startQuery()
430 if err != nil {
431 return err
432 }
433 defer endQuery()
434 if flusher, ok := db.driverDB.(driver.Flusher); ok {
435 return flusher.Flush(ctx)
436 }
437 return &internal.Error{Status: http.StatusNotImplemented, Err: errors.New("kivik: flush not supported by driver")}
438 }
439
440
// DBStats holds database statistics. Stats fills it field by field from the
// driver's DBStats value (see driverStats2kivikStats).
type DBStats struct {
	// Name is the database name (JSON key "db_name").
	Name string `json:"db_name"`
	// CompactRunning reports whether a compaction is running (JSON key
	// "compact_running").
	CompactRunning bool `json:"compact_running"`
	// DocCount is the document count (JSON key "doc_count").
	DocCount int64 `json:"doc_count"`
	// DeletedCount is the count of deleted documents (JSON key
	// "doc_del_count").
	DeletedCount int64 `json:"doc_del_count"`
	// UpdateSeq is the update sequence, kept as a string (JSON key
	// "update_seq").
	UpdateSeq string `json:"update_seq"`
	// DiskSize is the on-disk size (JSON key "disk_size"); presumably in
	// bytes, but the unit is not established here.
	DiskSize int64 `json:"disk_size"`
	// ActiveSize is the active data size. Note that its JSON key is
	// "data_size", not "active_size".
	ActiveSize int64 `json:"data_size"`
	// ExternalSize is the external size reported by the driver. It is
	// excluded from JSON encoding.
	ExternalSize int64 `json:"-"`
	// Cluster holds the cluster configuration. It is nil when the driver
	// reports none, and is then omitted from JSON.
	Cluster *ClusterConfig `json:"cluster,omitempty"`
	// RawResponse is the raw payload copied from the driver's stats. It is
	// excluded from JSON encoding.
	RawResponse json.RawMessage `json:"-"`
}
472
473
// ClusterConfig holds the cluster configuration of a database. It is
// convertible from the driver's cluster configuration type (see
// driverStats2kivikStats).
type ClusterConfig struct {
	Replicas    int `json:"n"` // replica count (JSON key "n")
	Shards      int `json:"q"` // shard count (JSON key "q")
	ReadQuorum  int `json:"r"` // read quorum (JSON key "r")
	WriteQuorum int `json:"w"` // write quorum (JSON key "w")
}
480
481
482
483
484 func (db *DB) Stats(ctx context.Context) (*DBStats, error) {
485 if db.err != nil {
486 return nil, db.err
487 }
488 endQuery, err := db.startQuery()
489 if err != nil {
490 return nil, err
491 }
492 defer endQuery()
493 i, err := db.driverDB.Stats(ctx)
494 if err != nil {
495 return nil, err
496 }
497 return driverStats2kivikStats(i), nil
498 }
499
500 func driverStats2kivikStats(i *driver.DBStats) *DBStats {
501 var cluster *ClusterConfig
502 if i.Cluster != nil {
503 c := ClusterConfig(*i.Cluster)
504 cluster = &c
505 }
506 return &DBStats{
507 Name: i.Name,
508 CompactRunning: i.CompactRunning,
509 DocCount: i.DocCount,
510 DeletedCount: i.DeletedCount,
511 UpdateSeq: i.UpdateSeq,
512 DiskSize: i.DiskSize,
513 ActiveSize: i.ActiveSize,
514 ExternalSize: i.ExternalSize,
515 Cluster: cluster,
516 RawResponse: i.RawResponse,
517 }
518 }
519
520
521
522
523
524
525
526
527
528
529
530
531 func (db *DB) Compact(ctx context.Context) error {
532 if db.err != nil {
533 return db.err
534 }
535 endQuery, err := db.startQuery()
536 if err != nil {
537 return err
538 }
539 defer endQuery()
540 return db.driverDB.Compact(ctx)
541 }
542
543
544
545
546
547
548
549
550
551
552
553
554 func (db *DB) CompactView(ctx context.Context, ddocID string) error {
555 if db.err != nil {
556 return db.err
557 }
558 endQuery, err := db.startQuery()
559 if err != nil {
560 return err
561 }
562 defer endQuery()
563 return db.driverDB.CompactView(ctx, ddocID)
564 }
565
566
567
568
569
570
571
572 func (db *DB) ViewCleanup(ctx context.Context) error {
573 if db.err != nil {
574 return db.err
575 }
576 endQuery, err := db.startQuery()
577 if err != nil {
578 return err
579 }
580 defer endQuery()
581 return db.driverDB.ViewCleanup(ctx)
582 }
583
584
585
586
587
588
589 func (db *DB) Security(ctx context.Context) (*Security, error) {
590 if db.err != nil {
591 return nil, db.err
592 }
593 secDB, ok := db.driverDB.(driver.SecurityDB)
594 if !ok {
595 return nil, errSecurityNotImplemented
596 }
597 endQuery, err := db.startQuery()
598 if err != nil {
599 return nil, err
600 }
601 defer endQuery()
602 s, err := secDB.Security(ctx)
603 if err != nil {
604 return nil, err
605 }
606 return &Security{
607 Admins: Members(s.Admins),
608 Members: Members(s.Members),
609 Cloudant: s.Cloudant,
610 CouchdbAuthOnly: s.CouchdbAuthOnly,
611 }, err
612 }
613
614
615
616
617
618
619 func (db *DB) SetSecurity(ctx context.Context, security *Security) error {
620 if db.err != nil {
621 return db.err
622 }
623 secDB, ok := db.driverDB.(driver.SecurityDB)
624 if !ok {
625 return errSecurityNotImplemented
626 }
627 if security == nil {
628 return missingArg("security")
629 }
630 endQuery, err := db.startQuery()
631 if err != nil {
632 return err
633 }
634 defer endQuery()
635 sec := &driver.Security{
636 Admins: driver.Members(security.Admins),
637 Members: driver.Members(security.Members),
638 Cloudant: security.Cloudant,
639 CouchdbAuthOnly: security.CouchdbAuthOnly,
640 }
641 return secDB.SetSecurity(ctx, sec)
642 }
643
644
645
646
647
648
649
650
651
652 func (db *DB) Copy(ctx context.Context, targetID, sourceID string, options ...Option) (targetRev string, err error) {
653 if db.err != nil {
654 return "", db.err
655 }
656 if targetID == "" {
657 return "", missingArg("targetID")
658 }
659 if sourceID == "" {
660 return "", missingArg("sourceID")
661 }
662 opts := multiOptions(options)
663 if copier, ok := db.driverDB.(driver.Copier); ok {
664 endQuery, err := db.startQuery()
665 if err != nil {
666 return "", err
667 }
668 defer endQuery()
669 return copier.Copy(ctx, targetID, sourceID, opts)
670 }
671 var doc map[string]interface{}
672 if err = db.Get(ctx, sourceID, options...).ScanDoc(&doc); err != nil {
673 return "", err
674 }
675 delete(doc, "_rev")
676 doc["_id"] = targetID
677 opts2 := map[string]interface{}{}
678 opts.Apply(opts2)
679 delete(opts2, "rev")
680 return db.Put(ctx, targetID, doc, Params(opts2))
681 }
682
683
684
685 func (db *DB) PutAttachment(ctx context.Context, docID string, att *Attachment, options ...Option) (newRev string, err error) {
686 if db.err != nil {
687 return "", db.err
688 }
689 if docID == "" {
690 return "", missingArg("docID")
691 }
692 if e := att.validate(); e != nil {
693 return "", e
694 }
695 endQuery, err := db.startQuery()
696 if err != nil {
697 return "", err
698 }
699 defer endQuery()
700 a := driver.Attachment(*att)
701 return db.driverDB.PutAttachment(ctx, docID, &a, multiOptions(options))
702 }
703
704
705 func (db *DB) GetAttachment(ctx context.Context, docID, filename string, options ...Option) (*Attachment, error) {
706 if db.err != nil {
707 return nil, db.err
708 }
709 endQuery, err := db.startQuery()
710 if err != nil {
711 return nil, err
712 }
713 defer endQuery()
714 if docID == "" {
715 return nil, missingArg("docID")
716 }
717 if filename == "" {
718 return nil, missingArg("filename")
719 }
720 att, err := db.driverDB.GetAttachment(ctx, docID, filename, multiOptions(options))
721 if err != nil {
722 return nil, err
723 }
724 a := Attachment(*att)
725 return &a, nil
726 }
727
// nilContentReader is an always-empty io.ReadCloser.
type nilContentReader struct{}

// Compile-time check that *nilContentReader satisfies io.ReadCloser.
var _ io.ReadCloser = &nilContentReader{}

// Read reads nothing and always reports io.EOF.
func (c nilContentReader) Read(_ []byte) (int, error) { return 0, io.EOF }

// Close does nothing and always returns nil.
func (c nilContentReader) Close() error { return nil }

// nilContent is the shared empty content value that GetAttachmentMeta
// installs as the Content of the attachments it returns.
var nilContent = nilContentReader{}
736
737
738
739 func (db *DB) GetAttachmentMeta(ctx context.Context, docID, filename string, options ...Option) (*Attachment, error) {
740 if db.err != nil {
741 return nil, db.err
742 }
743 if docID == "" {
744 return nil, missingArg("docID")
745 }
746 if filename == "" {
747 return nil, missingArg("filename")
748 }
749 var att *Attachment
750 if metaer, ok := db.driverDB.(driver.AttachmentMetaGetter); ok {
751 endQuery, err := db.startQuery()
752 if err != nil {
753 return nil, err
754 }
755 defer endQuery()
756 a, err := metaer.GetAttachmentMeta(ctx, docID, filename, multiOptions(options))
757 if err != nil {
758 return nil, err
759 }
760 att = new(Attachment)
761 *att = Attachment(*a)
762 } else {
763 var err error
764 att, err = db.GetAttachment(ctx, docID, filename, options...)
765 if err != nil {
766 return nil, err
767 }
768 }
769 if att.Content != nil {
770 _ = att.Content.Close()
771 }
772 att.Content = nilContent
773 return att, nil
774 }
775
776
777
778
779 func (db *DB) DeleteAttachment(ctx context.Context, docID, rev, filename string, options ...Option) (newRev string, err error) {
780 if db.err != nil {
781 return "", db.err
782 }
783 endQuery, err := db.startQuery()
784 if err != nil {
785 return "", err
786 }
787 defer endQuery()
788 if docID == "" {
789 return "", missingArg("docID")
790 }
791 if filename == "" {
792 return "", missingArg("filename")
793 }
794 opts := append(multiOptions{Rev(rev)}, options...)
795 return db.driverDB.DeleteAttachment(ctx, docID, filename, opts)
796 }
797
798
// PurgeResult is the result of a Purge call, converted from the driver's
// purge result.
type PurgeResult struct {
	// Seq is the purge sequence number (JSON key "purge_seq").
	Seq int64 `json:"purge_seq"`
	// Purged maps string keys to lists of strings (JSON key "purged");
	// presumably document IDs to the revisions that were purged, mirroring
	// the shape of Purge's docRevMap argument — confirm against the driver.
	Purged map[string][]string `json:"purged"`
}
806
807
808
809
810
811
812
813
814
815
816 func (db *DB) Purge(ctx context.Context, docRevMap map[string][]string) (*PurgeResult, error) {
817 if db.err != nil {
818 return nil, db.err
819 }
820 endQuery, err := db.startQuery()
821 if err != nil {
822 return nil, err
823 }
824 defer endQuery()
825 if purger, ok := db.driverDB.(driver.Purger); ok {
826 res, err := purger.Purge(ctx, docRevMap)
827 if err != nil {
828 return nil, err
829 }
830 r := PurgeResult(*res)
831 return &r, nil
832 }
833 return nil, &internal.Error{Status: http.StatusNotImplemented, Message: "kivik: purge not supported by driver"}
834 }
835
836
// BulkGetReference identifies one document requested through BulkGet. It is
// converted field-for-field to the driver's BulkGetReference type.
type BulkGetReference struct {
	ID        string `json:"id"`                   // document ID (JSON key "id")
	Rev       string `json:"rev,omitempty"`        // optional revision; omitted from JSON when empty
	AttsSince string `json:"atts_since,omitempty"` // optional "atts_since" value; omitted from JSON when empty
}
842
843
844
845
846
847
848
849
850 func (db *DB) BulkGet(ctx context.Context, docs []BulkGetReference, options ...Option) *ResultSet {
851 if db.err != nil {
852 return &ResultSet{iter: errIterator(db.err)}
853 }
854 bulkGetter, ok := db.driverDB.(driver.BulkGetter)
855 if !ok {
856 return &ResultSet{iter: errIterator(&internal.Error{Status: http.StatusNotImplemented, Message: "kivik: bulk get not supported by driver"})}
857 }
858
859 endQuery, err := db.startQuery()
860 if err != nil {
861 return &ResultSet{iter: errIterator(err)}
862 }
863 refs := make([]driver.BulkGetReference, len(docs))
864 for i, ref := range docs {
865 refs[i] = driver.BulkGetReference(ref)
866 }
867 rowsi, err := bulkGetter.BulkGet(ctx, refs, multiOptions(options))
868 if err != nil {
869 endQuery()
870 return &ResultSet{iter: errIterator(err)}
871 }
872 return newResultSet(ctx, endQuery, rowsi)
873 }
874
875
876
877
878
879 func (db *DB) Close() error {
880 if db.err != nil {
881 return db.err
882 }
883 db.mu.Lock()
884 db.closed = true
885 db.mu.Unlock()
886 db.wg.Wait()
887 return db.driverDB.Close()
888 }
889
890
891
// RevDiff holds two lists of revision strings for one entry of a Diffs map.
// Both lists are omitted from JSON when empty.
type RevDiff struct {
	Missing           []string `json:"missing,omitempty"`            // JSON key "missing"
	PossibleAncestors []string `json:"possible_ancestors,omitempty"` // JSON key "possible_ancestors"
}
896
897
898
// Diffs maps string keys to RevDiff values; presumably the keys are document
// IDs, which is not established in this file — confirm against callers.
type Diffs map[string]RevDiff
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916 func (db *DB) RevsDiff(ctx context.Context, revMap interface{}) *ResultSet {
917 if db.err != nil {
918 return &ResultSet{iter: errIterator(db.err)}
919 }
920 if rd, ok := db.driverDB.(driver.RevsDiffer); ok {
921 endQuery, err := db.startQuery()
922 if err != nil {
923 return &ResultSet{iter: errIterator(err)}
924 }
925 rowsi, err := rd.RevsDiff(ctx, revMap)
926 if err != nil {
927 endQuery()
928 return &ResultSet{iter: errIterator(err)}
929 }
930 return newResultSet(ctx, endQuery, rowsi)
931 }
932 return &ResultSet{iter: errIterator(&internal.Error{Status: http.StatusNotImplemented, Message: "kivik: _revs_diff not supported by driver"})}
933 }
934
935
// PartitionStats holds statistics for a single partition, converted from the
// driver's partition stats by the DB.PartitionStats method.
type PartitionStats struct {
	DBName          string          // database name
	DocCount        int64           // document count
	DeletedDocCount int64           // deleted-document count
	Partition       string          // partition name
	ActiveSize      int64           // active size; unit not established here
	ExternalSize    int64           // external size; unit not established here
	RawResponse     json.RawMessage // raw payload as provided by the driver
}
945
946
947
948
949
950
951 func (db *DB) PartitionStats(ctx context.Context, name string) (*PartitionStats, error) {
952 if db.err != nil {
953 return nil, db.err
954 }
955 endQuery, err := db.startQuery()
956 if err != nil {
957 return nil, err
958 }
959 defer endQuery()
960 if pdb, ok := db.driverDB.(driver.PartitionedDB); ok {
961 stats, err := pdb.PartitionStats(ctx, name)
962 if err != nil {
963 return nil, err
964 }
965 s := PartitionStats(*stats)
966 return &s, nil
967 }
968 return nil, &internal.Error{Status: http.StatusNotImplemented, Message: "kivik: partitions not supported by driver"}
969 }
970
View as plain text