Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DedicatedChannel attribute + connection refactoring #44

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
version 2
version: 2

build
os ubuntu-22.04
tools
python 3.12
build:
os: "ubuntu-22.04"
tools:
python: "3.12"

sphinx
configuration docs/conf.py
sphinx:
configuration: "docs/conf.py"

python:
install:
- requirements: "docs/requirements.txt"
82 changes: 82 additions & 0 deletions Tapeti.Tests/Client/Controller/DedicatedChannelController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System.Threading;
using System.Threading.Tasks;
using Shouldly;
using Tapeti.Config.Annotations;
using Xunit.Abstractions;

namespace Tapeti.Tests.Client.Controller
{
public class DedicatedChannelWaitMessage
{
}


public class DedicatedChannelNoWaitMessage
{
}


[MessageController]
public class DedicatedChannelController
{
private readonly ITestOutputHelper testOutputHelper;
public const int WaitMessageCount = 10;
public const int NoWaitMessageCount = 10;

private static readonly TaskCompletionSource WaitContinue = new();
private static readonly TaskCompletionSource WaitCompleted = new();
private static long waitCount;

private static readonly TaskCompletionSource NoWaitCompleted = new();
private static long noWaitCount;


[NoBinding]
public static async Task WaitForNoWaitMessages()
{
await NoWaitCompleted.Task;
Interlocked.Read(ref waitCount).ShouldBe(0, "NoWait messages should still be waiting");

WaitContinue.TrySetResult();
}

[NoBinding]
public static async Task WaitForWaitMessages()
{
await WaitCompleted.Task;
}


public DedicatedChannelController(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;
}

[DurableQueue("dedicated.channel.wait")]
[DedicatedChannel]
public async Task WaitMessage(DedicatedChannelWaitMessage message)
{
// To see the issue when the DedicatedChannel attribute is removed
//testOutputHelper.WriteLine("Received wait message");

await WaitContinue.Task;

var count = Interlocked.Increment(ref waitCount);
testOutputHelper.WriteLine($"Handled wait message #{count}");

if (count == WaitMessageCount)
WaitCompleted.TrySetResult();
}


[DurableQueue("dedicated.channel.nowait")]
public void NoWaitMessage(DedicatedChannelNoWaitMessage message)
{
var count = Interlocked.Increment(ref noWaitCount);
testOutputHelper.WriteLine($"Handled no-wait message #{count}");

if (count == NoWaitMessageCount)
NoWaitCompleted.TrySetResult();
}
}
}
100 changes: 100 additions & 0 deletions Tapeti.Tests/Client/Controller/ReconnectController.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
using Tapeti.Config.Annotations;
using Tapeti.Tests.Helpers;
using Xunit.Abstractions;
using Xunit.Sdk;

namespace Tapeti.Tests.Client.Controller
{
public class ReconnectDurableMessage
{
public int Number;
}


public class ReconnectDurableDedicatedMessage
{
public int Number;
}


public class ReconnectDynamicMessage
{
public int Number;
}


[MessageController]
public class ReconnectController
{
private readonly ITestOutputHelper testOutputHelper;
private static bool durableBlock = true;
private static readonly AsyncAutoResetEvent DurableMessageReceived = new();
private static readonly AsyncAutoResetEvent DurableDedicatedMessageReceived = new();
private static readonly AsyncAutoResetEvent DynamicMessageReceived = new ();


[NoBinding]
public static void SetBlockDurableMessage(bool block)
{
durableBlock = block;
}


[NoBinding]
public static async Task WaitForDurableMessages()
{
await Task.WhenAll(
DurableMessageReceived.WaitAsync(),
DurableDedicatedMessageReceived.WaitAsync()
).WithTimeout(TimeSpan.FromSeconds(10));
}


[NoBinding]
public static async Task WaitForDynamicMessage()
{
await DynamicMessageReceived.WaitAsync().WithTimeout(TimeSpan.FromSeconds(10));
}


public ReconnectController(ITestOutputHelper testOutputHelper)
{
this.testOutputHelper = testOutputHelper;
}


[DurableQueue("reconnect.durable")]
public async Task DurableMessage(ReconnectDurableMessage message, CancellationToken cancellationToken)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue");
DurableMessageReceived.Set();

if (durableBlock)
await Task.Delay(Timeout.Infinite, cancellationToken);
}


[DurableQueue("reconnect.durable.dedicated")]
[DedicatedChannel]
public async Task DurableMessage(ReconnectDurableDedicatedMessage message, CancellationToken cancellationToken)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue on dedicated channel");
DurableDedicatedMessageReceived.Set();

if (durableBlock)
await Task.Delay(Timeout.Infinite, cancellationToken);
}


