package cushion import ( "edge-infra.dev/pkg/edge/chariot" ) type MessageState int const ( Processing MessageState = iota Acked Nacked ) type Message struct { state MessageState Rev string Req *Request Msg chariot.IPubSubMessage } func NewMessage(req *Request, msg chariot.IPubSubMessage) *Message { return &Message{state: Processing, Req: req, Msg: msg} } func (m *Message) Ack() { if m.state == Processing { m.Msg.Ack() m.state = Acked } } func (m *Message) Nack() { if m.state == Processing { m.Msg.Nack() m.state = Nacked } } func (m *Message) NackAndLog(err error) { if m.state == Processing { nackAndLogPubSubRequest(m.Msg, "", err) m.state = Nacked } } func (m *Message) Acked() bool { return m.state == Acked } func (m *Message) Nacked() bool { return m.state == Nacked } func (m *Message) Processed() bool { return m.state != Processing } type Messages []*Message func (m Messages) Processed() bool { for _, msg := range m { if msg.state == Processing { return false } } return true } // NackAll check the state of all messages and nack if still processes func (m Messages) NackAll() { for _, msg := range m { msg.Nack() } }