...

Source file src/github.com/go-kivik/kivik/v4/couchdb/scheduler.go

Documentation: github.com/go-kivik/kivik/v4/couchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package couchdb
    14  
import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"time"

	kivik "github.com/go-kivik/kivik/v4"
	"github.com/go-kivik/kivik/v4/couchdb/chttp"
	"github.com/go-kivik/kivik/v4/driver"
)
    27  
// schedulerDoc is the decoding target for a single document returned by the
// /_scheduler/docs endpoints (see update and getReplicationsFromScheduler).
type schedulerDoc struct {
	// Database is the "database" key; it becomes the dbName of the db handle
	// attached to the replication and is part of the /_scheduler/docs path.
	Database      string    `json:"database"`
	// DocID is the "doc_id" key; it is the document ID used when polling or
	// deleting the replication.
	DocID         string    `json:"doc_id"`
	// ReplicationID is the "id" key.
	ReplicationID string    `json:"id"`
	// Source is the "source" key. An empty value is treated elsewhere in this
	// file as "not populated yet".
	Source        string    `json:"source"`
	// Target is the "target" key.
	Target        string    `json:"target"`
	// StartTime is the "start_time" key.
	StartTime     time.Time `json:"start_time"`
	// LastUpdated is the "last_updated" key; EndTime reports it once the state
	// is "failed" or "completed".
	LastUpdated   time.Time `json:"last_updated"`
	// State is the "state" key.
	State         string    `json:"state"`
	// Info is the "info" key, decoded by repInfo.UnmarshalJSON.
	Info          repInfo   `json:"info"`
}
    39  
