...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package recipe
16
17 import (
18 "context"
19 "fmt"
20
21 "go.etcd.io/etcd/api/v3/mvccpb"
22 v3 "go.etcd.io/etcd/client/v3"
23 )
24
25
26 type PriorityQueue struct {
27 client *v3.Client
28 ctx context.Context
29 key string
30 }
31
32
33 func NewPriorityQueue(client *v3.Client, key string) *PriorityQueue {
34 return &PriorityQueue{client, context.TODO(), key + "/"}
35 }
36
37
38 func (q *PriorityQueue) Enqueue(val string, pr uint16) error {
39 prefix := fmt.Sprintf("%s%05d", q.key, pr)
40 _, err := newSequentialKV(q.client, prefix, val)
41 return err
42 }
43
44
45
46 func (q *PriorityQueue) Dequeue() (string, error) {
47
48 resp, err := q.client.Get(q.ctx, q.key, v3.WithFirstKey()...)
49 if err != nil {
50 return "", err
51 }
52
53 kv, err := claimFirstKey(q.client, resp.Kvs)
54 if err != nil {
55 return "", err
56 } else if kv != nil {
57 return string(kv.Value), nil
58 } else if resp.More {
59
60 return q.Dequeue()
61 }
62
63
64 ev, err := WaitPrefixEvents(
65 q.client,
66 q.key,
67 resp.Header.Revision,
68 []mvccpb.Event_EventType{mvccpb.PUT})
69 if err != nil {
70 return "", err
71 }
72
73 ok, err := deleteRevKey(q.client, string(ev.Kv.Key), ev.Kv.ModRevision)
74 if err != nil {
75 return "", err
76 } else if !ok {
77 return q.Dequeue()
78 }
79 return string(ev.Kv.Value), err
80 }
81
View as plain text