...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package mirror
17
18 import (
19 "context"
20
21 clientv3 "go.etcd.io/etcd/client/v3"
22 )
23
// batchLimit is the per-request key limit passed to clientv3.WithLimit when
// SyncBase pages through the key space.
const batchLimit = 1000
27
28
// Syncer reads key-value state through an etcd v3 client in two phases: a
// base read delivered as GetResponse values, and an update stream delivered
// as a watch channel.
type Syncer interface {
	// SyncBase returns a channel of GetResponse values carrying the base
	// state, plus a channel on which an error is reported.
	SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)

	// SyncUpdates returns a watch channel carrying the updates.
	SyncUpdates(ctx context.Context) clientv3.WatchChan
}
37
38
39 func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
40 return &syncer{c: c, prefix: prefix, rev: rev}
41 }
42
// syncer is the Syncer implementation returned by NewSyncer.
type syncer struct {
	// c is the etcd client used for every Get and Watch call.
	c *clientv3.Client
	// rev is the revision the base sync reads at. When it is 0, SyncBase
	// overwrites it with the revision from a Get response header; SyncUpdates
	// then watches from rev+1.
	rev int64
	// prefix limits syncing to keys with this prefix; an empty prefix means
	// the whole key space.
	prefix string
}
48
49 func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
50 respchan := make(chan clientv3.GetResponse, 1024)
51 errchan := make(chan error, 1)
52
53
54 if s.rev == 0 {
55
56
57 checkPath := "foo"
58 if len(s.prefix) != 0 {
59 checkPath = s.prefix
60 }
61 resp, err := s.c.Get(ctx, checkPath)
62 if err != nil {
63 errchan <- err
64 close(respchan)
65 close(errchan)
66 return respchan, errchan
67 }
68 s.rev = resp.Header.Revision
69 }
70
71 go func() {
72 defer close(respchan)
73 defer close(errchan)
74
75 var key string
76
77 opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}
78
79 if len(s.prefix) == 0 {
80
81
82 opts = append(opts, clientv3.WithFromKey())
83 key = "\x00"
84 } else {
85
86
87
88 opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
89 key = s.prefix
90 }
91
92 for {
93 resp, err := s.c.Get(ctx, key, opts...)
94 if err != nil {
95 errchan <- err
96 return
97 }
98
99 respchan <- *resp
100
101 if !resp.More {
102 return
103 }
104
105 key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
106 }
107 }()
108
109 return respchan, errchan
110 }
111
112 func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
113 if s.rev == 0 {
114 panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
115 }
116 return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
117 }
118
View as plain text