...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pouchdb
16
17 import (
18 "fmt"
19 "io"
20 "sync"
21 "time"
22
23 "github.com/gopherjs/gopherjs/js"
24 "github.com/gopherjs/jsbuiltin"
25
26 "github.com/go-kivik/kivik/v4/pouchdb/bindings"
27 )
28
29 type replicationState struct {
30 *js.Object
31 startTime time.Time `js:"start_time"`
32 endTime time.Time `js:"end_time"`
33 DocsRead int64 `js:"docs_read"`
34 DocsWritten int64 `js:"docs_written"`
35 DocWriteFailures int64 `js:"doc_write_failures"`
36 LastSeq string `js:"last_seq"`
37 }
38
39 func (rs *replicationState) StartTime() time.Time {
40 value := rs.Get("start_time")
41 if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
42 return rs.startTime
43 }
44 t, err := convertTime(value)
45 if err != nil {
46 panic("start time: " + err.Error())
47 }
48 return t
49 }
50
51 func (rs *replicationState) EndTime() time.Time {
52 value := rs.Get("end_time")
53 if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
54 return rs.endTime
55 }
56 t, err := convertTime(value)
57 if err != nil {
58 panic("end time: " + err.Error())
59 }
60 return t
61 }
62
63 func convertTime(value fmt.Stringer) (time.Time, error) {
64 if value == js.Undefined {
65 return time.Time{}, nil
66 }
67 if jsbuiltin.TypeOf(value) == jsbuiltin.TypeString {
68 return time.Parse(time.RFC3339, value.String())
69 }
70 return time.Time{}, fmt.Errorf("unsupported type")
71 }
72
73 type replicationHandler struct {
74 event *string
75 state *replicationState
76
77 mu sync.Mutex
78 wg sync.WaitGroup
79 complete bool
80 obj *js.Object
81 }
82
83 func (r *replicationHandler) Cancel() {
84 r.obj.Call("cancel")
85 }
86
87
88
89
90 func (r *replicationHandler) Status() (string, *replicationState, error) {
91 if r.complete && r.event == nil {
92 return "", nil, io.EOF
93 }
94 r.mu.Lock()
95 if r.event == nil {
96 r.mu.Unlock()
97
98 r.wg.Wait()
99 r.mu.Lock()
100 }
101 event, state := r.event, r.state
102 r.event = nil
103 r.mu.Unlock()
104 r.wg.Add(1)
105 return *event, state, nil
106 }
107
108 func (r *replicationHandler) handleEvent(event string, info *js.Object) {
109 if r.complete {
110 panic(fmt.Sprintf("Unexpected replication event after complete. %v %v", event, info))
111 }
112 r.mu.Lock()
113 defer r.mu.Unlock()
114 r.event = &event
115 switch event {
116 case bindings.ReplicationEventDenied, bindings.ReplicationEventError, bindings.ReplicationEventComplete:
117 r.complete = true
118 }
119 if info != nil && info != js.Undefined {
120 r.state = &replicationState{Object: info}
121 }
122 r.wg.Done()
123 }
124
125 func newReplicationHandler(rep *js.Object) *replicationHandler {
126 r := &replicationHandler{obj: rep}
127 for _, event := range []string{
128 bindings.ReplicationEventChange,
129 bindings.ReplicationEventComplete,
130 bindings.ReplicationEventPaused,
131 bindings.ReplicationEventActive,
132 bindings.ReplicationEventDenied,
133 bindings.ReplicationEventError,
134 } {
135 func(e string) {
136 rep.Call("on", e, func(info *js.Object) {
137 r.handleEvent(e, info)
138 })
139 }(event)
140 }
141 r.wg.Add(1)
142 return r
143 }
144
View as plain text