...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package recipe
16
17 import (
18 "context"
19
20 "go.etcd.io/etcd/api/v3/mvccpb"
21 v3 "go.etcd.io/etcd/client/v3"
22 )
23
24
25 type Queue struct {
26 client *v3.Client
27 ctx context.Context
28
29 keyPrefix string
30 }
31
32 func NewQueue(client *v3.Client, keyPrefix string) *Queue {
33 return &Queue{client, context.TODO(), keyPrefix}
34 }
35
36 func (q *Queue) Enqueue(val string) error {
37 _, err := newUniqueKV(q.client, q.keyPrefix, val)
38 return err
39 }
40
41
42
43 func (q *Queue) Dequeue() (string, error) {
44
45 resp, err := q.client.Get(q.ctx, q.keyPrefix, v3.WithFirstRev()...)
46 if err != nil {
47 return "", err
48 }
49
50 kv, err := claimFirstKey(q.client, resp.Kvs)
51 if err != nil {
52 return "", err
53 } else if kv != nil {
54 return string(kv.Value), nil
55 } else if resp.More {
56
57 return q.Dequeue()
58 }
59
60
61 ev, err := WaitPrefixEvents(
62 q.client,
63 q.keyPrefix,
64 resp.Header.Revision,
65 []mvccpb.Event_EventType{mvccpb.PUT})
66 if err != nil {
67 return "", err
68 }
69
70 ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
71 if err != nil {
72 return "", err
73 } else if !ok {
74 return q.Dequeue()
75 }
76 return string(ev.Kv.Value), err
77 }
78
View as plain text