...

Source file src/github.com/go-kivik/kivik/v4/pouchdb/changes.go

Documentation: github.com/go-kivik/kivik/v4/pouchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  //go:build js
    14  
    15  package pouchdb
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"fmt"
    21  	"io"
    22  	"sync"
    23  
    24  	"github.com/gopherjs/gopherjs/js"
    25  
    26  	"github.com/go-kivik/kivik/v4/driver"
    27  	"github.com/go-kivik/kivik/v4/pouchdb/bindings"
    28  )
    29  
    30  type changesFeed struct {
    31  	changes *js.Object
    32  	ctx     context.Context
    33  	feed    chan *driver.Change
    34  	errMu   sync.Mutex
    35  	err     error
    36  	lastSeq string
    37  }
    38  
    39  var _ driver.Changes = &changesFeed{}
    40  
    41  func newChangesFeed(ctx context.Context, changes *js.Object) *changesFeed {
    42  	const chanLen = 32
    43  	feed := make(chan *driver.Change, chanLen)
    44  	c := &changesFeed{
    45  		ctx:     ctx,
    46  		changes: changes,
    47  		feed:    feed,
    48  	}
    49  
    50  	changes.Call("on", "change", c.change)
    51  	changes.Call("on", "complete", c.complete)
    52  	changes.Call("on", "error", c.error)
    53  	return c
    54  }
    55  
    56  type changeRow struct {
    57  	*js.Object
    58  	ID      string     `js:"id"`
    59  	Seq     string     `js:"seq"`
    60  	Changes *js.Object `js:"changes"`
    61  	Doc     *js.Object `js:"doc"`
    62  	Deleted bool       `js:"deleted"`
    63  }
    64  
    65  func (c *changesFeed) setErr(err error) {
    66  	c.errMu.Lock()
    67  	c.err = err
    68  	c.errMu.Unlock()
    69  }
    70  
    71  func (c *changesFeed) Next(row *driver.Change) error {
    72  	c.errMu.Lock()
    73  	if c.err != nil {
    74  		c.errMu.Unlock()
    75  		return c.err
    76  	}
    77  	c.errMu.Unlock()
    78  	select {
    79  	case <-c.ctx.Done():
    80  		err := c.ctx.Err()
    81  		c.setErr(err)
    82  		return err
    83  	case newRow, ok := <-c.feed:
    84  		if !ok {
    85  			c.setErr(io.EOF)
    86  			return io.EOF
    87  		}
    88  		*row = *newRow
    89  	}
    90  	return nil
    91  }
    92  
    93  func (c *changesFeed) Close() error {
    94  	c.changes.Call("cancel")
    95  	return nil
    96  }
    97  
    98  // LastSeq returns the last_seq id, as returned by PouchDB.
    99  func (c *changesFeed) LastSeq() string {
   100  	return c.lastSeq
   101  }
   102  
   103  // Pending returns 0 for PouchDB.
   104  func (c *changesFeed) Pending() int64 {
   105  	return 0
   106  }
   107  
   108  // ETag returns an empty string for PouchDB.
   109  func (c *changesFeed) ETag() string {
   110  	return ""
   111  }
   112  
   113  func (c *changesFeed) change(change *changeRow) {
   114  	go func() {
   115  		defer func() {
   116  			if r := recover(); r != nil {
   117  				_ = c.Close()
   118  				if e, ok := r.(error); ok {
   119  					c.err = e
   120  				} else {
   121  					c.err = fmt.Errorf("%v", r)
   122  				}
   123  			}
   124  		}()
   125  		changedRevs := make([]string, 0, change.Changes.Length())
   126  		for i := 0; i < change.Changes.Length(); i++ {
   127  			changedRevs = append(changedRevs, change.Changes.Index(i).Get("rev").String())
   128  		}
   129  		var doc json.RawMessage
   130  		if change.Doc != js.Undefined {
   131  			doc = json.RawMessage(js.Global.Get("JSON").Call("stringify", change.Doc).String())
   132  		}
   133  		row := &driver.Change{
   134  			ID:      change.ID,
   135  			Seq:     change.Seq,
   136  			Deleted: change.Deleted,
   137  			Doc:     doc,
   138  			Changes: changedRevs,
   139  		}
   140  		c.feed <- row
   141  	}()
   142  }
   143  
   144  func (c *changesFeed) complete(info *js.Object) {
   145  	if results := info.Get("results"); results != js.Undefined {
   146  		for _, result := range results.Interface().([]interface{}) {
   147  			c.change(&changeRow{
   148  				Object: result.(*js.Object),
   149  			})
   150  		}
   151  	}
   152  
   153  	c.lastSeq = info.Get("last_seq").String()
   154  
   155  	close(c.feed)
   156  }
   157  
   158  func (c *changesFeed) error(e *js.Object) {
   159  	c.setErr(bindings.NewPouchError(e))
   160  }
   161  
   162  func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Changes, error) {
   163  	opts := map[string]interface{}{}
   164  	options.Apply(opts)
   165  	changes, err := d.db.Changes(ctx, opts)
   166  	if err != nil {
   167  		return nil, err
   168  	}
   169  
   170  	return newChangesFeed(ctx, changes), nil
   171  }
   172  

View as plain text