Skip to content

Commit dec1ea0

Browse files
authoredOct 11, 2018
Merge pull request #17 from hikhvar/improve-resilience
Improve resilience
2 parents b625d25 + cad31fa commit dec1ea0

File tree

3 files changed

+113
-104
lines changed

3 files changed

+113
-104
lines changed
 

‎main.go

+55-58
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"flag"
56
"fmt"
67
"io/ioutil"
@@ -16,7 +17,6 @@ import (
1617

1718
var (
1819
resultChan = make(chan Result)
19-
abortChan = make(chan bool)
2020
stopWaitLoop = false
2121
tearDownInProgress = false
2222
randomSource = rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -25,16 +25,14 @@ var (
2525
publisherClientIdTemplate = "mqtt-stresser-pub-%s-worker%d-%d"
2626
topicNameTemplate = "internal/mqtt-stresser/%s/worker%d-%d"
2727

28-
opTimeout = 5 * time.Second
29-
3028
errorLogger = log.New(os.Stderr, "ERROR: ", log.Lmicroseconds|log.Ltime|log.Lshortfile)
3129
verboseLogger = log.New(os.Stderr, "DEBUG: ", log.Lmicroseconds|log.Ltime|log.Lshortfile)
3230

3331
argNumClients = flag.Int("num-clients", 10, "Number of concurrent clients")
3432
argNumMessages = flag.Int("num-messages", 10, "Number of messages shipped by client")
35-
argTimeout = flag.String("timeout", "5s", "Timeout for pub/sub loop")
33+
argTimeout = flag.String("timeout", "5s", "Timeout for pub/sub actions")
3634
argGlobalTimeout = flag.String("global-timeout", "60s", "Timeout spanning all operations")
37-
argRampUpSize = flag.Int("rampup-size", 100, "Size of rampup batch")
35+
argRampUpSize = flag.Int("rampup-size", 100, "Size of rampup batch. Default rampup batch size is 100.")
3836
argRampUpDelay = flag.String("rampup-delay", "500ms", "Time between batch rampups")
3937
argTearDownDelay = flag.String("teardown-delay", "5s", "Graceperiod to complete remaining workers")
4038
argBrokerUrl = flag.String("broker", "", "Broker URL")
@@ -47,15 +45,6 @@ var (
4745
argHelp = flag.Bool("help", false, "Show help")
4846
)
4947

50-
type Worker struct {
51-
WorkerId int
52-
BrokerUrl string
53-
Username string
54-
Password string
55-
Nmessages int
56-
Timeout time.Duration
57-
}
58-
5948
type Result struct {
6049
WorkerId int
6150
Event string
@@ -67,6 +56,11 @@ type Result struct {
6756
ErrorMessage error
6857
}
6958

59+
type TimeoutError interface {
60+
Timeout() bool
61+
Error() string
62+
}
63+
7064
func main() {
7165
flag.Parse()
7266

@@ -83,18 +77,23 @@ func main() {
8377

8478
if err != nil {
8579
fmt.Printf("Could not create CPU profile: %s\n", err)
80+
os.Exit(1)
8681
}
8782

8883
if err := pprof.StartCPUProfile(f); err != nil {
8984
fmt.Printf("Could not start CPU profile: %s\n", err)
85+
os.Exit(1)
9086
}
9187
}
9288

9389
num := *argNumMessages
94-
brokerUrl := *argBrokerUrl
9590
username := *argUsername
9691
password := *argPassword
97-
testTimeout, _ := time.ParseDuration(*argTimeout)
92+
actionTimeout, err := time.ParseDuration(*argTimeout)
93+
if err != nil {
94+
fmt.Printf("Could not parse '--timeout': '%s' is not a valid duration string. See https://golang.org/pkg/time/#ParseDuration for valid duration strings\n", *argGlobalTimeout)
95+
os.Exit(1)
96+
}
9897

9998
verboseLogger.SetOutput(ioutil.Discard)
10099
errorLogger.SetOutput(ioutil.Discard)
@@ -107,7 +106,8 @@ func main() {
107106
verboseLogger.SetOutput(os.Stderr)
108107
}
109108

110-
if brokerUrl == "" {
109+
if *argBrokerUrl == "" {
110+
fmt.Println("'--broker' is empty. Abort.")
111111
os.Exit(1)
112112
}
113113

@@ -123,41 +123,49 @@ func main() {
123123

124124
resultChan = make(chan Result, *argNumClients**argNumMessages)
125125

126-
for cid := 0; cid < *argNumClients; cid++ {
126+
globalTimeout, err := time.ParseDuration(*argGlobalTimeout)
127+
if err != nil {
128+
fmt.Printf("Could not parse '--global-timeout': '%s' is not a valid duration string. See https://golang.org/pkg/time/#ParseDuration for valid duration strings\n", *argGlobalTimeout)
129+
os.Exit(1)
130+
}
131+
testCtx, cancelFunc := context.WithTimeout(context.Background(), globalTimeout)
132+
133+
stopStartLoop := false
134+
for cid := 0; cid < *argNumClients && !stopStartLoop; cid++ {
127135

128136
if cid%rampUpSize == 0 && cid > 0 {
129137
fmt.Printf("%d worker started - waiting %s\n", cid, rampUpDelay)
130138
time.Sleep(rampUpDelay)
139+
select {
140+
case <-time.NewTimer(rampUpDelay).C:
141+
case s := <-signalChan:
142+
fmt.Printf("Got signal %s. Cancel test.\n", s.String())
143+
cancelFunc()
144+
stopStartLoop = true
145+
}
131146
}
132147

133148
go (&Worker{
134-
WorkerId: cid,
135-
BrokerUrl: brokerUrl,
136-
Username: username,
137-
Password: password,
138-
Nmessages: num,
139-
Timeout: testTimeout,
140-
}).Run()
149+
WorkerId: cid,
150+
BrokerUrl: *argBrokerUrl,
151+
Username: username,
152+
Password: password,
153+
NumberOfMessages: num,
154+
Timeout: actionTimeout,
155+
}).Run(testCtx)
141156
}
142157
fmt.Printf("%d worker started\n", *argNumClients)
143158

144159
finEvents := 0
145160

146-
timeout := make(chan bool, 1)
147-
globalTimeout, _ := time.ParseDuration(*argGlobalTimeout)
148161
results := make([]Result, *argNumClients)
149162

150-
go func() {
151-
time.Sleep(globalTimeout)
152-
timeout <- true
153-
}()
154-
155163
for finEvents < *argNumClients && !stopWaitLoop {
156164
select {
157165
case msg := <-resultChan:
158166
results[msg.WorkerId] = msg
159167

160-
if msg.Event == "Completed" || msg.Error {
168+
if msg.Event == CompletedEvent || msg.Error {
161169
finEvents++
162170
verboseLogger.Printf("%d/%d events received\n", finEvents, *argNumClients)
163171
}
@@ -167,7 +175,7 @@ func main() {
167175
}
168176

169177
if *argHideProgress == false {
170-
if msg.Event == "Completed" {
178+
if msg.Event == CompletedEvent {
171179
fmt.Print(".")
172180
}
173181

@@ -176,16 +184,19 @@ func main() {
176184
}
177185
}
178186

179-
case <-timeout:
180-
fmt.Println()
181-
fmt.Printf("Aborted because global timeout (%s) was reached.\n", *argGlobalTimeout)
182-
183-
go tearDownWorkers()
184-
case signal := <-signalChan:
185-
fmt.Println()
186-
fmt.Printf("Received %s. Aborting.\n", signal)
187-
188-
go tearDownWorkers()
187+
case <-testCtx.Done():
188+
switch testCtx.Err().(type) {
189+
case TimeoutError:
190+
fmt.Println("Test timeout. Wait 5s to allow disconnection of clients.")
191+
default:
192+
fmt.Println("Test canceled. Wait 5s to allow disconnection of clients.")
193+
}
194+
time.Sleep(5 * time.Second)
195+
stopWaitLoop = true
196+
case s := <-signalChan:
197+
fmt.Printf("Got signal %s. Cancel test.\n", s.String())
198+
cancelFunc()
199+
stopWaitLoop = true
189200
}
190201
}
191202

@@ -217,17 +228,3 @@ func main() {
217228

218229
os.Exit(exitCode)
219230
}
220-
221-
func tearDownWorkers() {
222-
if !tearDownInProgress {
223-
tearDownInProgress = true
224-
225-
close(abortChan)
226-
227-
delay, _ := time.ParseDuration(*argTearDownDelay)
228-
fmt.Printf("Waiting %s for remaining workers\n", delay)
229-
time.Sleep(delay)
230-
231-
stopWaitLoop = true
232-
}
233-
}

‎report.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,15 @@ import (
66
"sort"
77
)
88

9+
const (
10+
ConnectFailedEvent = "ConnectFailed"
11+
SubscribeFailedEvent = "SubscribeFailed"
12+
TimeoutExceededEvent = "TimeoutExceeded"
13+
AbortedEvent = "Aborted"
14+
CompletedEvent = "Completed"
15+
ProgressReportEvent = "ProgressReport"
16+
)
17+
918
type Summary struct {
1019
Clients int
1120
TotalMessages int
@@ -60,26 +69,26 @@ func buildSummary(nClient int, nMessages int, results []Result) (Summary, error)
6069
nErrors++
6170

6271
switch r.Event {
63-
case "ConnectFailed":
72+
case ConnectFailedEvent:
6473
nConnectFailed++
65-
case "SubscribeFailed":
74+
case SubscribeFailedEvent:
6675
nSubscribeFailed++
67-
case "TimeoutExceeded":
76+
case TimeoutExceededEvent:
6877
nTimeoutExceeded++
69-
case "Aborted":
78+
case AbortedEvent:
7079
nAborted++
7180
}
7281
}
7382

74-
if r.Event == "Completed" {
83+
if r.Event == CompletedEvent {
7584
nCompleted++
7685
}
7786

78-
if r.Event == "ProgressReport" {
87+
if r.Event == ProgressReportEvent {
7988
nInProgress++
8089
}
8190

82-
if r.Event == "Completed" || r.Event == "ProgressReport" {
91+
if r.Event == CompletedEvent || r.Event == ProgressReportEvent {
8392
publishPerformance = append(publishPerformance, float64(r.MessagesPublished)/r.PublishTime.Seconds())
8493
receivePerformance = append(receivePerformance, float64(r.MessagesReceived)/r.ReceiveTime.Seconds())
8594
}

‎worker.go

+42-39
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package main
22

33
import (
4+
"context"
45
"fmt"
5-
mqtt "github.com/eclipse/paho.mqtt.golang"
66
"os"
77
"time"
8+
9+
mqtt "github.com/eclipse/paho.mqtt.golang"
810
)
911

10-
func (w *Worker) Run() {
12+
type Worker struct {
13+
WorkerId int
14+
BrokerUrl string
15+
Username string
16+
Password string
17+
NumberOfMessages int
18+
Timeout time.Duration
19+
}
20+
21+
func (w *Worker) Run(ctx context.Context) {
1122
verboseLogger.Printf("[%d] initializing\n", w.WorkerId)
1223

1324
queue := make(chan [2]string)
@@ -20,7 +31,7 @@ func (w *Worker) Run() {
2031
}
2132

2233
topicName := fmt.Sprintf(topicNameTemplate, hostname, w.WorkerId, t)
23-
subscriberClientId := fmt.Sprintf(subscriberClientIdTemplate, hostname, w.WorkerId, t,)
34+
subscriberClientId := fmt.Sprintf(subscriberClientIdTemplate, hostname, w.WorkerId, t)
2435
publisherClientId := fmt.Sprintf(publisherClientIdTemplate, hostname, w.WorkerId, t)
2536

2637
verboseLogger.Printf("[%d] topic=%s subscriberClientId=%s publisherClientId=%s\n", cid, topicName, subscriberClientId, publisherClientId)
@@ -37,21 +48,21 @@ func (w *Worker) Run() {
3748
subscriber := mqtt.NewClient(subscriberOptions)
3849

3950
verboseLogger.Printf("[%d] connecting publisher\n", w.WorkerId)
40-
if token := publisher.Connect(); token.Wait() && token.Error() != nil {
51+
if token := publisher.Connect(); token.WaitTimeout(w.Timeout) && token.Error() != nil {
4152
resultChan <- Result{
4253
WorkerId: w.WorkerId,
43-
Event: "ConnectFailed",
54+
Event: ConnectFailedEvent,
4455
Error: true,
4556
ErrorMessage: token.Error(),
4657
}
4758
return
4859
}
4960

5061
verboseLogger.Printf("[%d] connecting subscriber\n", w.WorkerId)
51-
if token := subscriber.Connect(); token.WaitTimeout(opTimeout) && token.Error() != nil {
62+
if token := subscriber.Connect(); token.WaitTimeout(w.Timeout) && token.Error() != nil {
5263
resultChan <- Result{
5364
WorkerId: w.WorkerId,
54-
Event: "ConnectFailed",
65+
Event: ConnectFailedEvent,
5566
Error: true,
5667
ErrorMessage: token.Error(),
5768
}
@@ -62,7 +73,7 @@ func (w *Worker) Run() {
6273
defer func() {
6374
verboseLogger.Printf("[%d] unsubscribe\n", w.WorkerId)
6475

65-
if token := subscriber.Unsubscribe(topicName); token.WaitTimeout(opTimeout) && token.Error() != nil {
76+
if token := subscriber.Unsubscribe(topicName); token.WaitTimeout(w.Timeout) && token.Error() != nil {
6677
fmt.Println(token.Error())
6778
os.Exit(1)
6879
}
@@ -71,10 +82,10 @@ func (w *Worker) Run() {
7182
}()
7283

7384
verboseLogger.Printf("[%d] subscribing to topic\n", w.WorkerId)
74-
if token := subscriber.Subscribe(topicName, 0, nil); token.WaitTimeout(opTimeout) && token.Error() != nil {
85+
if token := subscriber.Subscribe(topicName, 0, nil); token.WaitTimeout(w.Timeout) && token.Error() != nil {
7586
resultChan <- Result{
7687
WorkerId: w.WorkerId,
77-
Event: "SubscribeFailed",
88+
Event: SubscribeFailedEvent,
7889
Error: true,
7990
ErrorMessage: token.Error(),
8091
}
@@ -84,39 +95,33 @@ func (w *Worker) Run() {
8495

8596
verboseLogger.Printf("[%d] starting control loop %s\n", w.WorkerId, topicName)
8697

87-
timeout := make(chan bool, 1)
8898
stopWorker := false
8999
receivedCount := 0
90100
publishedCount := 0
91101

92102
t0 := time.Now()
93-
for i := 0; i < w.Nmessages; i++ {
103+
for i := 0; i < w.NumberOfMessages; i++ {
94104
text := fmt.Sprintf("this is msg #%d!", i)
95105
token := publisher.Publish(topicName, 0, false, text)
96106
publishedCount++
97-
token.Wait()
107+
token.WaitTimeout(w.Timeout)
98108
}
99109
publisher.Disconnect(5)
100110

101111
publishTime := time.Since(t0)
102112
verboseLogger.Printf("[%d] all messages published\n", w.WorkerId)
103113

104-
go func() {
105-
time.Sleep(w.Timeout)
106-
timeout <- true
107-
}()
108-
109114
t0 = time.Now()
110-
for receivedCount < w.Nmessages && !stopWorker {
115+
for receivedCount < w.NumberOfMessages && !stopWorker {
111116
select {
112117
case <-queue:
113118
receivedCount++
114119

115-
verboseLogger.Printf("[%d] %d/%d received\n", w.WorkerId, receivedCount, w.Nmessages)
116-
if receivedCount == w.Nmessages {
120+
verboseLogger.Printf("[%d] %d/%d received\n", w.WorkerId, receivedCount, w.NumberOfMessages)
121+
if receivedCount == w.NumberOfMessages {
117122
resultChan <- Result{
118123
WorkerId: w.WorkerId,
119-
Event: "Completed",
124+
Event: CompletedEvent,
120125
PublishTime: publishTime,
121126
ReceiveTime: time.Since(t0),
122127
MessagesReceived: receivedCount,
@@ -125,36 +130,34 @@ func (w *Worker) Run() {
125130
} else {
126131
resultChan <- Result{
127132
WorkerId: w.WorkerId,
128-
Event: "ProgressReport",
133+
Event: ProgressReportEvent,
129134
PublishTime: publishTime,
130135
ReceiveTime: time.Since(t0),
131136
MessagesReceived: receivedCount,
132137
MessagesPublished: publishedCount,
133138
}
134139
}
135-
case <-timeout:
136-
verboseLogger.Printf("[%d] timeout!!\n", cid)
137-
stopWorker = true
138-
139-
resultChan <- Result{
140-
WorkerId: w.WorkerId,
141-
Event: "TimeoutExceeded",
142-
PublishTime: publishTime,
143-
MessagesReceived: receivedCount,
144-
MessagesPublished: publishedCount,
145-
Error: true,
140+
case <-ctx.Done():
141+
var event string
142+
var isError bool
143+
switch ctx.Err().(type) {
144+
case TimeoutError:
145+
verboseLogger.Printf("[%d] received abort signal due to test timeout", w.WorkerId)
146+
event = TimeoutExceededEvent
147+
isError = true
148+
default:
149+
verboseLogger.Printf("[%d] received abort signal", w.WorkerId)
150+
event = AbortedEvent
151+
isError = false
146152
}
147-
case <-abortChan:
148-
verboseLogger.Printf("[%d] received abort signal", w.WorkerId)
149153
stopWorker = true
150-
151154
resultChan <- Result{
152155
WorkerId: w.WorkerId,
153-
Event: "Aborted",
156+
Event: event,
154157
PublishTime: publishTime,
155158
MessagesReceived: receivedCount,
156159
MessagesPublished: publishedCount,
157-
Error: false,
160+
Error: isError,
158161
}
159162
}
160163
}

0 commit comments

Comments
 (0)
Please sign in to comment.