// repInfo holds the decoded "info" member of a scheduler document: either an
// error or a set of progress counters. It is populated by its UnmarshalJSON
// method.
type repInfo struct {
	// Error is set by UnmarshalJSON when the info payload describes an error.
	// It carries no JSON tag of its own.
	Error            error
	// DocsRead is the "docs_read" counter.
	DocsRead         int64 `json:"docs_read"`
	// DocsWritten is the "docs_written" counter.
	DocsWritten      int64 `json:"docs_written"`
	// DocWriteFailures is the "doc_write_failures" counter.
	DocWriteFailures int64 `json:"doc_write_failures"`
	// Pending is the "changes_pending" counter.
	Pending          int64 `json:"changes_pending"`
}
    47  
    48  func (i *repInfo) UnmarshalJSON(data []byte) error {
    49  	switch {
    50  	case string(data) == "null":
    51  		return nil
    52  	case bytes.HasPrefix(data, []byte(`{"error":`)):
    53  		var e struct {
    54  			Error *replicationError `json:"error"`
    55  		}
    56  		if err := json.Unmarshal(data, &e); err != nil {
    57  			return err
    58  		}
    59  		i.Error = e.Error
    60  	case data[0] == '{':
    61  		type repInfoClone repInfo
    62  		var x repInfoClone
    63  		if err := json.Unmarshal(data, &x); err != nil {
    64  			return err
    65  		}
    66  		*i = repInfo(x)
    67  	default:
    68  		var e replicationError
    69  		if err := json.Unmarshal(data, &e); err != nil {
    70  			return err
    71  		}
    72  		i.Error = &e
    73  	}
    74  	return nil
    75  }
    76  
    77  type schedulerReplication struct {
    78  	docID         string
    79  	database      string
    80  	replicationID string
    81  	source        string
    82  	target        string
    83  	startTime     time.Time
    84  	lastUpdated   time.Time
    85  	state         string
    86  	info          repInfo
    87  
    88  	*db
    89  }
    90  
    91  var _ driver.Replication = &schedulerReplication{}
    92  
    93  func (c *client) schedulerSupported(ctx context.Context) (bool, error) {
    94  	c.sdMU.Lock()
    95  	defer c.sdMU.Unlock()
    96  	if c.schedulerDetected != nil {
    97  		return *c.schedulerDetected, nil
    98  	}
    99  	resp, err := c.DoReq(ctx, http.MethodHead, "_scheduler/jobs", nil)
   100  	if err != nil {
   101  		return false, err
   102  	}
   103  	var supported bool
   104  	switch resp.StatusCode {
   105  	case http.StatusBadRequest:
   106  		// 1.6.x, 1.7.x
   107  		supported = false
   108  	case http.StatusNotFound:
   109  		// 2.0.x
   110  		supported = false
   111  	case http.StatusOK, http.StatusUnauthorized:
   112  		// 2.1.x +
   113  		supported = true
   114  	default:
   115  		// Assume not supported
   116  		supported = false
   117  	}
   118  	c.schedulerDetected = &supported
   119  	return supported, nil
   120  }
   121  
   122  func (c *client) newSchedulerReplication(doc *schedulerDoc) *schedulerReplication {
   123  	rep := &schedulerReplication{
   124  		db: &db{
   125  			client: c,
   126  			dbName: doc.Database,
   127  		},
   128  	}
   129  	rep.setFromDoc(doc)
   130  	return rep
   131  }
   132  
   133  func (r *schedulerReplication) setFromDoc(doc *schedulerDoc) {
   134  	if r.source == "" {
   135  		r.docID = doc.DocID
   136  		r.database = doc.Database
   137  		r.replicationID = doc.ReplicationID
   138  		r.source = doc.Source
   139  		r.target = doc.Target
   140  		r.startTime = doc.StartTime
   141  	}
   142  	r.lastUpdated = doc.LastUpdated
   143  	r.state = doc.State
   144  	r.info = doc.Info
   145  }
   146  
   147  func (c *client) fetchSchedulerReplication(ctx context.Context, docID string) (*schedulerReplication, error) {
   148  	rep := &schedulerReplication{
   149  		docID:    docID,
   150  		database: "_replicator",
   151  		db: &db{
   152  			client: c,
   153  			dbName: "_replicator",
   154  		},
   155  	}
   156  	for rep.source == "" {
   157  		if err := rep.update(ctx); err != nil {
   158  			return rep, err
   159  		}
   160  		time.Sleep(100 * time.Millisecond) // nolint:gomnd
   161  	}
   162  	return rep, nil
   163  }
   164  
   165  func (r *schedulerReplication) StartTime() time.Time { return r.startTime }
   166  func (r *schedulerReplication) EndTime() time.Time {
   167  	if r.state == "failed" || r.state == "completed" {
   168  		return r.lastUpdated
   169  	}
   170  	return time.Time{}
   171  }
   172  func (r *schedulerReplication) Err() error            { return r.info.Error }
   173  func (r *schedulerReplication) ReplicationID() string { return r.replicationID }
   174  func (r *schedulerReplication) Source() string        { return r.source }
   175  func (r *schedulerReplication) Target() string        { return r.target }
   176  func (r *schedulerReplication) State() string         { return r.state }
   177  
   178  func (r *schedulerReplication) Update(ctx context.Context, rep *driver.ReplicationInfo) error {
   179  	if err := r.update(ctx); err != nil {
   180  		return err
   181  	}
   182  	rep.DocWriteFailures = r.info.DocWriteFailures
   183  	rep.DocsRead = r.info.DocsRead
   184  	rep.DocsWritten = r.info.DocsWritten
   185  	return nil
   186  }
   187  
   188  func (r *schedulerReplication) Delete(ctx context.Context) error {
   189  	rev, err := r.GetRev(ctx, r.docID, kivik.Params(nil))
   190  	if err != nil {
   191  		return err
   192  	}
   193  	_, err = r.db.Delete(ctx, r.docID, kivik.Rev(rev))
   194  	return err
   195  }
   196  
   197  // isBug1000 detects a race condition bug in CouchDB 2.1.x so the attempt can
   198  // be retried. See https://github.com/apache/couchdb/issues/1000
   199  func isBug1000(err error) bool {
   200  	if err == nil {
   201  		return false
   202  	}
   203  	cerr, ok := err.(*chttp.HTTPError)
   204  	if !ok {
   205  		// should never happen
   206  		return false
   207  	}
   208  	if cerr.Response.StatusCode != http.StatusInternalServerError {
   209  		return false
   210  	}
   211  	return cerr.Reason == "function_clause"
   212  }
   213  
   214  func (r *schedulerReplication) update(ctx context.Context) error {
   215  	path := fmt.Sprintf("/_scheduler/docs/%s/%s", r.database, chttp.EncodeDocID(r.docID))
   216  	var doc schedulerDoc
   217  	if err := r.db.Client.DoJSON(ctx, http.MethodGet, path, nil, &doc); err != nil {
   218  		if isBug1000(err) {
   219  			return r.update(ctx)
   220  		}
   221  		return err
   222  	}
   223  	r.setFromDoc(&doc)
   224  	return nil
   225  }
   226  
   227  func (c *client) getReplicationsFromScheduler(ctx context.Context, opts map[string]interface{}) ([]driver.Replication, error) {
   228  	params, err := optionsToParams(opts)
   229  	if err != nil {
   230  		return nil, err
   231  	}
   232  	var result struct {
   233  		Docs []schedulerDoc `json:"docs"`
   234  	}
   235  	path := "/_scheduler/docs"
   236  	if params != nil {
   237  		path = path + "?" + params.Encode()
   238  	}
   239  	if err = c.DoJSON(ctx, http.MethodGet, path, nil, &result); err != nil {
   240  		return nil, err
   241  	}
   242  	reps := make([]driver.Replication, 0, len(result.Docs))
   243  	for _, row := range result.Docs {
   244  		rep := c.newSchedulerReplication(&row)
   245  		reps = append(reps, rep)
   246  	}
   247  	return reps, nil
   248  }
   249  

View as plain text