1
2
3
4
5
6
7
8
9
10
11
12
13 package couchdb
14
15 import (
16 "bytes"
17 "context"
18 "encoding/json"
19 "fmt"
20 "net/http"
21 "time"
22
23 kivik "github.com/go-kivik/kivik/v4"
24 "github.com/go-kivik/kivik/v4/couchdb/chttp"
25 "github.com/go-kivik/kivik/v4/driver"
26 )
27
28 type schedulerDoc struct {
29 Database string `json:"database"`
30 DocID string `json:"doc_id"`
31 ReplicationID string `json:"id"`
32 Source string `json:"source"`
33 Target string `json:"target"`
34 StartTime time.Time `json:"start_time"`
35 LastUpdated time.Time `json:"last_updated"`
36 State string `json:"state"`
37 Info repInfo `json:"info"`
38 }
39
40 type repInfo struct {
41 Error error
42 DocsRead int64 `json:"docs_read"`
43 DocsWritten int64 `json:"docs_written"`
44 DocWriteFailures int64 `json:"doc_write_failures"`
45 Pending int64 `json:"changes_pending"`
46 }
47
48 func (i *repInfo) UnmarshalJSON(data []byte) error {
49 switch {
50 case string(data) == "null":
51 return nil
52 case bytes.HasPrefix(data, []byte(`{"error":`)):
53 var e struct {
54 Error *replicationError `json:"error"`
55 }
56 if err := json.Unmarshal(data, &e); err != nil {
57 return err
58 }
59 i.Error = e.Error
60 case data[0] == '{':
61 type repInfoClone repInfo
62 var x repInfoClone
63 if err := json.Unmarshal(data, &x); err != nil {
64 return err
65 }
66 *i = repInfo(x)
67 default:
68 var e replicationError
69 if err := json.Unmarshal(data, &e); err != nil {
70 return err
71 }
72 i.Error = &e
73 }
74 return nil
75 }
76
77 type schedulerReplication struct {
78 docID string
79 database string
80 replicationID string
81 source string
82 target string
83 startTime time.Time
84 lastUpdated time.Time
85 state string
86 info repInfo
87
88 *db
89 }
90
91 var _ driver.Replication = &schedulerReplication{}
92
93 func (c *client) schedulerSupported(ctx context.Context) (bool, error) {
94 c.sdMU.Lock()
95 defer c.sdMU.Unlock()
96 if c.schedulerDetected != nil {
97 return *c.schedulerDetected, nil
98 }
99 resp, err := c.DoReq(ctx, http.MethodHead, "_scheduler/jobs", nil)
100 if err != nil {
101 return false, err
102 }
103 var supported bool
104 switch resp.StatusCode {
105 case http.StatusBadRequest:
106
107 supported = false
108 case http.StatusNotFound:
109
110 supported = false
111 case http.StatusOK, http.StatusUnauthorized:
112
113 supported = true
114 default:
115
116 supported = false
117 }
118 c.schedulerDetected = &supported
119 return supported, nil
120 }
121
122 func (c *client) newSchedulerReplication(doc *schedulerDoc) *schedulerReplication {
123 rep := &schedulerReplication{
124 db: &db{
125 client: c,
126 dbName: doc.Database,
127 },
128 }
129 rep.setFromDoc(doc)
130 return rep
131 }
132
133 func (r *schedulerReplication) setFromDoc(doc *schedulerDoc) {
134 if r.source == "" {
135 r.docID = doc.DocID
136 r.database = doc.Database
137 r.replicationID = doc.ReplicationID
138 r.source = doc.Source
139 r.target = doc.Target
140 r.startTime = doc.StartTime
141 }
142 r.lastUpdated = doc.LastUpdated
143 r.state = doc.State
144 r.info = doc.Info
145 }
146
147 func (c *client) fetchSchedulerReplication(ctx context.Context, docID string) (*schedulerReplication, error) {
148 rep := &schedulerReplication{
149 docID: docID,
150 database: "_replicator",
151 db: &db{
152 client: c,
153 dbName: "_replicator",
154 },
155 }
156 for rep.source == "" {
157 if err := rep.update(ctx); err != nil {
158 return rep, err
159 }
160 time.Sleep(100 * time.Millisecond)
161 }
162 return rep, nil
163 }
164
165 func (r *schedulerReplication) StartTime() time.Time { return r.startTime }
166 func (r *schedulerReplication) EndTime() time.Time {
167 if r.state == "failed" || r.state == "completed" {
168 return r.lastUpdated
169 }
170 return time.Time{}
171 }
172 func (r *schedulerReplication) Err() error { return r.info.Error }
173 func (r *schedulerReplication) ReplicationID() string { return r.replicationID }
174 func (r *schedulerReplication) Source() string { return r.source }
175 func (r *schedulerReplication) Target() string { return r.target }
176 func (r *schedulerReplication) State() string { return r.state }
177
178 func (r *schedulerReplication) Update(ctx context.Context, rep *driver.ReplicationInfo) error {
179 if err := r.update(ctx); err != nil {
180 return err
181 }
182 rep.DocWriteFailures = r.info.DocWriteFailures
183 rep.DocsRead = r.info.DocsRead
184 rep.DocsWritten = r.info.DocsWritten
185 return nil
186 }
187
188 func (r *schedulerReplication) Delete(ctx context.Context) error {
189 rev, err := r.GetRev(ctx, r.docID, kivik.Params(nil))
190 if err != nil {
191 return err
192 }
193 _, err = r.db.Delete(ctx, r.docID, kivik.Rev(rev))
194 return err
195 }
196
197
198
199 func isBug1000(err error) bool {
200 if err == nil {
201 return false
202 }
203 cerr, ok := err.(*chttp.HTTPError)
204 if !ok {
205
206 return false
207 }
208 if cerr.Response.StatusCode != http.StatusInternalServerError {
209 return false
210 }
211 return cerr.Reason == "function_clause"
212 }
213
214 func (r *schedulerReplication) update(ctx context.Context) error {
215 path := fmt.Sprintf("/_scheduler/docs/%s/%s", r.database, chttp.EncodeDocID(r.docID))
216 var doc schedulerDoc
217 if err := r.db.Client.DoJSON(ctx, http.MethodGet, path, nil, &doc); err != nil {
218 if isBug1000(err) {
219 return r.update(ctx)
220 }
221 return err
222 }
223 r.setFromDoc(&doc)
224 return nil
225 }
226
227 func (c *client) getReplicationsFromScheduler(ctx context.Context, opts map[string]interface{}) ([]driver.Replication, error) {
228 params, err := optionsToParams(opts)
229 if err != nil {
230 return nil, err
231 }
232 var result struct {
233 Docs []schedulerDoc `json:"docs"`
234 }
235 path := "/_scheduler/docs"
236 if params != nil {
237 path = path + "?" + params.Encode()
238 }
239 if err = c.DoJSON(ctx, http.MethodGet, path, nil, &result); err != nil {
240 return nil, err
241 }
242 reps := make([]driver.Replication, 0, len(result.Docs))
243 for _, row := range result.Docs {
244 rep := c.newSchedulerReplication(&row)
245 reps = append(reps, rep)
246 }
247 return reps, nil
248 }
249
View as plain text