...

Source file src/go.etcd.io/etcd/client/v3/mirror/syncer.go

Documentation: go.etcd.io/etcd/client/v3/mirror

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package mirror implements etcd mirroring operations.
    16  package mirror
    17  
    18  import (
    19  	"context"
    20  
    21  	clientv3 "go.etcd.io/etcd/client/v3"
    22  )
    23  
    24  const (
    25  	batchLimit = 1000
    26  )
    27  
    28  // Syncer syncs with the key-value state of an etcd cluster.
    29  type Syncer interface {
    30  	// SyncBase syncs the base state of the key-value state.
    31  	// The key-value state are sent through the returned chan.
    32  	SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
    33  	// SyncUpdates syncs the updates of the key-value state.
    34  	// The update events are sent through the returned chan.
    35  	SyncUpdates(ctx context.Context) clientv3.WatchChan
    36  }
    37  
    38  // NewSyncer creates a Syncer.
    39  func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
    40  	return &syncer{c: c, prefix: prefix, rev: rev}
    41  }
    42  
    43  type syncer struct {
    44  	c      *clientv3.Client
    45  	rev    int64
    46  	prefix string
    47  }
    48  
    49  func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
    50  	respchan := make(chan clientv3.GetResponse, 1024)
    51  	errchan := make(chan error, 1)
    52  
    53  	// if rev is not specified, we will choose the most recent revision.
    54  	if s.rev == 0 {
    55  		// If len(s.prefix) == 0, we will check a random key to fetch the most recent
    56  		// revision (foo), otherwise we use the provided prefix.
    57  		checkPath := "foo"
    58  		if len(s.prefix) != 0 {
    59  			checkPath = s.prefix
    60  		}
    61  		resp, err := s.c.Get(ctx, checkPath)
    62  		if err != nil {
    63  			errchan <- err
    64  			close(respchan)
    65  			close(errchan)
    66  			return respchan, errchan
    67  		}
    68  		s.rev = resp.Header.Revision
    69  	}
    70  
    71  	go func() {
    72  		defer close(respchan)
    73  		defer close(errchan)
    74  
    75  		var key string
    76  
    77  		opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}
    78  
    79  		if len(s.prefix) == 0 {
    80  			// If len(s.prefix) == 0, we will sync the entire key-value space.
    81  			// We then range from the smallest key (0x00) to the end.
    82  			opts = append(opts, clientv3.WithFromKey())
    83  			key = "\x00"
    84  		} else {
    85  			// If len(s.prefix) != 0, we will sync key-value space with given prefix.
    86  			// We then range from the prefix to the next prefix if exists. Or we will
    87  			// range from the prefix to the end if the next prefix does not exists.
    88  			opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
    89  			key = s.prefix
    90  		}
    91  
    92  		for {
    93  			resp, err := s.c.Get(ctx, key, opts...)
    94  			if err != nil {
    95  				errchan <- err
    96  				return
    97  			}
    98  
    99  			respchan <- *resp
   100  
   101  			if !resp.More {
   102  				return
   103  			}
   104  			// move to next key
   105  			key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
   106  		}
   107  	}()
   108  
   109  	return respchan, errchan
   110  }
   111  
   112  func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
   113  	if s.rev == 0 {
   114  		panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
   115  	}
   116  	return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
   117  }
   118  

View as plain text