...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pouchdb
16
17 import (
18 "context"
19 "encoding/json"
20 "fmt"
21 "io"
22 "sync"
23
24 "github.com/gopherjs/gopherjs/js"
25
26 "github.com/go-kivik/kivik/v4/driver"
27 "github.com/go-kivik/kivik/v4/pouchdb/bindings"
28 )
29
30 type changesFeed struct {
31 changes *js.Object
32 ctx context.Context
33 feed chan *driver.Change
34 errMu sync.Mutex
35 err error
36 lastSeq string
37 }
38
39 var _ driver.Changes = &changesFeed{}
40
41 func newChangesFeed(ctx context.Context, changes *js.Object) *changesFeed {
42 const chanLen = 32
43 feed := make(chan *driver.Change, chanLen)
44 c := &changesFeed{
45 ctx: ctx,
46 changes: changes,
47 feed: feed,
48 }
49
50 changes.Call("on", "change", c.change)
51 changes.Call("on", "complete", c.complete)
52 changes.Call("on", "error", c.error)
53 return c
54 }
55
56 type changeRow struct {
57 *js.Object
58 ID string `js:"id"`
59 Seq string `js:"seq"`
60 Changes *js.Object `js:"changes"`
61 Doc *js.Object `js:"doc"`
62 Deleted bool `js:"deleted"`
63 }
64
65 func (c *changesFeed) setErr(err error) {
66 c.errMu.Lock()
67 c.err = err
68 c.errMu.Unlock()
69 }
70
71 func (c *changesFeed) Next(row *driver.Change) error {
72 c.errMu.Lock()
73 if c.err != nil {
74 c.errMu.Unlock()
75 return c.err
76 }
77 c.errMu.Unlock()
78 select {
79 case <-c.ctx.Done():
80 err := c.ctx.Err()
81 c.setErr(err)
82 return err
83 case newRow, ok := <-c.feed:
84 if !ok {
85 c.setErr(io.EOF)
86 return io.EOF
87 }
88 *row = *newRow
89 }
90 return nil
91 }
92
93 func (c *changesFeed) Close() error {
94 c.changes.Call("cancel")
95 return nil
96 }
97
98
99 func (c *changesFeed) LastSeq() string {
100 return c.lastSeq
101 }
102
103
104 func (c *changesFeed) Pending() int64 {
105 return 0
106 }
107
108
109 func (c *changesFeed) ETag() string {
110 return ""
111 }
112
113 func (c *changesFeed) change(change *changeRow) {
114 go func() {
115 defer func() {
116 if r := recover(); r != nil {
117 _ = c.Close()
118 if e, ok := r.(error); ok {
119 c.err = e
120 } else {
121 c.err = fmt.Errorf("%v", r)
122 }
123 }
124 }()
125 changedRevs := make([]string, 0, change.Changes.Length())
126 for i := 0; i < change.Changes.Length(); i++ {
127 changedRevs = append(changedRevs, change.Changes.Index(i).Get("rev").String())
128 }
129 var doc json.RawMessage
130 if change.Doc != js.Undefined {
131 doc = json.RawMessage(js.Global.Get("JSON").Call("stringify", change.Doc).String())
132 }
133 row := &driver.Change{
134 ID: change.ID,
135 Seq: change.Seq,
136 Deleted: change.Deleted,
137 Doc: doc,
138 Changes: changedRevs,
139 }
140 c.feed <- row
141 }()
142 }
143
144 func (c *changesFeed) complete(info *js.Object) {
145 if results := info.Get("results"); results != js.Undefined {
146 for _, result := range results.Interface().([]interface{}) {
147 c.change(&changeRow{
148 Object: result.(*js.Object),
149 })
150 }
151 }
152
153 c.lastSeq = info.Get("last_seq").String()
154
155 close(c.feed)
156 }
157
158 func (c *changesFeed) error(e *js.Object) {
159 c.setErr(bindings.NewPouchError(e))
160 }
161
162 func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Changes, error) {
163 opts := map[string]interface{}{}
164 options.Apply(opts)
165 changes, err := d.db.Changes(ctx, opts)
166 if err != nil {
167 return nil, err
168 }
169
170 return newChangesFeed(ctx, changes), nil
171 }
172
View as plain text