forked from tmaxmax/go-sse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreplay_provider.go
248 lines (206 loc) · 5.9 KB
/
replay_provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package sse
import (
"errors"
"strconv"
"time"
)
// NewFiniteReplayProvider creates a finite replay provider with the given max
// count and auto ID behaviour.
//
// Count is the size of the provider's internal buffer, which bounds how many
// events are kept for replay. It must be at least 2: for any smaller value
// a nil provider and a non-nil error are returned.
//
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of
// events.
func NewFiniteReplayProvider(
	count int, autoIDs bool,
) (*FiniteReplayProvider, error) {
	if count < 2 {
		return nil, errors.New("count must be at least 2")
	}

	return &FiniteReplayProvider{
		cap:     count,
		buf:     make([]messageWithTopics, count),
		autoIDs: autoIDs,
	}, nil
}
// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events.
// The events must have an ID unless the AutoIDs flag is toggled.
//
// Messages are kept in a fixed-size buffer that Put fills cyclically.
type FiniteReplayProvider struct {
// buf is the message storage; NewFiniteReplayProvider allocates it with cap elements.
buf []messageWithTopics
// cap is the buffer size; Put wraps tail back to 0 once it reaches this value.
cap int
// head is moved by Put to tail+1 whenever tail catches up with it. Replay only
// compares it against tail: equal means there is nothing to replay, tail < head
// means the buffer has wrapped around.
head int
// tail is the index of the slot the next Put writes to.
tail int
// autoIDs makes Put overwrite each message's ID with the value of currentID.
autoIDs bool
// currentID is the counter behind automatically assigned IDs; Put increments
// it before each use.
currentID int64
}
// Put stores the message together with its topics in the provider's buffer
// and returns the message. The buffer is written cyclically, so once it is
// full every new message overwrites the oldest stored one.
//
// When automatic IDs are enabled the message's ID is replaced with the decimal
// form of an internal counter that is incremented on every call.
//
// Put panics if no topics are given, or if the message has no ID and the
// provider does not assign IDs automatically.
func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message {
	if len(topics) == 0 {
		panic(errors.New(
			"go-sse: no topics provided for Message.\n" +
				formatMessagePanicString(message)))
	}

	switch {
	case f.autoIDs:
		f.currentID++
		message.ID = ID(strconv.FormatInt(f.currentID, 10))
	case !message.ID.IsSet():
		panic(errors.New("go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)))
	}

	f.buf[f.tail] = messageWithTopics{message: message, topics: topics}

	// Advance the write position, wrapping around at the end of the buffer.
	if f.tail++; f.tail >= f.cap {
		f.tail = 0
	}

	// The write position caught up with head: move head one slot past it.
	if f.head == f.tail {
		f.head = f.tail + 1

		// NOTE(review): tail < cap here, so head can become equal to cap but
		// never greater, and this reset does not fire. Replay only compares
		// head with tail and never indexes buf with it, so confirm how Replay
		// is meant to behave before tightening this condition.
		if f.head > f.cap {
			f.head = 0
		}
	}

	return message
}
// Replay sends the buffered messages to the subscription's client and then
// flushes the client. Only messages stored after the one whose ID equals the
// subscription's LastEventID, and which share at least one topic with the
// subscription, are sent. Message expiry times are not taken into account.
//
// Nothing is sent or flushed while head equals tail, which is the state of a
// provider that has not received any message. The first error returned by the
// client aborts the replay and is returned as is.
func (f *FiniteReplayProvider) Replay(subscription Subscription) error {
	if f.head == f.tail {
		return nil
	}

	found := false

	// With tail before head the buffer has wrapped around: the older messages
	// sit in buf[tail:] and must be replayed before the newer ones in buf[:tail].
	if f.tail < f.head {
		var err error
		if found, err = replay(subscription, f.buf[f.tail:], false); err != nil {
			return err
		}
	}

	if _, err := replay(subscription, f.buf[:f.tail], found); err != nil {
		return err
	}

	return subscription.Client.Flush()
}
// replay walks events in order. Until the event whose ID equals
// sub.LastEventID has been seen, events are skipped (including the matching
// one); afterwards every event sharing a topic with the subscription is sent
// to the subscription's client.
//
// foundFirstEvent tells whether the matching event was already seen in an
// earlier call. The returned flag carries that state forward; it is false
// whenever a send error is returned.
func replay(
	sub Subscription, events []messageWithTopics, foundFirstEvent bool,
) (hasFoundFirstEvent bool, err error) {
	for i := range events {
		ev := &events[i]

		switch {
		case !foundFirstEvent:
			// Still searching for the last event the client received.
			if ev.message.ID == sub.LastEventID {
				foundFirstEvent = true
			}
		case topicsIntersect(sub.Topics, ev.topics):
			if sendErr := sub.Client.Send(ev.message); sendErr != nil {
				return false, sendErr
			}
		}
	}

	return foundFirstEvent, nil
}
// ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events.
// You can use this provider for replaying an infinite number of events, if the events never
// expire.
// The provider removes any expired events when a new event is put and after at least
// a GCInterval period passed.
// The events must have an ID unless the AutoIDs flag is toggled.
type ValidReplayProvider struct {
// The function used to retrieve the current time. Defaults to time.Now.
// Useful when testing.
Now func() time.Time
// lastGC is the time of the last automatic cleanup performed by Put. It is
// first set when Put creates the buffer; the GC method does not update it.
lastGC time.Time
// b is the message buffer. It is created by the first call to Put and is nil
// until then.
b buffer
// expiries holds one expiry time per Put call, in insertion order. Cleanup
// drops the first entry together with each message it dequeues from b.
expiries []time.Time
// TTL is for how long a message is valid, since it was added.
TTL time.Duration
// After how long the ReplayProvider should attempt to clean up expired events.
// By default cleanup is done after a fourth of the TTL has passed; this means
// that messages may be stored for a duration equal to 5/4*TTL. If this is not
// desired, set the GC interval to a value sensible for your use case or set
// it to -1 – this disables automatic cleanup, enabling you to do it manually
// using the GC method.
GCInterval time.Duration
// AutoIDs configures ValidReplayProvider to automatically set the IDs of events.
AutoIDs bool
}
// Put puts the message into the provider's buffer and returns the value the
// buffer's queue method yields for it. The message's expiry time is recorded
// as the time of the call plus TTL.
//
// The buffer is created on the first call. Before the message is stored,
// expired messages are removed if automatic cleanup is due (see GCInterval).
func (v *ValidReplayProvider) Put(message *Message, topics []string) *Message {
	now := v.now()
	if v.b == nil {
		v.b = getBuffer(v.AutoIDs, 0)
		v.lastGC = now
	}

	if v.shouldGC(now) {
		v.doGC(now)
		v.lastGC = now
	}

	// Reuse the instant sampled above instead of reading the clock again, so
	// the cleanup decision and the expiry of this message are based on the
	// same time and Now is called exactly once per Put.
	v.expiries = append(v.expiries, now.Add(v.TTL))
	return v.b.queue(message, topics)
}
// shouldGC reports whether automatic cleanup is due at the given time, that
// is, whether at least one cleanup interval has elapsed since lastGC.
//
// A negative GCInterval disables automatic cleanup; a zero GCInterval selects
// the default interval of a fourth of the TTL.
func (v *ValidReplayProvider) shouldGC(now time.Time) bool {
	interval := v.GCInterval

	switch {
	case interval < 0:
		return false
	case interval == 0:
		interval = v.TTL / 4
	}

	return now.Sub(v.lastGC) >= interval
}
// GC removes all the expired messages from the provider's buffer, using the
// provider's current time. It does nothing if the buffer has not been created
// yet, i.e. before the first Put.
func (v *ValidReplayProvider) GC() {
	if v.b == nil {
		return
	}

	v.doGC(v.now())
}
// doGC dequeues messages from the front of the buffer for as long as the
// front message's expiry time is not after now, dropping the matching entry
// of expiries with each of them. It stops at the first message that is still
// valid or when the buffer has no front element.
func (v *ValidReplayProvider) doGC(now time.Time) {
	for v.b.front() != nil && !v.expiries[0].After(now) {
		v.b.dequeue()
		v.expiries = v.expiries[1:]
	}
}
// Replay sends to the subscription's client every message the buffer returns
// for the subscription's LastEventID that has not expired yet and shares at
// least one topic with the subscription, then flushes the client.
//
// Nothing is sent or flushed if the buffer has not been created yet or if it
// returns no messages. The first error returned by the client aborts the
// replay and is returned as is.
func (v *ValidReplayProvider) Replay(subscription Subscription) error {
	if v.b == nil {
		return nil
	}

	events := v.b.slice(subscription.LastEventID)
	if len(events) == 0 {
		return nil
	}

	now := v.now()

	// NOTE(review): this presumes the returned events are the trailing entries
	// of the buffer, so events[i] pairs with expiries[offset+i] — confirm
	// against the buffer implementation.
	offset := v.b.len() - len(events)

	for i := range events {
		if !v.expiries[offset+i].After(now) {
			continue
		}
		if !topicsIntersect(subscription.Topics, events[i].topics) {
			continue
		}
		if err := subscription.Client.Send(events[i].message); err != nil {
			return err
		}
	}

	return subscription.Client.Flush()
}
// now returns the current time as reported by the Now function, falling back
// to time.Now when no such function is configured.
func (v *ValidReplayProvider) now() time.Time {
	if clock := v.Now; clock != nil {
		return clock()
	}

	return time.Now()
}
// topicsIntersect reports whether at least one topic appears in both of the
// given slices. It returns false if either slice is empty.
func topicsIntersect(a, b []string) bool {
	if len(a) == 0 || len(b) == 0 {
		return false
	}

	for _, candidate := range a {
		for i := 0; i < len(b); i++ {
			if b[i] == candidate {
				return true
			}
		}
	}

	return false
}