-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevents_test.go
125 lines (96 loc) · 2.74 KB
/
events_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package web
import (
"math/rand"
"sync"
"testing"
"time"
)
// run COUNT goroutines to read messages, every 0..INTERVAL
const READER_COUNT = 100
const READER_INTERVAL = 0.01 * float32(time.Second)
// ...which will each run for 0..TIME
const READER_TIME = 1.0 * float32(time.Second)
// ...and may delay processing of each event by 0..DELAY
const READER_DELAY = 0.01 * float32(time.Second)
// run COUNT goroutines to write messages...
const WRITER_COUNT = 5
// ...which will each write COUNT events at an interval of 0..1ms
const EVENT_INTERVAL = 0.001 * float32(time.Second)
const EVENT_COUNT = 1000
type testEvent struct {
writer int
}
type testEvents struct {
writeGroup sync.WaitGroup
waitGroup sync.WaitGroup
eventChan chan Event
events Events
}
// write events while sleeping
func (test *testEvents) writer(t *testing.T, i int) {
defer test.writeGroup.Done()
for count := 0; count <= EVENT_COUNT; count++ {
time.Sleep(time.Duration(rand.Float32() * float32(EVENT_INTERVAL)))
event := testEvent{writer: i}
t.Logf("writer %d: write %v", i, event)
test.eventChan <- event
}
}
// read events while sleeping
func (test *testEvents) reader(t *testing.T, eventsClient eventsClient) {
defer test.waitGroup.Done()
var count = 0
var startTime = time.Now()
var stopChan = time.After(time.Duration(rand.Float32() * READER_TIME))
for {
select {
case event, ok := <-eventsClient:
if !ok {
t.Logf("reader %p: closed @ %d messages after %v", eventsClient, count, time.Now().Sub(startTime))
return
} else {
t.Logf("reader %p: read %v", eventsClient, event)
count++
// sleep 0..10ms while processing to trigger eventChan overflows
time.Sleep(time.Duration(rand.ExpFloat64() * float64(READER_DELAY)))
}
case stopTime := <-stopChan:
t.Logf("reader %p: stop @ %d messages after %v", eventsClient, count, stopTime.Sub(startTime))
test.events.stop(eventsClient)
return
}
}
}
func TestEvents(t *testing.T) {
var test = testEvents{
eventChan: make(chan Event),
}
test.events = MakeEvents(EventConfig{EventPush: test.eventChan})
// add followers
test.waitGroup.Add(1)
go func() {
defer test.waitGroup.Done()
for count := 0; count <= READER_COUNT; count++ {
time.Sleep(time.Duration(rand.Float32() * READER_INTERVAL))
_, eventsClient := test.events.listen()
test.waitGroup.Add(1)
go test.reader(t, eventsClient)
}
}()
// add writers
test.waitGroup.Add(1)
go func() {
defer test.waitGroup.Done()
t.Log("Starting writers...")
for i := 1; i < WRITER_COUNT; i++ {
test.writeGroup.Add(1)
go test.writer(t, i)
}
t.Log("Waiting writers...")
test.writeGroup.Wait()
t.Log("Completed writers...")
//close(test.eventChan)
}()
t.Log("Running...")
test.waitGroup.Wait()
}