...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "fmt"
19 "time"
20
21 ipubsub "cloud.google.com/go/internal/pubsub"
22 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
23 )
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 type Message = ipubsub.Message
45
46
47 func msgAckHandler(m *Message, eod bool) (*psAckHandler, bool) {
48 ackh, ok := ipubsub.MessageAckHandler(m).(*psAckHandler)
49 ackh.exactlyOnceDelivery = eod
50 return ackh, ok
51 }
52
53 func msgAckID(m *Message) string {
54 if ackh, ok := msgAckHandler(m, false); ok {
55 return ackh.ackID
56 }
57 return ""
58 }
59
60
61 type iterDoneFunc func(string, bool, *AckResult, time.Time)
62
63 func convertMessages(rms []*pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) ([]*Message, error) {
64 msgs := make([]*Message, 0, len(rms))
65 for i, m := range rms {
66 msg, err := toMessage(m, receiveTime, doneFunc)
67 if err != nil {
68 return nil, fmt.Errorf("pubsub: cannot decode the retrieved message at index: %d, message: %+v", i, m)
69 }
70 msgs = append(msgs, msg)
71 }
72 return msgs, nil
73 }
74
75 func toMessage(resp *pb.ReceivedMessage, receiveTime time.Time, doneFunc iterDoneFunc) (*Message, error) {
76 ackh := &psAckHandler{ackID: resp.AckId}
77 msg := ipubsub.NewMessage(ackh)
78 if resp.Message == nil {
79 return msg, nil
80 }
81
82 pubTime := resp.Message.PublishTime.AsTime()
83
84 var deliveryAttempt *int
85 if resp.DeliveryAttempt > 0 {
86 da := int(resp.DeliveryAttempt)
87 deliveryAttempt = &da
88 }
89
90 msg.Data = resp.Message.Data
91 msg.Attributes = resp.Message.Attributes
92 msg.ID = resp.Message.MessageId
93 msg.PublishTime = pubTime
94 msg.DeliveryAttempt = deliveryAttempt
95 msg.OrderingKey = resp.Message.OrderingKey
96 ackh.receiveTime = receiveTime
97 ackh.doneFunc = doneFunc
98 ackh.ackResult = ipubsub.NewAckResult()
99 return msg, nil
100 }
101
102
103
104
105
106
107
108
109
110
111 type AckResult = ipubsub.AckResult
112
113
114 type AcknowledgeStatus = ipubsub.AcknowledgeStatus
115
116 const (
117
118 AcknowledgeStatusSuccess AcknowledgeStatus = iota
119
120 AcknowledgeStatusPermissionDenied
121
122 AcknowledgeStatusFailedPrecondition
123
124 AcknowledgeStatusInvalidAckID
125
126 AcknowledgeStatusOther
127 )
128
129
130 type psAckHandler struct {
131
132 ackID string
133
134
135 receiveTime time.Time
136
137 calledDone bool
138
139
140 doneFunc iterDoneFunc
141
142
143
144 ackResult *AckResult
145
146
147
148 exactlyOnceDelivery bool
149 }
150
151 func (ah *psAckHandler) OnAck() {
152 ah.done(true)
153 }
154
155 func (ah *psAckHandler) OnNack() {
156 ah.done(false)
157 }
158
159 func (ah *psAckHandler) OnAckWithResult() *AckResult {
160
161 ah.done(true)
162 if !ah.exactlyOnceDelivery {
163 return newSuccessAckResult()
164 }
165 return ah.ackResult
166 }
167
168 func (ah *psAckHandler) OnNackWithResult() *AckResult {
169
170 ah.done(false)
171 if !ah.exactlyOnceDelivery {
172 return newSuccessAckResult()
173 }
174 return ah.ackResult
175 }
176
177 func (ah *psAckHandler) done(ack bool) {
178 if ah.calledDone {
179 return
180 }
181 ah.calledDone = true
182 if ah.doneFunc != nil {
183 ah.doneFunc(ah.ackID, ack, ah.ackResult, ah.receiveTime)
184 }
185 }
186
187
188 func newSuccessAckResult() *AckResult {
189 ar := ipubsub.NewAckResult()
190 ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil)
191 return ar
192 }
193
View as plain text