...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package concurrency
16
17 import (
18 "context"
19 "fmt"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "go.etcd.io/etcd/api/v3/mvccpb"
23 v3 "go.etcd.io/etcd/client/v3"
24 )
25
26 func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
27 cctx, cancel := context.WithCancel(ctx)
28 defer cancel()
29
30 var wr v3.WatchResponse
31 wch := client.Watch(cctx, key, v3.WithRev(rev))
32 for wr = range wch {
33 for _, ev := range wr.Events {
34 if ev.Type == mvccpb.DELETE {
35 return nil
36 }
37 }
38 }
39 if err := wr.Err(); err != nil {
40 return err
41 }
42 if err := ctx.Err(); err != nil {
43 return err
44 }
45 return fmt.Errorf("lost watcher waiting for delete")
46 }
47
48
49
50 func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
51 getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
52 for {
53 resp, err := client.Get(ctx, pfx, getOpts...)
54 if err != nil {
55 return nil, err
56 }
57 if len(resp.Kvs) == 0 {
58 return resp.Header, nil
59 }
60 lastKey := string(resp.Kvs[0].Key)
61 if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
62 return nil, err
63 }
64 }
65 }
66
View as plain text