...
1 package eventsource
2
3 import (
4 "sort"
5 "sync"
6 )
7
8
9 type SliceRepository struct {
10 events map[string][]Event
11 lock *sync.RWMutex
12 }
13
14
15 func NewSliceRepository() *SliceRepository {
16 return &SliceRepository{
17 events: make(map[string][]Event),
18 lock: &sync.RWMutex{},
19 }
20 }
21
22 func (repo SliceRepository) indexOfEvent(channel, id string) int {
23 return sort.Search(len(repo.events[channel]), func(i int) bool {
24 return repo.events[channel][i].Id() >= id
25 })
26 }
27
28
29 func (repo SliceRepository) Replay(channel, id string) (out chan Event) {
30 out = make(chan Event)
31 go func() {
32 defer close(out)
33 repo.lock.RLock()
34 defer repo.lock.RUnlock()
35 events := repo.events[channel][repo.indexOfEvent(channel, id):]
36 for i := range events {
37 out <- events[i]
38 }
39 }()
40 return
41 }
42
43
44 func (repo *SliceRepository) Add(channel string, event Event) {
45 repo.lock.Lock()
46 defer repo.lock.Unlock()
47 i := repo.indexOfEvent(channel, event.Id())
48 if i < len(repo.events[channel]) && repo.events[channel][i].Id() == event.Id() {
49 repo.events[channel][i] = event
50 } else {
51 repo.events[channel] = append(repo.events[channel][:i], append([]Event{event}, repo.events[channel][i:]...)...)
52 }
53 }
54
View as plain text