...

Source file src/github.com/go-kivik/kivik/v4/pouchdb/replicationEvents.go

Documentation: github.com/go-kivik/kivik/v4/pouchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  //go:build js
    14  
    15  package pouchdb
    16  
    17  import (
    18  	"fmt"
    19  	"io"
    20  	"sync"
    21  	"time"
    22  
    23  	"github.com/gopherjs/gopherjs/js"
    24  	"github.com/gopherjs/jsbuiltin"
    25  
    26  	"github.com/go-kivik/kivik/v4/pouchdb/bindings"
    27  )
    28  
    29  type replicationState struct {
    30  	*js.Object
    31  	startTime        time.Time `js:"start_time"`
    32  	endTime          time.Time `js:"end_time"`
    33  	DocsRead         int64     `js:"docs_read"`
    34  	DocsWritten      int64     `js:"docs_written"`
    35  	DocWriteFailures int64     `js:"doc_write_failures"`
    36  	LastSeq          string    `js:"last_seq"`
    37  }
    38  
    39  func (rs *replicationState) StartTime() time.Time {
    40  	value := rs.Get("start_time")
    41  	if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
    42  		return rs.startTime
    43  	}
    44  	t, err := convertTime(value)
    45  	if err != nil {
    46  		panic("start time: " + err.Error())
    47  	}
    48  	return t
    49  }
    50  
    51  func (rs *replicationState) EndTime() time.Time {
    52  	value := rs.Get("end_time")
    53  	if jsbuiltin.InstanceOf(value, js.Global.Get("Date")) {
    54  		return rs.endTime
    55  	}
    56  	t, err := convertTime(value)
    57  	if err != nil {
    58  		panic("end time: " + err.Error())
    59  	}
    60  	return t
    61  }
    62  
    63  func convertTime(value fmt.Stringer) (time.Time, error) {
    64  	if value == js.Undefined {
    65  		return time.Time{}, nil
    66  	}
    67  	if jsbuiltin.TypeOf(value) == jsbuiltin.TypeString {
    68  		return time.Parse(time.RFC3339, value.String())
    69  	}
    70  	return time.Time{}, fmt.Errorf("unsupported type")
    71  }
    72  
    73  type replicationHandler struct {
    74  	event *string
    75  	state *replicationState
    76  
    77  	mu       sync.Mutex
    78  	wg       sync.WaitGroup
    79  	complete bool
    80  	obj      *js.Object
    81  }
    82  
    83  func (r *replicationHandler) Cancel() {
    84  	r.obj.Call("cancel")
    85  }
    86  
    87  // Status returns the last-read status. If the last-read status was already read,
    88  // this blocks until the next event.  If the replication is complete, it will
    89  // return io.EOF immediately.
    90  func (r *replicationHandler) Status() (string, *replicationState, error) {
    91  	if r.complete && r.event == nil {
    92  		return "", nil, io.EOF
    93  	}
    94  	r.mu.Lock()
    95  	if r.event == nil {
    96  		r.mu.Unlock()
    97  		// Wait for an event to be ready to read
    98  		r.wg.Wait()
    99  		r.mu.Lock()
   100  	}
   101  	event, state := r.event, r.state
   102  	r.event = nil
   103  	r.mu.Unlock()
   104  	r.wg.Add(1)
   105  	return *event, state, nil
   106  }
   107  
   108  func (r *replicationHandler) handleEvent(event string, info *js.Object) {
   109  	if r.complete {
   110  		panic(fmt.Sprintf("Unexpected replication event after complete. %v %v", event, info))
   111  	}
   112  	r.mu.Lock()
   113  	defer r.mu.Unlock()
   114  	r.event = &event
   115  	switch event {
   116  	case bindings.ReplicationEventDenied, bindings.ReplicationEventError, bindings.ReplicationEventComplete:
   117  		r.complete = true
   118  	}
   119  	if info != nil && info != js.Undefined {
   120  		r.state = &replicationState{Object: info}
   121  	}
   122  	r.wg.Done()
   123  }
   124  
   125  func newReplicationHandler(rep *js.Object) *replicationHandler {
   126  	r := &replicationHandler{obj: rep}
   127  	for _, event := range []string{
   128  		bindings.ReplicationEventChange,
   129  		bindings.ReplicationEventComplete,
   130  		bindings.ReplicationEventPaused,
   131  		bindings.ReplicationEventActive,
   132  		bindings.ReplicationEventDenied,
   133  		bindings.ReplicationEventError,
   134  	} {
   135  		func(e string) {
   136  			rep.Call("on", e, func(info *js.Object) {
   137  				r.handleEvent(e, info)
   138  			})
   139  		}(event)
   140  	}
   141  	r.wg.Add(1)
   142  	return r
   143  }
   144  

View as plain text