...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package pubsub
19
20 import (
21 "sync"
22 "time"
23 )
24
25 var (
26 dmu sync.Mutex
27 msgTraces = map[string][]Event{}
28 ackIDToMsgID = map[string]string{}
29 )
30
31 type Event struct {
32 Desc string
33 At time.Time
34 }
35
36 func MessageEvents(msgID string) []Event {
37 dmu.Lock()
38 defer dmu.Unlock()
39 return msgTraces[msgID]
40 }
41
42 func addRecv(msgID, ackID string, t time.Time) {
43 dmu.Lock()
44 defer dmu.Unlock()
45 ackIDToMsgID[ackID] = msgID
46 addEvent(msgID, "recv", t)
47 }
48
49 func addAcks(ackIDs []string) {
50 dmu.Lock()
51 defer dmu.Unlock()
52 now := time.Now()
53 for _, id := range ackIDs {
54 addEvent(ackIDToMsgID[id], "ack", now)
55 }
56 }
57
58 func addModAcks(ackIDs []string, deadlineSecs int32) {
59 dmu.Lock()
60 defer dmu.Unlock()
61 desc := "modack"
62 if deadlineSecs == 0 {
63 desc = "nack"
64 }
65 now := time.Now()
66 for _, id := range ackIDs {
67 addEvent(ackIDToMsgID[id], desc, now)
68 }
69 }
70
71 func addEvent(msgID, desc string, t time.Time) {
72 msgTraces[msgID] = append(msgTraces[msgID], Event{desc, t})
73 }
74
View as plain text