...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/message.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"edge-infra.dev/pkg/edge/chariot"
     5  )
     6  
     7  type MessageState int
     8  
     9  const (
    10  	Processing MessageState = iota
    11  	Acked
    12  	Nacked
    13  )
    14  
    15  type Message struct {
    16  	state MessageState
    17  	Rev   string
    18  	Req   *Request
    19  	Msg   chariot.IPubSubMessage
    20  }
    21  
    22  func NewMessage(req *Request, msg chariot.IPubSubMessage) *Message {
    23  	return &Message{state: Processing, Req: req, Msg: msg}
    24  }
    25  
    26  func (m *Message) Ack() {
    27  	if m.state == Processing {
    28  		m.Msg.Ack()
    29  		m.state = Acked
    30  	}
    31  }
    32  
    33  func (m *Message) Nack() {
    34  	if m.state == Processing {
    35  		m.Msg.Nack()
    36  		m.state = Nacked
    37  	}
    38  }
    39  
    40  func (m *Message) NackAndLog(err error) {
    41  	if m.state == Processing {
    42  		nackAndLogPubSubRequest(m.Msg, "", err)
    43  		m.state = Nacked
    44  	}
    45  }
    46  
    47  func (m *Message) Acked() bool {
    48  	return m.state == Acked
    49  }
    50  
    51  func (m *Message) Nacked() bool {
    52  	return m.state == Nacked
    53  }
    54  
    55  func (m *Message) Processed() bool {
    56  	return m.state != Processing
    57  }
    58  
    59  type Messages []*Message
    60  
    61  func (m Messages) Processed() bool {
    62  	for _, msg := range m {
    63  		if msg.state == Processing {
    64  			return false
    65  		}
    66  	}
    67  	return true
    68  }
    69  
    70  // NackAll check the state of all messages and nack if still processes
    71  func (m Messages) NackAll() {
    72  	for _, msg := range m {
    73  		msg.Nack()
    74  	}
    75  }
    76  

View as plain text