[DynamicQueue("reconnect.dynamic")]
public void NoWaitMessage(ReconnectDynamicMessage message)
{
testOutputHelper.WriteLine($"- Received message {message.Number} in dynamic queue");
DynamicMessageReceived.Set();
}
}
}
112 changes: 102 additions & 10 deletions Tapeti.Tests/Client/ControllerTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
using System.Threading.Tasks;
using FluentAssertions;
using System;
using System.Threading.Tasks;
using Shouldly;
using SimpleInjector;
using Tapeti.Config;
using Tapeti.SimpleInjector;
using Tapeti.Tests.Client.Controller;
using Tapeti.Tests.Helpers;
using Tapeti.Tests.Mock;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -15,29 +17,35 @@ namespace Tapeti.Tests.Client
public class ControllerTests : IAsyncLifetime
{
private readonly RabbitMQFixture fixture;
private readonly ITestOutputHelper testOutputHelper;
private readonly Container container = new();

private TapetiConnection? connection;
private RabbitMQFixture.RabbitMQTestProxy proxy = null!;


public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper)
{
this.fixture = fixture;
this.testOutputHelper = testOutputHelper;

container.RegisterInstance<ILogger>(new MockLogger(testOutputHelper));
container.RegisterInstance(testOutputHelper);
}


public Task InitializeAsync()
public async Task InitializeAsync()
{
return Task.CompletedTask;
proxy = await fixture.AcquireProxy();
}


public async Task DisposeAsync()
{
if (connection != null)
await connection.DisposeAsync();

proxy.Dispose();
}


Expand All @@ -61,25 +69,109 @@ await connection.GetPublisher().PublishRequest<RequestResponseFilterController,


var handler = await RequestResponseFilterController.ValidResponse.Task;
handler.Should().Be(2);
handler.ShouldBe(2);

var invalidHandler = await Task.WhenAny(RequestResponseFilterController.InvalidResponse.Task, Task.Delay(1000));
invalidHandler.Should().NotBe(RequestResponseFilterController.InvalidResponse.Task);
invalidHandler.ShouldNotBe(RequestResponseFilterController.InvalidResponse.Task);
}


[Fact]
public async Task DedicatedChannel()
{
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.EnableDeclareDurableQueues()
.RegisterController<DedicatedChannelController>()
.Build();

connection = CreateConnection(config);
await connection!.Subscribe();


var publisher = connection.GetPublisher();
for (var i = 0; i < DedicatedChannelController.WaitMessageCount; i++)
await publisher.Publish(new DedicatedChannelWaitMessage());

for (var i = 0; i < DedicatedChannelController.NoWaitMessageCount; i++)
await publisher.Publish(new DedicatedChannelNoWaitMessage());


await DedicatedChannelController.WaitForNoWaitMessages();
await DedicatedChannelController.WaitForWaitMessages();
}


[Fact]
public async Task Reconnect()
{
var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container))
.EnableDeclareDurableQueues()
.RegisterController<ReconnectController>()
.Build();

connection = CreateConnection(config);

var disconnectedCompletion = new TaskCompletionSource();
var reconnectedCompletion = new TaskCompletionSource();

connection.Disconnected += (_, _) => disconnectedCompletion.TrySetResult();
connection.Reconnected += (_, _) => reconnectedCompletion.TrySetResult();

await connection.Subscribe();


ReconnectController.SetBlockDurableMessage(true);
await connection.GetPublisher().Publish(new ReconnectDurableMessage { Number = 1 });
await connection.GetPublisher().Publish(new ReconnectDurableDedicatedMessage { Number = 1 });
await connection.GetPublisher().Publish(new ReconnectDynamicMessage { Number = 1 });

// Both messages should arrive. The message for the durable queue will not be acked.
testOutputHelper.WriteLine("> Waiting for initial messages");
await Task.WhenAll(ReconnectController.WaitForDurableMessages(), ReconnectController.WaitForDynamicMessage());


testOutputHelper.WriteLine("> Disabling proxy");
proxy.RabbitMQProxy.Enabled = false;
await proxy.RabbitMQProxy.UpdateAsync();

await disconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60));


testOutputHelper.WriteLine("> Re-enabling proxy");
ReconnectController.SetBlockDurableMessage(false);


proxy.RabbitMQProxy.Enabled = true;
await proxy.RabbitMQProxy.UpdateAsync();

await reconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60));


// Message in the durable queue should be delivered again
testOutputHelper.WriteLine("> Waiting for durable message redelivery");
await ReconnectController.WaitForDurableMessages();


// Dynamic queue is of course empty but should be recreated
testOutputHelper.WriteLine("> Sending and waiting for dynamic message");
await connection.GetPublisher().Publish(new ReconnectDynamicMessage { Number = 2 });
await ReconnectController.WaitForDynamicMessage();
}


private TapetiConnection CreateConnection(ITapetiConfig config)
private TapetiConnection CreateConnection(ITapetiConfig config, ushort prefetchCount = 1, int? consumerDispatchConcurrency = null)
{
return new TapetiConnection(config)
{
Params = new TapetiConnectionParams
{
HostName = "127.0.0.1",
Port = fixture.RabbitMQPort,
ManagementPort = fixture.RabbitMQManagementPort,
Port = proxy.RabbitMQPort,
ManagementPort = proxy.RabbitMQManagementPort,
Username = RabbitMQFixture.RabbitMQUsername,
Password = RabbitMQFixture.RabbitMQPassword,
PrefetchCount = 1
PrefetchCount = prefetchCount,
ConsumerDispatchConcurrency = consumerDispatchConcurrency ?? Environment.ProcessorCount
}
};
}
Expand Down
Loading