...
1 package eventsource
2
3 import (
4 "sort"
5 "sync"
6 )
7
8
9 type SliceRepository struct {
10 events map[string][]Event
11 lock sync.RWMutex
12 }
13
14 func NewSliceRepository() *SliceRepository {
15 return &SliceRepository{
16 events: make(map[string][]Event),
17 }
18 }
19
20 func (repo SliceRepository) indexOfEvent(channel, id string) int {
21 return sort.Search(len(repo.events[channel]), func(i int) bool {
22 return repo.events[channel][i].Id() >= id
23 })
24 }
25
26 func (repo SliceRepository) Replay(channel, id string) (out chan Event) {
27 out = make(chan Event)
28 go func() {
29 defer close(out)
30 repo.lock.RLock()
31 defer repo.lock.RUnlock()
32 events := repo.events[channel][repo.indexOfEvent(channel, id):]
33 for i := range events {
34 out <- events[i]
35 }
36 }()
37 return
38 }
39
40 func (repo *SliceRepository) Add(channel string, event Event) {
41 repo.lock.Lock()
42 defer repo.lock.Unlock()
43 i := repo.indexOfEvent(channel, event.Id())
44 if i < len(repo.events[channel]) && repo.events[channel][i].Id() == event.Id() {
45 repo.events[channel][i] = event
46 } else {
47 repo.events[channel] = append(repo.events[channel][:i], append([]Event{event}, repo.events[channel][i:]...)...)
48 }
49 return
50 }
51
View as plain text