...
1 package cushion
2
3 import (
4 "edge-infra.dev/pkg/edge/chariot"
5 )
6
// MessageState describes where a Message is in its acknowledgement lifecycle.
type MessageState int

// The states a Message can be in. Processing is the zero value of
// MessageState.
const (
	// Processing means the message has been neither acked nor nacked.
	Processing MessageState = 0
	// Acked means the message has been acknowledged.
	Acked MessageState = 1
	// Nacked means the message has been negatively acknowledged.
	Nacked MessageState = 2
)
14
15 type Message struct {
16 state MessageState
17 Rev string
18 Req *Request
19 Msg chariot.IPubSubMessage
20 }
21
22 func NewMessage(req *Request, msg chariot.IPubSubMessage) *Message {
23 return &Message{state: Processing, Req: req, Msg: msg}
24 }
25
26 func (m *Message) Ack() {
27 if m.state == Processing {
28 m.Msg.Ack()
29 m.state = Acked
30 }
31 }
32
33 func (m *Message) Nack() {
34 if m.state == Processing {
35 m.Msg.Nack()
36 m.state = Nacked
37 }
38 }
39
40 func (m *Message) NackAndLog(err error) {
41 if m.state == Processing {
42 nackAndLogPubSubRequest(m.Msg, "", err)
43 m.state = Nacked
44 }
45 }
46
47 func (m *Message) Acked() bool {
48 return m.state == Acked
49 }
50
51 func (m *Message) Nacked() bool {
52 return m.state == Nacked
53 }
54
55 func (m *Message) Processed() bool {
56 return m.state != Processing
57 }
58
59 type Messages []*Message
60
61 func (m Messages) Processed() bool {
62 for _, msg := range m {
63 if msg.state == Processing {
64 return false
65 }
66 }
67 return true
68 }
69
70
71 func (m Messages) NackAll() {
72 for _, msg := range m {
73 msg.Nack()
74 }
75 }
76
View as plain text