...

Source file src/github.com/go-kivik/kivik/v4/x/fsdb/cdb/document.go

Documentation: github.com/go-kivik/kivik/v4/x/fsdb/cdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package cdb
    14  
    15  import (
    16  	"context"
    17  	"encoding/json"
    18  	"errors"
    19  	"fmt"
    20  	"net/http"
    21  	"os"
    22  	"path/filepath"
    23  	"sort"
    24  	"strings"
    25  
    26  	"github.com/go-kivik/kivik/v4/driver"
    27  	"github.com/go-kivik/kivik/v4/x/fsdb/filesystem"
    28  )
    29  
    30  // Document is a CouchDB document.
    31  type Document struct {
    32  	ID        string    `json:"_id" yaml:"_id"`
    33  	Revisions Revisions `json:"-" yaml:"-"`
    34  	// RevsInfo is only used during JSON marshaling when revs_info=true, and
    35  	// should never be consulted as authoritative.
    36  	RevsInfo []RevInfo `json:"_revs_info,omitempty" yaml:"-"`
    37  	// RevHistory is only used during JSON marshaling, when revs=true, and
    38  	// should never be consulted as authoritative.
    39  	RevHistory *RevHistory `json:"_revisions,omitempty" yaml:"-"`
    40  
    41  	Options map[string]interface{} `json:"-" yaml:"-"`
    42  
    43  	cdb *FS
    44  }
    45  
    46  // NewDocument creates a new document.
    47  func (fs *FS) NewDocument(docID string) *Document {
    48  	return &Document{
    49  		ID:  docID,
    50  		cdb: fs,
    51  	}
    52  }
    53  
    54  // MarshalJSON satisfies the json.Marshaler interface.
    55  func (d *Document) MarshalJSON() ([]byte, error) {
    56  	d.revsInfo()
    57  	d.revs()
    58  	rev := d.Revisions[0]
    59  	rev.options = d.Options
    60  	revJSON, err := json.Marshal(rev)
    61  	if err != nil {
    62  		return nil, err
    63  	}
    64  	docJSON, _ := json.Marshal(*d)
    65  	return joinJSON(docJSON, revJSON), nil
    66  }
    67  
    68  // revs populates the Rev
    69  func (d *Document) revs() {
    70  	d.RevHistory = nil
    71  	if ok, _ := d.Options["revs"].(bool); !ok {
    72  		return
    73  	}
    74  	if len(d.Revisions) < 1 {
    75  		return
    76  	}
    77  	d.RevHistory = d.Revisions[0].RevHistory
    78  }
    79  
    80  // revsInfo populates the RevsInfo field, if appropriate according to options.
    81  func (d *Document) revsInfo() {
    82  	d.RevsInfo = nil
    83  	if ok, _ := d.Options["revs_info"].(bool); !ok {
    84  		return
    85  	}
    86  	if _, ok := d.Options["rev"]; ok {
    87  		return
    88  	}
    89  	d.RevsInfo = make([]RevInfo, len(d.Revisions))
    90  	for i, rev := range d.Revisions {
    91  		d.RevsInfo[i] = RevInfo{
    92  			Rev:    rev.Rev.String(),
    93  			Status: "available",
    94  		}
    95  	}
    96  }
    97  
    98  // RevInfo is revisions information as presented in the _revs_info key.
    99  type RevInfo struct {
   100  	Rev    string `json:"rev"`
   101  	Status string `json:"status"`
   102  }
   103  
   104  // Compact cleans up any non-leaf revs, and attempts to consolidate attachments.
   105  func (d *Document) Compact(ctx context.Context) error {
   106  	revTree := make(map[string]*Revision, 1)
   107  	// An index of ancestor -> leaf revision
   108  	index := map[string][]string{}
   109  	keep := make([]*Revision, 0, 1)
   110  	for _, rev := range d.Revisions {
   111  		revID := rev.Rev.String()
   112  		if leafIDs, ok := index[revID]; ok {
   113  			for _, leafID := range leafIDs {
   114  				if err := copyAttachments(d.cdb.fs, revTree[leafID], rev); err != nil {
   115  					return err
   116  				}
   117  			}
   118  			if err := rev.Delete(ctx); err != nil {
   119  				return err
   120  			}
   121  			continue
   122  		}
   123  		keep = append(keep, rev)
   124  		for _, ancestor := range rev.RevHistory.Ancestors()[1:] {
   125  			index[ancestor] = append(index[ancestor], revID)
   126  		}
   127  		revTree[revID] = rev
   128  	}
   129  	d.Revisions = keep
   130  	return nil
   131  }
   132  
   133  const tempPerms = 0o777
   134  
   135  func copyAttachments(fs filesystem.Filesystem, leaf, old *Revision) error {
   136  	leafpath := strings.TrimSuffix(leaf.path, filepath.Ext(leaf.path)) + "/"
   137  	basepath := strings.TrimSuffix(old.path, filepath.Ext(old.path)) + "/"
   138  	for filename, att := range old.Attachments {
   139  		if _, ok := leaf.Attachments[filename]; !ok {
   140  			continue
   141  		}
   142  		if strings.HasPrefix(att.path, basepath) {
   143  			name := filepath.Base(att.path)
   144  			if err := os.MkdirAll(leafpath, tempPerms); err != nil {
   145  				return err
   146  			}
   147  			if err := fs.Link(att.path, filepath.Join(leafpath, name)); err != nil {
   148  				if os.IsExist(err) {
   149  					if err := fs.Remove(att.path); err != nil {
   150  						return err
   151  					}
   152  					continue
   153  				}
   154  				return err
   155  			}
   156  		}
   157  	}
   158  	return nil
   159  }
   160  
   161  // AddRevision adds rev to the existing document, according to options, and
   162  // persists it to disk. The return value is the new revision ID.
   163  func (d *Document) AddRevision(ctx context.Context, rev *Revision, options driver.Options) (string, error) {
   164  	revid, err := d.addRevision(ctx, rev, options)
   165  	if err != nil {
   166  		return "", err
   167  	}
   168  	err = d.persist(ctx)
   169  	return revid, err
   170  }
   171  
   172  func (d *Document) addOldEdit(rev *Revision) (string, error) {
   173  	if rev.Rev.IsZero() {
   174  		return "", statusError{status: http.StatusBadRequest, error: errors.New("_rev required with new_edits=false")}
   175  	}
   176  	for _, r := range d.Revisions {
   177  		if r.Rev.Equal(rev.Rev) {
   178  			// If the rev already exists, do nothing, but report success.
   179  			return r.Rev.String(), nil
   180  		}
   181  	}
   182  	d.Revisions = append(d.Revisions, rev)
   183  	sort.Sort(d.Revisions)
   184  	return rev.Rev.String(), nil
   185  }
   186  
   187  func (d *Document) addRevision(ctx context.Context, rev *Revision, options driver.Options) (string, error) {
   188  	opts := map[string]interface{}{}
   189  	options.Apply(opts)
   190  	if newEdits, ok := opts["new_edits"].(bool); ok && !newEdits {
   191  		return d.addOldEdit(rev)
   192  	}
   193  	if revid, ok := opts["rev"].(string); ok {
   194  		var newrev RevID
   195  		if err := newrev.UnmarshalText([]byte(revid)); err != nil {
   196  			return "", err
   197  		}
   198  		if !rev.Rev.IsZero() && rev.Rev.String() != newrev.String() {
   199  			return "", statusError{status: http.StatusBadRequest, error: errors.New("document rev from request body and query string have different values")}
   200  		}
   201  		rev.Rev = newrev
   202  	}
   203  	needRev := len(d.Revisions) > 0
   204  	haveRev := !rev.Rev.IsZero()
   205  	if needRev != haveRev {
   206  		return "", errConflict
   207  	}
   208  	var oldrev *Revision
   209  	if len(d.Revisions) > 0 {
   210  		var ok bool
   211  		if oldrev, ok = d.leaves()[rev.Rev.String()]; !ok {
   212  			return "", errConflict
   213  		}
   214  	}
   215  
   216  	hash, err := rev.hash()
   217  	if err != nil {
   218  		return "", err
   219  	}
   220  	rev.Rev = RevID{
   221  		Seq: rev.Rev.Seq + 1,
   222  		Sum: hash,
   223  	}
   224  	if oldrev != nil {
   225  		rev.RevHistory = oldrev.RevHistory.AddRevision(rev.Rev)
   226  	}
   227  
   228  	revpath := filepath.Join(d.cdb.root, "."+EscapeID(d.ID), rev.Rev.String())
   229  	var dirMade bool
   230  	for filename, att := range rev.Attachments {
   231  		att.fs = d.cdb.fs
   232  		if err := ctx.Err(); err != nil {
   233  			return "", err
   234  		}
   235  		var oldatt *Attachment
   236  		if oldrev != nil {
   237  			oldatt = oldrev.Attachments[filename]
   238  		}
   239  		if !att.Stub {
   240  			revpos := rev.Rev.Seq
   241  			att.RevPos = &revpos
   242  			if !dirMade {
   243  				if err := d.cdb.fs.MkdirAll(revpath, tempPerms); err != nil && !os.IsExist(err) {
   244  					return "", err
   245  				}
   246  				dirMade = true
   247  			}
   248  			if err := att.persist(revpath, filename); err != nil {
   249  				return "", err
   250  			}
   251  			if oldatt != nil && oldatt.Digest == att.Digest {
   252  				if err := att.fs.Remove(att.path); err != nil {
   253  					return "", err
   254  				}
   255  				att.path = ""
   256  				att.Stub = true
   257  				att.RevPos = oldatt.RevPos
   258  			}
   259  			continue
   260  		}
   261  		if oldrev == nil {
   262  			// Can't upload stubs if there's no previous revision
   263  			return "", statusError{status: http.StatusInternalServerError, error: fmt.Errorf("attachment %s: %w", filename, err)}
   264  		}
   265  		if att.Digest != "" && att.Digest != oldatt.Digest {
   266  			return "", statusError{status: http.StatusBadRequest, error: fmt.Errorf("invalid attachment data for %s", filename)}
   267  		}
   268  		if att.RevPos != nil && *att.RevPos != *oldatt.RevPos {
   269  			return "", statusError{status: http.StatusBadRequest, error: fmt.Errorf("invalid attachment data for %s", filename)}
   270  		}
   271  	}
   272  
   273  	if len(d.Revisions) == 0 {
   274  		rev.RevHistory = &RevHistory{
   275  			Start: rev.Rev.Seq,
   276  			IDs:   []string{rev.Rev.Sum},
   277  		}
   278  	}
   279  	d.Revisions = append(d.Revisions, rev)
   280  	sort.Sort(d.Revisions)
   281  	return rev.Rev.String(), nil
   282  }
   283  
   284  /*
   285  	persist updates the current rev state on disk.
   286  
   287  Persist strategy:
   288  
   289    - For every rev that doesn't exist on disk, create it in {db}/.{docid}/{rev}
   290    - If winning rev does not exist in {db}/{docid}:
   291    - Move old winning rev to {db}/.{docid}/{rev}
   292    - Move new winning rev to {db}/{docid}
   293  */
   294  func (d *Document) persist(ctx context.Context) error {
   295  	if d == nil || len(d.Revisions) == 0 {
   296  		return statusError{status: http.StatusBadRequest, error: errors.New("document has no revisions")}
   297  	}
   298  	docID := EscapeID(d.ID)
   299  	for _, rev := range d.Revisions {
   300  		if rev.path != "" {
   301  			continue
   302  		}
   303  		if err := ctx.Err(); err != nil {
   304  			return err
   305  		}
   306  		if err := rev.persist(ctx, filepath.Join(d.cdb.root, "."+docID, rev.Rev.String())); err != nil {
   307  			return err
   308  		}
   309  	}
   310  
   311  	// Make sure the winner is in the first position
   312  	sort.Sort(d.Revisions)
   313  
   314  	winningRev := d.Revisions[0]
   315  	winningPath := filepath.Join(d.cdb.root, docID)
   316  	if winningPath+filepath.Ext(winningRev.path) == winningRev.path {
   317  		// Winner already in place, our job is done here
   318  		return nil
   319  	}
   320  
   321  	// See if some other rev is currently the winning rev, and move it if necessary
   322  	for _, rev := range d.Revisions[1:] {
   323  		if winningPath+filepath.Ext(rev.path) == rev.path {
   324  			if err := ctx.Err(); err != nil {
   325  				return err
   326  			}
   327  			// We need to move this rev
   328  			revpath := filepath.Join(d.cdb.root, "."+EscapeID(d.ID), rev.Rev.String())
   329  			if err := d.cdb.fs.Mkdir(revpath, tempPerms); err != nil && !os.IsExist(err) {
   330  				return err
   331  			}
   332  			// First move attachments, since they can exit both places legally.
   333  			for attname, att := range rev.Attachments {
   334  				if !strings.HasPrefix(att.path, rev.path+"/") {
   335  					// This attachment is part of another rev, so skip it
   336  					continue
   337  				}
   338  				newpath := filepath.Join(revpath, attname)
   339  				if err := d.cdb.fs.Rename(att.path, newpath); err != nil {
   340  					return err
   341  				}
   342  				att.path = newpath
   343  			}
   344  			// Try to remove the attachments dir, but don't worry if we fail.
   345  			_ = d.cdb.fs.Remove(rev.path + "/")
   346  			// Then make the move final by moving the json doc
   347  			if err := d.cdb.fs.Rename(rev.path, revpath+filepath.Ext(rev.path)); err != nil {
   348  				return err
   349  			}
   350  			// And remove the old rev path, if it's empty
   351  			_ = d.cdb.fs.Remove(filepath.Dir(rev.path))
   352  			break
   353  		}
   354  	}
   355  
   356  	// Now finally put the new winner in place, first the doc, then attachments
   357  	if err := d.cdb.fs.Rename(winningRev.path, winningPath+filepath.Ext(winningRev.path)); err != nil {
   358  		return err
   359  	}
   360  	winningRev.path = winningPath + filepath.Ext(winningRev.path)
   361  
   362  	if err := d.cdb.fs.Mkdir(winningPath, tempPerms); err != nil && !os.IsExist(err) {
   363  		return err
   364  	}
   365  	revpath := filepath.Join(d.cdb.root, "."+EscapeID(d.ID), winningRev.Rev.String()) + "/"
   366  	for attname, att := range winningRev.Attachments {
   367  		if !strings.HasPrefix(att.path, revpath) {
   368  			// This attachment is part of another rev, so skip it
   369  			continue
   370  		}
   371  		if err := ctx.Err(); err != nil {
   372  			return err
   373  		}
   374  		newpath := filepath.Join(winningPath, attname)
   375  		if err := d.cdb.fs.Rename(att.path, newpath); err != nil {
   376  			return err
   377  		}
   378  		att.path = newpath
   379  	}
   380  	// And remove the old rev path, if it's empty
   381  	_ = d.cdb.fs.Remove(filepath.Dir(revpath))
   382  	_ = d.cdb.fs.Remove(filepath.Dir(filepath.Dir(revpath)))
   383  
   384  	return nil
   385  }
   386  
   387  // leaves returns a map of leave revid to rev
   388  func (d *Document) leaves() map[string]*Revision {
   389  	if len(d.Revisions) == 1 {
   390  		return map[string]*Revision{
   391  			d.Revisions[0].Rev.String(): d.Revisions[0],
   392  		}
   393  	}
   394  	leaves := make(map[string]*Revision, len(d.Revisions))
   395  	for _, rev := range d.Revisions {
   396  		leaves[rev.Rev.String()] = rev
   397  	}
   398  	for _, rev := range d.Revisions {
   399  		// Skp over the known leaf
   400  		for _, revid := range rev.RevHistory.Ancestors()[1:] {
   401  			delete(leaves, revid)
   402  		}
   403  	}
   404  	return leaves
   405  }
   406  

View as plain text