-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathReconnectController.cs
100 lines (77 loc) · 2.83 KB
/
ReconnectController.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
using System;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
using Tapeti.Config.Annotations;
using Tapeti.Tests.Helpers;
using Xunit.Abstractions;
using Xunit.Sdk;
namespace Tapeti.Tests.Client.Controller
{
public class ReconnectDurableMessage
{
public int Number;
}
public class ReconnectDurableDedicatedMessage
{
public int Number;
}
public class ReconnectDynamicMessage
{
public int Number;
}
[MessageController]
public class ReconnectController
{
private readonly ITestOutputHelper testOutputHelper;
private static bool durableBlock = true;
private static readonly AsyncAutoResetEvent DurableMessageReceived = new();
private static readonly AsyncAutoResetEvent DurableDedicatedMessageReceived = new();
private static readonly AsyncAutoResetEvent DynamicMessageReceived = new ();
[NoBinding]
public static void SetBlockDurableMessage(bool block)
{
durableBlock = block;
}
[NoBinding]
public static async Task WaitForDurableMessages()
{
await Task.WhenAll(
DurableMessageReceived.WaitAsync(),
DurableDedicatedMessageReceived.WaitAsync()
).WithTimeout(TimeSpan.FromSeconds(10));
}
[NoBinding]
public static async Task WaitForDynamicMessage()
{
await DynamicMessageReceived.WaitAsync().WithTimeout(TimeSpan.FromSeconds(10));
}
public ReconnectController(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;
}
[DurableQueue("reconnect.durable")]
public async Task DurableMessage(ReconnectDurableMessage message, CancellationToken cancellationToken)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue");
DurableMessageReceived.Set();
if (durableBlock)
await Task.Delay(Timeout.Infinite, cancellationToken);
}
[DurableQueue("reconnect.durable.dedicated")]
[DedicatedChannel]
public async Task DurableMessage(ReconnectDurableDedicatedMessage message, CancellationToken cancellationToken)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue on dedicated channel");
DurableDedicatedMessageReceived.Set();
if (durableBlock)
await Task.Delay(Timeout.Infinite, cancellationToken);
}
[DynamicQueue("reconnect.dynamic")]
public void NoWaitMessage(ReconnectDynamicMessage message)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in dynamic queue");
DynamicMessageReceived.Set();
}
}
}