From c7f5d365ef5f85f7b467a367979194baf1a83660 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 10 Feb 2025 13:33:05 +0100 Subject: [PATCH 1/8] Added support for dedicated consumer channels - Refactored connection - Replaced FluentAssertions with Shouldly --- .../Controller/DedicatedChannelController.cs | 82 ++++ Tapeti.Tests/Client/ControllerTests.cs | 40 +- Tapeti.Tests/Client/TapetiClientTests.cs | 17 +- Tapeti.Tests/Config/QueueArgumentsTest.cs | 36 +- Tapeti.Tests/Config/SimpleControllerTest.cs | 8 +- .../TypeNameRoutingKeyStrategyTests.cs | 4 +- .../Helpers/ConnectionStringParserTest.cs | 14 +- Tapeti.Tests/Helpers/ExpressionInvokerTest.cs | 22 +- Tapeti.Tests/Tapeti.Tests.csproj | 2 +- Tapeti.Transient/TransientGenericBinding.cs | 3 + Tapeti.sln.DotSettings | 1 + .../Annotations/DedicatedChannelAttribute.cs | 21 + Tapeti/Config/IBinding.cs | 10 + Tapeti/Connection/IConnectionEventListener.cs | 4 +- Tapeti/Connection/ITapetiClient.cs | 51 ++- Tapeti/Connection/TapetiChannel.cs | 41 +- Tapeti/Connection/TapetiClient.cs | 420 +++++------------- Tapeti/Connection/TapetiClientConnection.cs | 303 +++++++++++++ Tapeti/Connection/TapetiConsumer.cs | 1 + Tapeti/Connection/TapetiSubscriber.cs | 17 +- Tapeti/Default/ControllerMethodBinding.cs | 12 + Tapeti/TapetiConfigControllers.cs | 4 +- Tapeti/TapetiConnection.cs | 5 +- 23 files changed, 712 insertions(+), 406 deletions(-) create mode 100644 Tapeti.Tests/Client/Controller/DedicatedChannelController.cs create mode 100644 Tapeti/Config/Annotations/DedicatedChannelAttribute.cs create mode 100644 Tapeti/Connection/TapetiClientConnection.cs diff --git a/Tapeti.Tests/Client/Controller/DedicatedChannelController.cs b/Tapeti.Tests/Client/Controller/DedicatedChannelController.cs new file mode 100644 index 0000000..d7b7bba --- /dev/null +++ b/Tapeti.Tests/Client/Controller/DedicatedChannelController.cs @@ -0,0 +1,82 @@ +using System.Threading; +using System.Threading.Tasks; +using Shouldly; +using Tapeti.Config.Annotations; +using Xunit.Abstractions; + +namespace Tapeti.Tests.Client.Controller +{ + public class DedicatedChannelWaitMessage + { + } + + + public class DedicatedChannelNoWaitMessage + { + } + + + [MessageController] + public class DedicatedChannelController + { + private readonly ITestOutputHelper testOutputHelper; + public const int WaitMessageCount = 10; + public const int NoWaitMessageCount = 10; + + private static readonly TaskCompletionSource WaitContinue = new(); + private static readonly TaskCompletionSource WaitCompleted = new(); + private static long waitCount; + + private static readonly TaskCompletionSource NoWaitCompleted = new(); + private static long noWaitCount; + + + [NoBinding] + public static async Task WaitForNoWaitMessages() + { + await NoWaitCompleted.Task; + Interlocked.Read(ref waitCount).ShouldBe(0, "NoWait messages should still be waiting"); + + WaitContinue.TrySetResult(); + } + + [NoBinding] + public static async Task WaitForWaitMessages() + { + await WaitCompleted.Task; + } + + + public DedicatedChannelController(ITestOutputHelper testOutputHelper) + { + this.testOutputHelper = testOutputHelper; + } + + [DurableQueue("dedicated.channel.wait")] + [DedicatedChannel] + public async Task WaitMessage(DedicatedChannelWaitMessage message) + { + // To see the issue when the DedicatedChannel attribute is removed + //testOutputHelper.WriteLine("Received wait message"); + + await WaitContinue.Task; + + var count = Interlocked.Increment(ref waitCount); + testOutputHelper.WriteLine($"Handled wait message #{count}"); + + if (count == WaitMessageCount) + WaitCompleted.TrySetResult(); + } + + + [DurableQueue("dedicated.channel.nowait")] + public void NoWaitMessage(DedicatedChannelNoWaitMessage message) + { + var count = Interlocked.Increment(ref noWaitCount); + testOutputHelper.WriteLine($"Handled no-wait message #{count}"); + + if (count == NoWaitMessageCount) + NoWaitCompleted.TrySetResult(); + } + } +} diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs index dacaba0..2baa7f8 100644 --- a/Tapeti.Tests/Client/ControllerTests.cs +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -1,5 +1,6 @@ -using System.Threading.Tasks; -using FluentAssertions; +using System; +using System.Threading.Tasks; +using Shouldly; using SimpleInjector; using Tapeti.Config; using Tapeti.SimpleInjector; @@ -25,6 +26,7 @@ public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelp this.fixture = fixture; container.RegisterInstance(new MockLogger(testOutputHelper)); + container.RegisterInstance(testOutputHelper); } @@ -61,14 +63,39 @@ await connection.GetPublisher().PublishRequest() + .Build(); + + connection = CreateConnection(config, 50, 2); + await connection!.Subscribe(); + + + var publisher = connection.GetPublisher(); + for (var i = 0; i < DedicatedChannelController.WaitMessageCount; i++) + await publisher.Publish(new DedicatedChannelWaitMessage()); + + for (var i = 0; i < DedicatedChannelController.NoWaitMessageCount; i++) + await publisher.Publish(new DedicatedChannelNoWaitMessage()); + + + await DedicatedChannelController.WaitForNoWaitMessages(); + await DedicatedChannelController.WaitForWaitMessages(); + } + + + private TapetiConnection CreateConnection(ITapetiConfig config, ushort prefetchCount = 1, int? consumerDispatchConcurrency = null) { return new TapetiConnection(config) { @@ -79,7 +106,8 @@ private TapetiConnection CreateConnection(ITapetiConfig config) ManagementPort = fixture.RabbitMQManagementPort, Username = RabbitMQFixture.RabbitMQUsername, Password = RabbitMQFixture.RabbitMQPassword, - PrefetchCount = 1 + PrefetchCount = prefetchCount, + ConsumerDispatchConcurrency = consumerDispatchConcurrency ?? Environment.ProcessorCount } }; } diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index d3a7112..3a3159b 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -2,8 +2,8 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using FluentAssertions; using RabbitMQ.Client; +using Shouldly; using Tapeti.Connection; using Tapeti.Default; using Tapeti.Exceptions; @@ -49,8 +49,8 @@ public async Task DisposeAsync() [Fact] public void Fixture() { - fixture.RabbitMQPort.Should().BeGreaterThan(0); - fixture.RabbitMQManagementPort.Should().BeGreaterThan(0); + ((int)fixture.RabbitMQPort).ShouldBeGreaterThan(0); + ((int)fixture.RabbitMQManagementPort).ShouldBeGreaterThan(0); } @@ -58,7 +58,7 @@ public void Fixture() public async Task DynamicQueueDeclareNoPrefix() { var queueName = await client.DynamicQueueDeclare(null, null, CancellationToken.None); - queueName.Should().NotBeNullOrEmpty(); + queueName.ShouldNotBeNullOrEmpty(); } @@ -66,7 +66,7 @@ public async Task DynamicQueueDeclareNoPrefix() public async Task DynamicQueueDeclarePrefix() { var queueName = await client.DynamicQueueDeclare("dynamicprefix", null, CancellationToken.None); - queueName.Should().StartWith("dynamicprefix"); + queueName.ShouldStartWith("dynamicprefix"); } @@ -85,7 +85,7 @@ public async Task DurableQueueDeclareIncompatibleArguments() rabbitmqClient.Close(); - ok.Should().NotBeNull(); + ok.ShouldNotBeNull(); await client.DurableQueueDeclare("incompatibleargs", new QueueBinding[] @@ -115,7 +115,7 @@ public async Task PublishHandleOverflow() var publishOverMaxLength = () => client.Publish(body, properties, null, queue1, true); - await publishOverMaxLength.Should().ThrowAsync(); + await publishOverMaxLength.ShouldThrowAsync(); // The channel should recover and allow further publishing await client.Publish(body, properties, null, queue2, true); @@ -152,7 +152,8 @@ private TapetiClient CreateClient() Username = RabbitMQFixture.RabbitMQUsername, Password = RabbitMQFixture.RabbitMQPassword, PrefetchCount = 50 - }); + }, + null); } } } \ No newline at end of file diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs index d8d0502..14664f7 100644 --- a/Tapeti.Tests/Config/QueueArgumentsTest.cs +++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs @@ -4,9 +4,8 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using FluentAssertions; -using FluentAssertions.Execution; using Moq; +using Shouldly; using Tapeti.Config.Annotations; using Tapeti.Config; using Tapeti.Connection; @@ -18,7 +17,7 @@ internal static class UTF8StringExtensions { public static string AsUTF8String(this object value) { - value.Should().BeOfType(); + value.ShouldBeOfType(); return Encoding.UTF8.GetString((byte[])value); } } @@ -85,10 +84,10 @@ public async Task SingleQueueArguments() var config = GetControllerConfig(); var binding1 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage1" }); - binding1.Should().NotBeNull(); + binding1.ShouldNotBeNull(); var binding2 = config.Bindings.Single(b => b is IControllerMethodBinding { Method.Name: "HandleMessage2" }); - binding2.Should().NotBeNull(); + binding2.ShouldNotBeNull(); @@ -96,15 +95,15 @@ public async Task SingleQueueArguments() await subscriber.ApplyBindings(); - declaredQueues.Should().HaveCount(1); + declaredQueues.Count.ShouldBe(1); var arguments = declaredQueues["queue-1"]; - arguments.Should().ContainKey("x-custom").WhoseValue.AsUTF8String().Should().Be("custom value"); - arguments.Should().ContainKey("x-another").WhoseValue.Should().Be(true); - arguments.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100); - arguments.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000); - arguments.Should().ContainKey("x-message-ttl").WhoseValue.Should().Be(4269); - arguments.Should().ContainKey("x-overflow").WhoseValue.AsUTF8String().Should().Be("reject-publish"); + arguments["x-custom"].AsUTF8String().ShouldBe("custom value"); + arguments["x-another"].ShouldBe(true); + arguments["x-max-length"].ShouldBe(100); + arguments["x-max-length-bytes"].ShouldBe(100000); + arguments["x-message-ttl"].ShouldBe(4269); + arguments["x-overflow"].AsUTF8String().ShouldBe("reject-publish"); } @@ -116,13 +115,13 @@ public async Task ConflictingDynamicQueueArguments() var subscriber = new TapetiSubscriber(() => client.Object, config); await subscriber.ApplyBindings(); - declaredQueues.Should().HaveCount(2); + declaredQueues.Count.ShouldBe(2); var arguments1 = declaredQueues["queue-1"]; - arguments1.Should().ContainKey("x-max-length").WhoseValue.Should().Be(100); + arguments1["x-max-length"].ShouldBe(100); var arguments2 = declaredQueues["queue-2"]; - arguments2.Should().ContainKey("x-max-length-bytes").WhoseValue.Should().Be(100000); + arguments2["x-max-length-bytes"].ShouldBe(100000); } @@ -137,11 +136,8 @@ public async Task ConflictingDurableQueueArguments() return subscriber.ApplyBindings(); }; - using (new AssertionScope()) - { - await testApplyBindings.Should().ThrowAsync(); - declaredQueues.Should().HaveCount(0); - } + await testApplyBindings.ShouldThrowAsync(); + declaredQueues.Count.ShouldBe(0); } diff --git a/Tapeti.Tests/Config/SimpleControllerTest.cs b/Tapeti.Tests/Config/SimpleControllerTest.cs index f909b79..4672da2 100644 --- a/Tapeti.Tests/Config/SimpleControllerTest.cs +++ b/Tapeti.Tests/Config/SimpleControllerTest.cs @@ -1,5 +1,5 @@ using System.Linq; -using FluentAssertions; +using Shouldly; using Tapeti.Config.Annotations; using Tapeti.Config; using Xunit; @@ -12,18 +12,18 @@ public class SimpleControllerTest : BaseControllerTest public void RegisterController() { var bindings = GetControllerBindings(); - bindings.Should().HaveCount(2); + bindings.Count.ShouldBe(2); var handleSimpleMessageBinding = bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Controller == typeof(TestController) && cmb.Method.Name == "HandleSimpleMessage"); - handleSimpleMessageBinding.QueueType.Should().Be(QueueType.Dynamic); + handleSimpleMessageBinding.QueueType.ShouldBe(QueueType.Dynamic); var handleSimpleMessageStaticBinding = bindings.Single(b => b is IControllerMethodBinding cmb && cmb.Controller == typeof(TestController) && cmb.Method.Name == "HandleSimpleMessageStatic"); - handleSimpleMessageStaticBinding.QueueType.Should().Be(QueueType.Dynamic); + handleSimpleMessageStaticBinding.QueueType.ShouldBe(QueueType.Dynamic); } diff --git a/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs b/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs index 26fc423..04af8f7 100644 --- a/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs +++ b/Tapeti.Tests/Default/TypeNameRoutingKeyStrategyTests.cs @@ -1,5 +1,5 @@ using System; -using FluentAssertions; +using Shouldly; using Tapeti.Annotations; using Tapeti.Default; using Xunit; @@ -109,7 +109,7 @@ public void Full() private static void AssertRoutingKey(string expected, Type messageType) { var routingKey = new TypeNameRoutingKeyStrategy().GetRoutingKey(messageType); - routingKey.Should().Be(expected); + routingKey.ShouldBe(expected); } } // ReSharper restore InconsistentNaming diff --git a/Tapeti.Tests/Helpers/ConnectionStringParserTest.cs b/Tapeti.Tests/Helpers/ConnectionStringParserTest.cs index 924f3b5..37a1c43 100644 --- a/Tapeti.Tests/Helpers/ConnectionStringParserTest.cs +++ b/Tapeti.Tests/Helpers/ConnectionStringParserTest.cs @@ -1,4 +1,4 @@ -using FluentAssertions; +using Shouldly; using Tapeti.Helpers; using Xunit; @@ -186,12 +186,12 @@ private static void AssertConnectionString(string connectionstring, TapetiConnec { var parsed = ConnectionStringParser.Parse(connectionstring); - parsed.HostName.Should().Be(expected.HostName); - parsed.Port.Should().Be(expected.Port); - parsed.VirtualHost.Should().Be(expected.VirtualHost); - parsed.Username.Should().Be(expected.Username); - parsed.Password.Should().Be(expected.Password); - parsed.PrefetchCount.Should().Be(expected.PrefetchCount); + parsed.HostName.ShouldBe(expected.HostName); + parsed.Port.ShouldBe(expected.Port); + parsed.VirtualHost.ShouldBe(expected.VirtualHost); + parsed.Username.ShouldBe(expected.Username); + parsed.Password.ShouldBe(expected.Password); + parsed.PrefetchCount.ShouldBe(expected.PrefetchCount); } } } diff --git a/Tapeti.Tests/Helpers/ExpressionInvokerTest.cs b/Tapeti.Tests/Helpers/ExpressionInvokerTest.cs index 0771dd1..b81c5f4 100644 --- a/Tapeti.Tests/Helpers/ExpressionInvokerTest.cs +++ b/Tapeti.Tests/Helpers/ExpressionInvokerTest.cs @@ -1,7 +1,7 @@ using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; -using FluentAssertions; +using Shouldly; using Tapeti.Helpers; using Xunit; @@ -32,7 +32,7 @@ public void InstanceMethodReturnValueNoParameters() var returnValue = invoker.Invoke(target); target.Verify(methodName); - returnValue.Should().Be("Hello world!"); + returnValue.ShouldBe("Hello world!"); } @@ -59,7 +59,7 @@ public void InstanceMethodReturnValueParameters() var returnValue = invoker.Invoke(target, new byte[] { 42, 69 }); target.Verify(methodName, "42,69"); - returnValue.Should().Be(true); + returnValue.ShouldBe(true); } @@ -88,7 +88,7 @@ public void StaticMethodReturnValueNoParameters() var returnValue = invoker.Invoke(null); InvokeTarget.VerifyStatic(methodName); - returnValue.Should().Be("Hello world!"); + returnValue.ShouldBe("Hello world!"); } @@ -117,7 +117,7 @@ public void StaticMethodReturnValueParameters() var returnValue = invoker.Invoke(null, new byte[] { 42, 69 }); InvokeTarget.VerifyStatic(methodName, "42,69"); - returnValue.Should().Be(true); + returnValue.ShouldBe(true); } @@ -187,7 +187,7 @@ public static bool StaticMethodReturnValueParameters(IEnumerable values) private void MethodCalled(string parameters = "", [CallerMemberName]string methodName = "") { - this.methodName.Should().BeNull(); + this.methodName.ShouldBeNull(); this.methodName = methodName; this.parameters = parameters; @@ -203,7 +203,7 @@ public static void ResetStatic() private static void StaticMethodCalled(string parameters = "", [CallerMemberName] string methodName = "") { - staticMethodName.Should().BeNull(); + staticMethodName.ShouldBeNull(); staticMethodName = methodName; staticParameters = parameters; } @@ -212,15 +212,15 @@ private static void StaticMethodCalled(string parameters = "", [CallerMemberName public void Verify(string methodName, string parameters = "") { - this.methodName.Should().Be(methodName); - this.parameters.Should().Be(parameters); + this.methodName.ShouldBe(methodName); + this.parameters.ShouldBe(parameters); } public static void VerifyStatic(string methodName, string parameters = "") { - staticMethodName.Should().Be(methodName); - staticParameters.Should().Be(parameters); + staticMethodName.ShouldBe(methodName); + staticParameters.ShouldBe(parameters); } } } diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 1ce8661..6290394 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -10,10 +10,10 @@ - + diff --git a/Tapeti.Transient/TransientGenericBinding.cs b/Tapeti.Transient/TransientGenericBinding.cs index 14e7056..b9aa24f 100644 --- a/Tapeti.Transient/TransientGenericBinding.cs +++ b/Tapeti.Transient/TransientGenericBinding.cs @@ -19,6 +19,9 @@ internal class TransientGenericBinding : IBinding /// public QueueType? QueueType => Config.QueueType.Dynamic; + /// + public bool DedicatedChannel => false; + /// /// diff --git a/Tapeti.sln.DotSettings b/Tapeti.sln.DotSettings index 55840d7..05ff7b9 100644 --- a/Tapeti.sln.DotSettings +++ b/Tapeti.sln.DotSettings @@ -9,6 +9,7 @@ UTF <Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /> <Policy><Descriptor Staticness="Instance" AccessRightKinds="Private" Description="Instance fields (private)"><ElementKinds><Kind Name="FIELD" /><Kind Name="READONLY_FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="False" Prefix="" Suffix="" Style="aaBb" /></Policy> + <Policy><Descriptor Staticness="Static" AccessRightKinds="Private" Description="Static fields (private)"><ElementKinds><Kind Name="FIELD" /></ElementKinds></Descriptor><Policy Inspect="True" WarnAboutPrefixesAndSuffixes="True" Prefix="" Suffix="" Style="aaBb" /></Policy> True True True diff --git a/Tapeti/Config/Annotations/DedicatedChannelAttribute.cs b/Tapeti/Config/Annotations/DedicatedChannelAttribute.cs new file mode 100644 index 0000000..3f06f21 --- /dev/null +++ b/Tapeti/Config/Annotations/DedicatedChannelAttribute.cs @@ -0,0 +1,21 @@ +using System; + +namespace Tapeti.Config.Annotations +{ + /// + /// Requests a dedicated RabbitMQ Channel for consuming messages from the queue + /// the queue. + /// + /// + /// The DedicatedChannel attribute can be applied to any controller or method and will apply to the queue + /// that is used in that context. It does not need be applied to all message handlers for that queue to have + /// an effect. + ///

+ /// The intended use case is for high-traffic message handlers, or message handlers which can block for either + /// a long time or indefinitely for throttling purposes. These can clog up the channel's workers and impact + /// other queues. + ///
+ public class DedicatedChannelAttribute : Attribute + { + } +} diff --git a/Tapeti/Config/IBinding.cs b/Tapeti/Config/IBinding.cs index d67255a..2c898cb 100644 --- a/Tapeti/Config/IBinding.cs +++ b/Tapeti/Config/IBinding.cs @@ -1,5 +1,6 @@ using System; using System.Threading.Tasks; +using Tapeti.Config.Annotations; using Tapeti.Connection; namespace Tapeti.Config @@ -38,6 +39,15 @@ public interface IBinding QueueType? QueueType { get; } + /// + /// Determines if the queue is consumed on a dedicated channel or the shared default channel. + /// + /// + /// See + /// + bool DedicatedChannel { get; } + + /// /// Called after a connection is established to set up the binding. /// diff --git a/Tapeti/Connection/IConnectionEventListener.cs b/Tapeti/Connection/IConnectionEventListener.cs index db602ad..23d4589 100644 --- a/Tapeti/Connection/IConnectionEventListener.cs +++ b/Tapeti/Connection/IConnectionEventListener.cs @@ -12,13 +12,13 @@ public interface IConnectionEventListener /// - /// Called when the connection to RabbitMQ has been lost. + /// Called when the connection to RabbitMQ has been recovered after an unexpected disconnect. /// void Reconnected(ConnectedEventArgs e); /// - /// Called when the connection to RabbitMQ has been recovered after an unexpected disconnect. + /// Called when the connection to RabbitMQ has been lost. /// void Disconnected(DisconnectedEventArgs e); } diff --git a/Tapeti/Connection/ITapetiClient.cs b/Tapeti/Connection/ITapetiClient.cs index eeda4f3..7dad103 100644 --- a/Tapeti/Connection/ITapetiClient.cs +++ b/Tapeti/Connection/ITapetiClient.cs @@ -81,21 +81,15 @@ public interface ITapetiClient /// If true, an exception will be raised if the message can not be delivered to at least one queue Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory); - /// /// Starts a consumer for the specified queue, using the provided bindings to handle messages. /// /// /// The consumer implementation which will receive the messages from the queue + /// Additional options /// Cancelled when the connection is lost - /// The consumer tag as returned by BasicConsume. - Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken); - - /// - /// Stops the consumer with the specified tag. - /// - /// The consumer tag as returned by Consume. - Task Cancel(TapetiConsumerTag consumerTag); + /// A representation of the consumer and channel. + Task Consume(string queueName, IConsumer consumer, TapetiConsumeOptions options, CancellationToken cancellationToken); /// /// Creates a durable queue if it does not already exist, and updates the bindings. @@ -146,9 +140,9 @@ public interface ITapetiClient /// - /// Represents a consumer for a specific connection. + /// Represents a consumer for a specific connection and channel. /// - public class TapetiConsumerTag + public interface ITapetiConsumerTag { /// /// The consumer tag as determined by the AMQP protocol. @@ -158,16 +152,37 @@ public class TapetiConsumerTag /// /// An internal reference to the connection on which the consume was started. /// - public long ConnectionReference { get;} + public long ConnectionReference { get; } /// - /// Creates a new instance of the TapetiConsumerTag class. + /// Stops the consumer. /// - public TapetiConsumerTag(long connectionReference, string consumerTag) - { - ConnectionReference = connectionReference; - ConsumerTag = consumerTag; - } + Task Cancel(); + } + + + /// + /// Describes additional options for consuming a queue. + /// + public class TapetiConsumeOptions + { + /// + /// Determines if a new channel will be allocated on the RabbitMQ Connection to handle messages. + /// + /// + /// By default all consumers are registered on a single channel, separated from the publishers. + /// Queues with heavy traffic or long delays can clog up the channel. This option allows for a + /// higher degree of concurrency and separation.

+ /// It is not recommended to enable this for every queue by default, and there is a server-defined + /// limit to the number of channels that can be opened for a connection (default 100 at the time of writing). + ///
+ public bool DedicatedChannel { get; init; } = false; + + + /// + /// Returns the default options. + /// + public static TapetiConsumeOptions Default { get; } = new(); } } \ No newline at end of file diff --git a/Tapeti/Connection/TapetiChannel.cs b/Tapeti/Connection/TapetiChannel.cs index 1ec9c87..ba093d7 100644 --- a/Tapeti/Connection/TapetiChannel.cs +++ b/Tapeti/Connection/TapetiChannel.cs @@ -11,7 +11,10 @@ internal interface ITapetiChannelModelProvider void WithChannel(Action operation); void WithRetryableChannel(Action operation); } - + + + internal delegate void AcquireModelProc(ref TapetiModelReference? modelReference); + /// /// Represents both a RabbitMQ Client Channel (IModel) as well as it's associated single-thread task queue. @@ -19,20 +22,22 @@ internal interface ITapetiChannelModelProvider /// internal class TapetiChannel { - private readonly Func modelFactory; + private TapetiModelReference? modelReference; + private readonly AcquireModelProc acquireModelProc; + private readonly object taskQueueLock = new(); private SerialTaskQueue? taskQueue; private readonly ModelProvider modelProvider; - - public TapetiChannel(Func modelFactory) + + public TapetiChannel(AcquireModelProc acquireModelProc) { - this.modelFactory = modelFactory; + this.acquireModelProc = acquireModelProc; modelProvider = new ModelProvider(this); } - public async Task Reset() + public async Task Close() { SerialTaskQueue? capturedTaskQueue; @@ -45,11 +50,22 @@ public async Task Reset() if (capturedTaskQueue == null) return; - await capturedTaskQueue.Add(() => { }).ConfigureAwait(false); + await capturedTaskQueue.Add(() => + { + modelReference?.Model.Dispose(); + modelReference = null; + }).ConfigureAwait(false); + capturedTaskQueue.Dispose(); } + public void ClearModel() + { + modelReference = null; + } + + public Task Queue(Action operation) { return GetTaskQueue().Add(() => @@ -89,6 +105,13 @@ private SerialTaskQueue GetTaskQueue() } + private IModel GetModel() + { + acquireModelProc(ref modelReference); + return modelReference?.Model ?? throw new InvalidOperationException("RabbitMQ Model is unavailable"); + } + + private class ModelProvider : ITapetiChannelModelProvider { private readonly TapetiChannel owner; @@ -102,7 +125,7 @@ public ModelProvider(TapetiChannel owner) public void WithChannel(Action operation) { - operation(owner.modelFactory()); + operation(owner.GetModel()); } @@ -112,7 +135,7 @@ public void WithRetryableChannel(Action operation) { try { - operation(owner.modelFactory()); + operation(owner.GetModel()); break; } catch (AlreadyClosedException) diff --git a/Tapeti/Connection/TapetiClient.cs b/Tapeti/Connection/TapetiClient.cs index 02cb024..853befe 100644 --- a/Tapeti/Connection/TapetiClient.cs +++ b/Tapeti/Connection/TapetiClient.cs @@ -3,7 +3,6 @@ using System.Linq; using System.Net; using System.Net.Http; -using System.Text; using System.Threading; using System.Threading.Tasks; using Newtonsoft.Json; @@ -18,21 +17,12 @@ namespace Tapeti.Connection { - internal enum TapetiChannelType - { - Consume, - Publish - } - - /// /// Implementation of ITapetiClient for the RabbitMQ Client library /// internal class TapetiClient : ITapetiClient { - private const int ReconnectDelay = 5000; private const int MandatoryReturnTimeout = 300000; - private const int MinimumConnectedReconnectDelay = 1000; private const int CloseMessageHandlersTimeout = 30000; private readonly TapetiConnectionParams connectionParams; @@ -41,27 +31,14 @@ internal class TapetiClient : ITapetiClient private readonly ILogger logger; - /// - /// Receives events when the connection state changes. - /// - public IConnectionEventListener? ConnectionEventListener { get; set; } - + private readonly TapetiClientConnection connection; + private readonly TapetiChannel defaultConsumeChannel; + private readonly TapetiChannel defaultPublishChannel; + private readonly List dedicatedChannels = new(); - private readonly TapetiChannel consumeChannel; - private readonly TapetiChannel publishChannel; private readonly HttpClient managementClient; private readonly MessageHandlerTracker messageHandlerTracker = new(); - // These fields must be locked using connectionLock - private readonly object connectionLock = new(); - private long connectionReference; - private RabbitMQ.Client.IConnection? connection; - private IModel? consumeChannelModel; - private IModel? publishChannelModel; - private bool isClosing; - private bool isReconnect; - private DateTime connectedDateTime; - // These fields are for use in a single TapetiChannel's queue only! private ulong lastDeliveryTag; private readonly HashSet deletedQueues = new(); @@ -93,15 +70,22 @@ private class ReturnInfo } - public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams) + public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParams, IConnectionEventListener? connectionEventListener) { this.config = config; this.connectionParams = connectionParams; logger = config.DependencyResolver.Resolve(); - consumeChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Consume)); - publishChannel = new TapetiChannel(() => GetModel(TapetiChannelType.Publish)); + connection = new TapetiClientConnection(logger, connectionParams) + { + ConnectionEventListener = connectionEventListener + }; + + defaultConsumeChannel = connection.CreateChannel(InitConsumeModel); + defaultPublishChannel = connection.CreateChannel(InitPublishModel); + + connection.OnQueueReconnect += () => defaultConsumeChannel.QueueRetryable(_ => { }); var handler = new HttpClientHandler @@ -118,6 +102,41 @@ public TapetiClient(ITapetiConfig config, TapetiConnectionParams connectionParam } + private void InitConsumeModel(IModel model) + { + if (connectionParams.PrefetchCount > 0) + model.BasicQos(0, connectionParams.PrefetchCount, false); + } + + + private void InitPublishModel(IModel model) + { + if (config.GetFeatures().PublisherConfirms) + { + lastDeliveryTag = 0; + + Monitor.Enter(confirmLock); + try + { + foreach (var pair in confirmMessages) + pair.Value.CompletionSource.SetCanceled(); + + confirmMessages.Clear(); + } + finally + { + Monitor.Exit(confirmLock); + } + + model.ConfirmSelect(); + } + + model.BasicReturn += HandleBasicReturn; + model.BasicAcks += HandleBasicAck; + model.BasicNacks += HandleBasicNack; + } + + /// public async Task Publish(byte[] body, IMessageProperties properties, string? exchange, string routingKey, bool mandatory) { @@ -125,7 +144,7 @@ public async Task Publish(byte[] body, IMessageProperties properties, string? ex throw new ArgumentNullException(nameof(routingKey)); - await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channelProvider => + await defaultPublishChannel.QueueWithProvider(async channelProvider => { Task? publishResultTask = null; var messageInfo = new ConfirmMessageInfo(GetReturnKey(exchange ?? string.Empty, routingKey), new TaskCompletionSource()); @@ -208,7 +227,7 @@ await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channe /// - public async Task Consume(string queueName, IConsumer consumer, CancellationToken cancellationToken) + public async Task Consume(string queueName, IConsumer consumer, TapetiConsumeOptions options, CancellationToken cancellationToken) { if (deletedQueues.Contains(queueName)) return null; @@ -220,57 +239,62 @@ await GetTapetiChannel(TapetiChannelType.Publish).QueueWithProvider(async channe long capturedConnectionReference = -1; string? consumerTag = null; - await GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(channel => + var channel = options.DedicatedChannel + ? CreateDedicatedConsumeChannel() + : defaultConsumeChannel; + + await channel.QueueRetryable(model => { if (cancellationToken.IsCancellationRequested) return; - capturedConnectionReference = Interlocked.Read(ref connectionReference); - var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, Respond); - consumerTag = channel.BasicConsume(queueName, false, basicConsumer); + capturedConnectionReference = connection.GetConnectionReference(); + var basicConsumer = new TapetiBasicConsumer(consumer, messageHandlerTracker, capturedConnectionReference, + (connectionReference, deliveryTag, result) => Respond(channel, connectionReference, deliveryTag, result)); + + consumerTag = model.BasicConsume(queueName, false, basicConsumer); }).ConfigureAwait(false); return consumerTag == null ? null - : new TapetiConsumerTag(capturedConnectionReference, consumerTag); + : new TapetiConsumerTag(this, channel, consumerTag, capturedConnectionReference); } - /// - public async Task Cancel(TapetiConsumerTag consumerTag) + private async Task Cancel(TapetiChannel channel, string consumerTag, long connectionReference) { - if (isClosing || string.IsNullOrEmpty(consumerTag.ConsumerTag)) + if (connection.IsClosing || string.IsNullOrEmpty(consumerTag)) return; - var capturedConnectionReference = Interlocked.Read(ref connectionReference); + var capturedConnectionReference = connection.GetConnectionReference(); // If the connection was re-established in the meantime, don't respond with an // invalid deliveryTag. The message will be requeued. - if (capturedConnectionReference != consumerTag.ConnectionReference) + if (capturedConnectionReference != connectionReference) return; // No need for a retryable channel here, if the connection is lost // so is the consumer. - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await channel.Queue(model => { // Check again as a reconnect may have occured in the meantime - var currentConnectionReference = Interlocked.Read(ref connectionReference); - if (currentConnectionReference != consumerTag.ConnectionReference) + var currentConnectionReference = connection.GetConnectionReference(); + if (currentConnectionReference != connectionReference) return; - channel.BasicCancel(consumerTag.ConsumerTag); + model.BasicCancel(consumerTag); }).ConfigureAwait(false); } - private async Task Respond(long expectedConnectionReference, ulong deliveryTag, ConsumeResult result) + private async Task Respond(TapetiChannel channel, long expectedConnectionReference, ulong deliveryTag, ConsumeResult result) { - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await channel.Queue(model => { // If the connection was re-established in the meantime, don't respond with an // invalid deliveryTag. The message will be requeued. - var currentConnectionReference = Interlocked.Read(ref connectionReference); - if (currentConnectionReference != connectionReference) + var currentConnectionReference = connection.GetConnectionReference(); + if (currentConnectionReference != expectedConnectionReference) return; // No need for a retryable channel here, if the connection is lost we can't @@ -279,15 +303,15 @@ await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => { case ConsumeResult.Success: case ConsumeResult.ExternalRequeue: - channel.BasicAck(deliveryTag, false); + model.BasicAck(deliveryTag, false); break; case ConsumeResult.Error: - channel.BasicNack(deliveryTag, false, false); + model.BasicNack(deliveryTag, false, false); break; case ConsumeResult.Requeue: - channel.BasicNack(deliveryTag, false, true); + model.BasicNack(deliveryTag, false, true); break; default: @@ -350,7 +374,9 @@ public async Task DurableQueueDeclare(string queueName, IEnumerable + // Metadata operations are performed at startup so they can always run on the defaultConsumeChannel, + // regardless of whether the consumer of the queue will use a dedicated channel. + await defaultConsumeChannel.Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -391,7 +417,7 @@ public async Task DurableQueueVerify(string queueName, IRabbitMQArguments? argum if (!await GetDurableQueueDeclareRequired(queueName, arguments).ConfigureAwait(false)) return; - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await defaultConsumeChannel.Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -409,7 +435,7 @@ public async Task DurableQueueDelete(string queueName, bool onlyIfEmpty, Cancell { uint deletedMessages = 0; - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await defaultConsumeChannel.Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -423,7 +449,7 @@ await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => } - await GetTapetiChannel(TapetiChannelType.Consume).QueueWithProvider(async channelProvider => + await defaultConsumeChannel.QueueWithProvider(async channelProvider => { bool retry; do @@ -493,7 +519,7 @@ public async Task DynamicQueueDeclare(string? queuePrefix, IRabbitMQArgu string? queueName = null; var bindingLogger = logger as IBindingLogger; - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await defaultConsumeChannel.Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -521,7 +547,7 @@ await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => /// public async Task DynamicQueueBind(string queueName, QueueBinding binding, CancellationToken cancellationToken) { - await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => + await defaultConsumeChannel.Queue(channel => { if (cancellationToken.IsCancellationRequested) return; @@ -536,42 +562,16 @@ await GetTapetiChannel(TapetiChannelType.Consume).Queue(channel => /// public async Task Close() { - IModel? capturedConsumeModel; - IModel? capturedPublishModel; - RabbitMQ.Client.IConnection? capturedConnection; - - lock (connectionLock) - { - isClosing = true; - capturedConsumeModel = consumeChannelModel; - capturedPublishModel = publishChannelModel; - capturedConnection = connection; - - consumeChannelModel = null; - publishChannelModel = null; - connection = null; - } - // Empty the queue - await consumeChannel.Reset().ConfigureAwait(false); - await publishChannel.Reset().ConfigureAwait(false); + await defaultConsumeChannel.Close().ConfigureAwait(false); + await defaultPublishChannel.Close().ConfigureAwait(false); - // No need to close the channels as the connection will be closed - capturedConsumeModel?.Dispose(); - capturedPublishModel?.Dispose(); + foreach (var channel in dedicatedChannels) + await channel.Close().ConfigureAwait(false); + + dedicatedChannels.Clear(); + connection.Close(); - // ReSharper disable once InvertIf - if (capturedConnection != null) - { - try - { - capturedConnection.Close(); - } - finally - { - capturedConnection.Dispose(); - } - } // Wait for message handlers to finish await messageHandlerTracker.WaitForIdle(CloseMessageHandlersTimeout); @@ -743,202 +743,12 @@ private void DeclareExchange(IModel channel, string exchange) } - private TapetiChannel GetTapetiChannel(TapetiChannelType channelType) - { - return channelType == TapetiChannelType.Publish - ? publishChannel - : consumeChannel; - } - - - /// - /// Only call this from a task in the taskQueue to ensure IModel is only used - /// by a single thread, as is recommended in the RabbitMQ .NET Client documentation. - /// - private IModel GetModel(TapetiChannelType channelType) + private TapetiChannel CreateDedicatedConsumeChannel() { - lock (connectionLock) - { - var channel = channelType == TapetiChannelType.Publish - ? publishChannelModel - : consumeChannelModel; - - if (channel is { IsOpen: true }) - return channel; - } - - // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ - // not related to the connection), wait for a bit to avoid spamming the connection - if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay) - Thread.Sleep(ReconnectDelay); + var channel = connection.CreateChannel(InitConsumeModel); + dedicatedChannels.Add(channel); - - var connectionFactory = new ConnectionFactory - { - HostName = connectionParams.HostName, - Port = connectionParams.Port, - VirtualHost = connectionParams.VirtualHost, - UserName = connectionParams.Username, - Password = connectionParams.Password, - AutomaticRecoveryEnabled = false, - TopologyRecoveryEnabled = false, - RequestedHeartbeat = TimeSpan.FromSeconds(30), - DispatchConsumersAsync = true - }; - - if (connectionParams.ConsumerDispatchConcurrency > 0) - connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency; - - if (connectionParams.ClientProperties != null) - foreach (var pair in connectionParams.ClientProperties) - { - if (connectionFactory.ClientProperties.ContainsKey(pair.Key)) - connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value); - else - connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value)); - } - - - // TODO lock both channels when attempting the connection - // TODO when one channel is lost, do not reconnect, instead restore the channel - - while (true) - { - try - { - RabbitMQ.Client.IConnection? capturedConnection; - IModel? capturedConsumeChannelModel; - IModel? capturedPublishChannelModel; - - - lock (connectionLock) - { - capturedConnection = connection; - } - - if (capturedConnection != null) - { - try - { - if (capturedConnection is { IsOpen: true }) - capturedConnection.Close(); - } - catch (AlreadyClosedException) - { - } - finally - { - capturedConnection.Dispose(); - } - } - - logger.Connect(new ConnectContext(connectionParams, isReconnect)); - Interlocked.Increment(ref connectionReference); - - lock (connectionLock) - { - connection = connectionFactory.CreateConnection(); - capturedConnection = connection; - - consumeChannelModel = connection.CreateModel(); - if (consumeChannel == null) - throw new BrokerUnreachableException(null); - - publishChannelModel = connection.CreateModel(); - if (publishChannel == null) - throw new BrokerUnreachableException(null); - - capturedConsumeChannelModel = consumeChannelModel; - capturedPublishChannelModel = publishChannelModel; - } - - - if (config.GetFeatures().PublisherConfirms) - { - lastDeliveryTag = 0; - - Monitor.Enter(confirmLock); - try - { - foreach (var pair in confirmMessages) - pair.Value.CompletionSource.SetCanceled(); - - confirmMessages.Clear(); - } - finally - { - Monitor.Exit(confirmLock); - } - - capturedPublishChannelModel.ConfirmSelect(); - } - - if (connectionParams.PrefetchCount > 0) - capturedConsumeChannelModel.BasicQos(0, connectionParams.PrefetchCount, false); - - capturedPublishChannelModel.ModelShutdown += (_, e) => - { - lock (connectionLock) - { - if (consumeChannelModel == null || consumeChannelModel != capturedConsumeChannelModel) - return; - - consumeChannelModel = null; - } - - ConnectionEventListener?.Disconnected(new DisconnectedEventArgs(e.ReplyCode, e.ReplyText)); - logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText)); - - // Reconnect if the disconnect was unexpected - if (!isClosing) - GetTapetiChannel(TapetiChannelType.Consume).QueueRetryable(_ => { }); - }; - - capturedPublishChannelModel.ModelShutdown += (_, _) => - { - lock (connectionLock) - { - if (publishChannelModel == null || publishChannelModel != capturedPublishChannelModel) - return; - - publishChannelModel = null; - } - - // No need to reconnect, the next Publish will - }; - - - capturedPublishChannelModel.BasicReturn += HandleBasicReturn; - capturedPublishChannelModel.BasicAcks += HandleBasicAck; - capturedPublishChannelModel.BasicNacks += HandleBasicNack; - - connectedDateTime = DateTime.UtcNow; - - var connectedEventArgs = new ConnectedEventArgs(connectionParams, capturedConnection.LocalPort); - - if (isReconnect) - ConnectionEventListener?.Reconnected(connectedEventArgs); - else - ConnectionEventListener?.Connected(connectedEventArgs); - - logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, capturedConnection.LocalPort)); - isReconnect = true; - - break; - } - catch (BrokerUnreachableException e) - { - logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e)); - Thread.Sleep(ReconnectDelay); - } - } - - lock (connectionLock) - { - return channelType == TapetiChannelType.Publish - ? publishChannelModel - : consumeChannelModel; - } + return channel; } @@ -1046,38 +856,30 @@ private static string GetReturnKey(string exchange, string routingKey) } - - private class ConnectContext : IConnectSuccessContext, IConnectFailedContext + private class TapetiConsumerTag : ITapetiConsumerTag { - public TapetiConnectionParams ConnectionParams { get; } - public bool IsReconnect { get; } - public int LocalPort { get; } - public Exception? Exception { get; } + private readonly TapetiClient client; + private readonly TapetiChannel channel; + public string ConsumerTag { get; } + public long ConnectionReference { get; } - public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception? exception = null) - { - ConnectionParams = connectionParams; - IsReconnect = isReconnect; - LocalPort = localPort; - Exception = exception; - } - } + public TapetiConsumerTag(TapetiClient client, TapetiChannel channel, string consumerTag, long connectionReference) + { + this.client = client; + this.channel = channel; - private class DisconnectContext : IDisconnectContext - { - public TapetiConnectionParams ConnectionParams { get; } - public ushort ReplyCode { get; } - public string ReplyText { get; } + ConnectionReference = connectionReference; + ConsumerTag = consumerTag; + } - public DisconnectContext(TapetiConnectionParams connectionParams, ushort replyCode, string replyText) + public Task Cancel() { - ConnectionParams = connectionParams; - ReplyCode = replyCode; - ReplyText = replyText; + return client.Cancel(channel, ConsumerTag, ConnectionReference); } } + } } diff --git a/Tapeti/Connection/TapetiClientConnection.cs b/Tapeti/Connection/TapetiClientConnection.cs new file mode 100644 index 0000000..1896d0e --- /dev/null +++ b/Tapeti/Connection/TapetiClientConnection.cs @@ -0,0 +1,303 @@ +using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; +using System; +using System.Text; +using System.Threading; + +namespace Tapeti.Connection +{ + internal readonly struct TapetiModelReference + { + public IModel Model { get; } + public long ConnectionReference { get; } + public DateTime CreatedDateTime { get; } + + + public TapetiModelReference(IModel model, long connectionReference, DateTime createdDateTime) + { + Model = model; + ConnectionReference = connectionReference; + CreatedDateTime = createdDateTime; + } + } + + + /// + /// Implements a resilient connection to RabbitMQ. + /// + internal class TapetiClientConnection + { + /// + /// Receives events when the connection state changes. + /// + public IConnectionEventListener? ConnectionEventListener { get; set; } + + public event Action? OnQueueReconnect; + + public bool IsClosing { get; private set; } + + + private const int ReconnectDelay = 5000; + private const int MinimumConnectedReconnectDelay = 1000; + + private const int ChannelRecreateDelay = 5000; + private const int MinimumChannelRecreateDelay = 1000; + + + private readonly ILogger logger; + private readonly TapetiConnectionParams connectionParams; + + private readonly ConnectionFactory connectionFactory; + + + // These fields must be locked using connectionLock + private readonly object connectionLock = new(); + private long connectionReference; + private RabbitMQ.Client.IConnection? connection; + private bool isReconnect; + private DateTime connectedDateTime; + private IModel? connectionMonitorChannel; + + + public TapetiClientConnection(ILogger logger, TapetiConnectionParams connectionParams) + { + this.logger = logger; + this.connectionParams = connectionParams; + + connectionFactory = new ConnectionFactory + { + HostName = connectionParams.HostName, + Port = connectionParams.Port, + VirtualHost = connectionParams.VirtualHost, + UserName = connectionParams.Username, + Password = connectionParams.Password, + AutomaticRecoveryEnabled = false, + TopologyRecoveryEnabled = false, + RequestedHeartbeat = TimeSpan.FromSeconds(30), + DispatchConsumersAsync = true + }; + + if (connectionParams.ConsumerDispatchConcurrency > 0) + connectionFactory.ConsumerDispatchConcurrency = connectionParams.ConsumerDispatchConcurrency; + + // ReSharper disable once InvertIf + if (connectionParams.ClientProperties != null) + foreach (var pair in connectionParams.ClientProperties) + { + if (connectionFactory.ClientProperties.ContainsKey(pair.Key)) + connectionFactory.ClientProperties[pair.Key] = Encoding.UTF8.GetBytes(pair.Value); + else + connectionFactory.ClientProperties.Add(pair.Key, Encoding.UTF8.GetBytes(pair.Value)); + } + } + + + public void Close() + { + RabbitMQ.Client.IConnection? capturedConnection; + + lock (connectionLock) + { + IsClosing = true; + capturedConnection = connection; + connection = null; + } + + // ReSharper disable once InvertIf + if (capturedConnection != null) + { + try + { + capturedConnection.Close(); + } + finally + { + capturedConnection.Dispose(); + } + } + } + + + public TapetiChannel CreateChannel(Action? onInitModel) + { + var capturedChannel = new WeakReference(null); + var channel = new TapetiChannel((ref TapetiModelReference? modelReference) => + { + AcquireModel(ref modelReference, model => + { + model.ModelShutdown += (_, _) => + { + if (capturedChannel.TryGetTarget(out var innerChannel)) + innerChannel.ClearModel(); + }; + + onInitModel?.Invoke(model); + }); + }); + + capturedChannel.SetTarget(channel); + return channel; + } + + + + private void AcquireModel(ref TapetiModelReference? modelReference, Action? onInitModel) + { + var sameConnection = modelReference != null && + modelReference.Value.ConnectionReference == Interlocked.Read(ref connectionReference); + + if (sameConnection && modelReference!.Value.Model.IsOpen) + return; + + long newConnectionReference; + RabbitMQ.Client.IConnection capturedConnection; + + lock (connectionLock) + { + if (connection is not { IsOpen: true }) + { + try + { + connection?.Close(); + } + catch (AlreadyClosedException) + { + } + finally + { + connection?.Dispose(); + } + + logger.Connect(new ConnectContext(connectionParams, isReconnect)); + newConnectionReference = Interlocked.Increment(ref connectionReference); + + connection = Connect(); + } + else + newConnectionReference = Interlocked.Read(ref connectionReference); + + capturedConnection = connection; + } + + if (sameConnection && (DateTime.UtcNow - modelReference!.Value.CreatedDateTime).TotalMilliseconds <= MinimumChannelRecreateDelay) + Thread.Sleep(ChannelRecreateDelay); + + var newModel = capturedConnection.CreateModel(); + onInitModel?.Invoke(newModel); + + modelReference = new TapetiModelReference(newModel, newConnectionReference, DateTime.UtcNow); + } + + + private RabbitMQ.Client.IConnection Connect() + { + // If the Disconnect quickly follows the Connect (when an error occurs that is reported back by RabbitMQ + // not related to the connection), wait for a bit to avoid spamming the connection + if ((DateTime.UtcNow - connectedDateTime).TotalMilliseconds <= MinimumConnectedReconnectDelay) + Thread.Sleep(ReconnectDelay); + + RabbitMQ.Client.IConnection newConnection; + while (true) + { + try + { + newConnection = connectionFactory.CreateConnection(); + connectionMonitorChannel = newConnection.CreateModel(); + if (connectionMonitorChannel == null) + throw new BrokerUnreachableException(null); + + var capturedConnectionMonitorChannel = connectionMonitorChannel; + + connectionMonitorChannel.ModelShutdown += (_, e) => + { + bool capturedIsClosing; + + lock (connectionLock) + { + if (connectionMonitorChannel == null || connectionMonitorChannel != capturedConnectionMonitorChannel) + return; + + capturedConnectionMonitorChannel = null; + capturedIsClosing = IsClosing; + } + + ConnectionEventListener?.Disconnected(new DisconnectedEventArgs(e.ReplyCode, e.ReplyText)); + logger.Disconnect(new DisconnectContext(connectionParams, e.ReplyCode, e.ReplyText)); + + // Reconnect if the disconnect was unexpected + if (!capturedIsClosing) + // Note: I'm not too happy with this design, but letting the Client handle the reconnect is + // effectively how it was done before TapetiClientConnection was split off and since it + // manages the channels it is the best I could come up with for now. + OnQueueReconnect?.Invoke(); + }; + + connectedDateTime = DateTime.UtcNow; + + var connectedEventArgs = new ConnectedEventArgs(connectionParams, newConnection.LocalPort); + + if (isReconnect) + ConnectionEventListener?.Reconnected(connectedEventArgs); + else + ConnectionEventListener?.Connected(connectedEventArgs); + + logger.ConnectSuccess(new ConnectContext(connectionParams, isReconnect, newConnection.LocalPort)); + isReconnect = true; + + break; + } + catch (BrokerUnreachableException e) + { + logger.ConnectFailed(new ConnectContext(connectionParams, isReconnect, exception: e)); + Thread.Sleep(ReconnectDelay); + } + } + + return newConnection; + } + + /// + /// Returns the unique identifier of the current connection. Increments when the connection is lost. + /// Can be used to detect if values related to the connection's lifetime, such as consumer tags, + /// are still valid. + /// + public long GetConnectionReference() + { + return Interlocked.Read(ref connectionReference); + } + + + private class ConnectContext : IConnectSuccessContext, IConnectFailedContext + { + public TapetiConnectionParams ConnectionParams { get; } + public bool IsReconnect { get; } + public int LocalPort { get; } + public Exception? Exception { get; } + + + public ConnectContext(TapetiConnectionParams connectionParams, bool isReconnect, int localPort = 0, Exception? exception = null) + { + ConnectionParams = connectionParams; + IsReconnect = isReconnect; + LocalPort = localPort; + Exception = exception; + } + } + + + private class DisconnectContext : IDisconnectContext + { + public TapetiConnectionParams ConnectionParams { get; } + public ushort ReplyCode { get; } + public string ReplyText { get; } + + + public DisconnectContext(TapetiConnectionParams connectionParams, ushort replyCode, string replyText) + { + ConnectionParams = connectionParams; + ReplyCode = replyCode; + ReplyText = replyText; + } + } + } +} diff --git a/Tapeti/Connection/TapetiConsumer.cs b/Tapeti/Connection/TapetiConsumer.cs index 95238fb..b7a9a0d 100644 --- a/Tapeti/Connection/TapetiConsumer.cs +++ b/Tapeti/Connection/TapetiConsumer.cs @@ -190,6 +190,7 @@ private class ExceptionContextBinding : IBinding { public string? QueueName { get; } public QueueType? QueueType => null; + public bool DedicatedChannel => false; public ExceptionContextBinding(string? queueName) diff --git a/Tapeti/Connection/TapetiSubscriber.cs b/Tapeti/Connection/TapetiSubscriber.cs index b7023eb..8af1216 100644 --- a/Tapeti/Connection/TapetiSubscriber.cs +++ b/Tapeti/Connection/TapetiSubscriber.cs @@ -14,7 +14,7 @@ internal class TapetiSubscriber : ISubscriber private readonly Func clientFactory; private readonly ITapetiConfig config; private bool consuming; - private readonly List consumerTags = new(); + private readonly List consumerTags = new(); private CancellationTokenSource? initializeCancellationTokenSource; @@ -111,7 +111,7 @@ public async Task Stop() initializeCancellationTokenSource?.Cancel(); initializeCancellationTokenSource = null; - await Task.WhenAll(consumerTags.Select(async tag => await clientFactory().Cancel(tag))).ConfigureAwait(false); + await Task.WhenAll(consumerTags.Select(async tag => await tag.Cancel())).ConfigureAwait(false); consumerTags.Clear(); consuming = false; @@ -155,10 +155,19 @@ private async Task ConsumeQueues(CancellationToken cancellationToken) var queueName = group.Key; var consumer = new TapetiConsumer(cancellationToken, config, queueName, group); - return await clientFactory().Consume(queueName, consumer, cancellationToken).ConfigureAwait(false); + return await clientFactory().Consume(queueName, consumer, GetConsumeOptions(group), cancellationToken).ConfigureAwait(false); })).ConfigureAwait(false)) .Where(t => t?.ConsumerTag != null) - .Cast()); + .Cast()); + } + + + private static TapetiConsumeOptions GetConsumeOptions(IEnumerable bindings) + { + return new TapetiConsumeOptions + { + DedicatedChannel = bindings.Any(b => b.DedicatedChannel) + }; } diff --git a/Tapeti/Default/ControllerMethodBinding.cs b/Tapeti/Default/ControllerMethodBinding.cs index 0c74764..9d0d23f 100644 --- a/Tapeti/Default/ControllerMethodBinding.cs +++ b/Tapeti/Default/ControllerMethodBinding.cs @@ -4,6 +4,7 @@ using System.Reflection; using System.Threading.Tasks; using Tapeti.Config; +using Tapeti.Config.Annotations; using Tapeti.Connection; using Tapeti.Helpers; @@ -92,6 +93,9 @@ public struct BindingInfo /// public QueueType? QueueType => bindingInfo.QueueInfo.QueueType; + /// + public bool DedicatedChannel => bindingInfo.QueueInfo.DedicatedChannel; + /// public Type Controller => bindingInfo.ControllerType; @@ -323,6 +327,14 @@ public class QueueInfo /// Optional arguments (x-arguments) passed when declaring the queue. ///
public IRabbitMQArguments? QueueArguments { get; set; } + + /// + /// Determines if the queue is consumed on a dedicated channel or the shared default channel. + /// + /// + /// See + /// + public bool DedicatedChannel { get; set; } /// /// Determines if the QueueInfo properties contain a valid combination. diff --git a/Tapeti/TapetiConfigControllers.cs b/Tapeti/TapetiConfigControllers.cs index 7371eca..4406a96 100644 --- a/Tapeti/TapetiConfigControllers.cs +++ b/Tapeti/TapetiConfigControllers.cs @@ -147,6 +147,7 @@ public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuil var dynamicQueueAttribute = member.GetDynamicQueueAttribute(); var durableQueueAttribute = member.GetDurableQueueAttribute(); var queueArgumentsAttribute = member.GetQueueArgumentsAttribute(); + var dedicatedChannelAttribute = member.GetCustomAttribute(); if (dynamicQueueAttribute != null && durableQueueAttribute != null) throw new TopologyConfigurationException($"Cannot combine static and dynamic queue attributes on controller {member.DeclaringType?.Name} method {member.Name}"); @@ -177,7 +178,8 @@ public static ITapetiConfigBuilder RegisterAllControllers(this ITapetiConfigBuil return new ControllerMethodBinding.QueueInfo(queueType, name) { - QueueArguments = GetQueueArguments(queueArgumentsAttribute) ?? fallbackQueueInfo?.QueueArguments + QueueArguments = GetQueueArguments(queueArgumentsAttribute) ?? fallbackQueueInfo?.QueueArguments, + DedicatedChannel = dedicatedChannelAttribute != null }; } diff --git a/Tapeti/TapetiConnection.cs b/Tapeti/TapetiConnection.cs index 820d5e1..a948443 100644 --- a/Tapeti/TapetiConnection.cs +++ b/Tapeti/TapetiConnection.cs @@ -41,10 +41,7 @@ public TapetiConnection(ITapetiConfig config) this.config = config; (config.DependencyResolver as IDependencyContainer)?.RegisterDefault(GetPublisher); - client = new Lazy(() => new TapetiClient(config, Params ?? new TapetiConnectionParams()) - { - ConnectionEventListener = new ConnectionEventListener(this) - }); + client = new Lazy(() => new TapetiClient(config, Params ?? new TapetiConnectionParams(), new ConnectionEventListener(this))); } /// From e735bbc46a4daaade6fb8d263c19bac0bb54e096 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 17 Feb 2025 15:18:49 +0100 Subject: [PATCH 2/8] Start of unit tests for connectivity issues - Introduced Toxiproxy - Replaced Moq with NSubstitute - Issue remaining in the unit tests with Management HTTP calls, not sure if it just needs a reboot --- Tapeti.Tests/Client/ControllerTests.cs | 11 +- Tapeti.Tests/Client/RabbitMQFixture.cs | 168 +++++++++++++++++----- Tapeti.Tests/Client/TapetiClientTests.cs | 27 ++-- Tapeti.Tests/Config/QueueArgumentsTest.cs | 51 ++++--- Tapeti.Tests/Tapeti.Tests.csproj | 15 +- 5 files changed, 192 insertions(+), 80 deletions(-) diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs index 2baa7f8..82e1745 100644 --- a/Tapeti.Tests/Client/ControllerTests.cs +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -19,6 +19,7 @@ public class ControllerTests : IAsyncLifetime private readonly Container container = new(); private TapetiConnection? connection; + private RabbitMQFixture.RabbitMQTestProxy proxy = null!; public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) @@ -30,9 +31,9 @@ public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelp } - public Task InitializeAsync() + public async Task InitializeAsync() { - return Task.CompletedTask; + proxy = await fixture.AcquireProxy(); } @@ -40,6 +41,8 @@ public async Task DisposeAsync() { if (connection != null) await connection.DisposeAsync(); + + proxy.Dispose(); } @@ -102,8 +105,8 @@ private TapetiConnection CreateConnection(ITapetiConfig config, ushort prefetchC Params = new TapetiConnectionParams { HostName = "127.0.0.1", - Port = fixture.RabbitMQPort, - ManagementPort = fixture.RabbitMQManagementPort, + Port = proxy.RabbitMQPort, + ManagementPort = proxy.RabbitMQManagementPort, Username = RabbitMQFixture.RabbitMQUsername, Password = RabbitMQFixture.RabbitMQPassword, PrefetchCount = prefetchCount, diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs index 3e8a581..8578308 100644 --- a/Tapeti.Tests/Client/RabbitMQFixture.cs +++ b/Tapeti.Tests/Client/RabbitMQFixture.cs @@ -1,10 +1,10 @@ using System; +using System.Threading; using System.Threading.Tasks; -using Docker.DotNet; -using Docker.DotNet.Models; using DotNet.Testcontainers.Builders; -using DotNet.Testcontainers.Configurations; using DotNet.Testcontainers.Containers; +using DotNet.Testcontainers.Networks; +using Toxiproxy.Net; using Xunit; namespace Tapeti.Tests.Client @@ -21,59 +21,159 @@ public sealed class RabbitMQFixture : IAsyncLifetime public static string RabbitMQUsername => "tapetitests"; public static string RabbitMQPassword => "topsecret1234"; - public ushort RabbitMQPort { get; private set; } - public ushort RabbitMQManagementPort { get; private set; } + private INetwork? network; + private IContainer? rabbitMQContainer; + private IContainer? toxiproxyContainer; + private ushort rabbitMQPort; + private ushort rabbitMQManagementPort; + private ushort toxiproxyPort; - private TestcontainerMessageBroker? testcontainers; + private Toxiproxy.Net.Connection? toxiproxyConnection; + private Toxiproxy.Net.Client? toxiproxyClient; + private Proxy? rabbitMQProxy; + //private Proxy? rabbitMQManagementProxy; + + private readonly SemaphoreSlim acquireLimit = new(1, 1); private const int DefaultRabbitMQPort = 5672; private const int DefaultRabbitMQManagementPort = 15672; + private const int DefaultToxiproxyPort = 8474; - private const string ImageName = "rabbitmq"; - private const string ImageTag = "3.11.3-management-alpine"; + private const string RabbitMQImageName = "rabbitmq"; + private const string RabbitMQImageTag = "3.11.3-management-alpine"; + private const string ToxiproxyImageName = "ghcr.io/shopify/toxiproxy"; + private const string ToxiproxyImageTag = "2.11.0"; public async Task InitializeAsync() { - // Testcontainers does not seem to pull the image the first time. - // I didn't get it to work, even using WithImagePullPolicy from the latest beta. - // Note: running it the first time can take a while. - var client = new DockerClientConfiguration().CreateClient(); - await client.Images.CreateImageAsync( - new ImagesCreateParameters - { - FromImage = ImageName, - Tag = ImageTag - }, - null, - new Progress()); + network = new NetworkBuilder() + .WithName(Guid.NewGuid().ToString("D")) + .Build(); // If you get a "Sequence contains no elements" error here: make sure Docker Desktop is running - var testcontainersBuilder = new TestcontainersBuilder() - .WithMessageBroker(new RabbitMqTestcontainerConfiguration($"{ImageName}:{ImageTag}") - { - Username = RabbitMQUsername, - Password = RabbitMQPassword - }) - .WithExposedPort(DefaultRabbitMQManagementPort) - .WithPortBinding(0, DefaultRabbitMQManagementPort); + // Yes, there is a RabbitMqBuilder in TestContainers, but this provides more control. + rabbitMQContainer = new ContainerBuilder() + .WithImage($"{RabbitMQImageName}:{RabbitMQImageTag}") + .WithEnvironment("RABBITMQ_DEFAULT_USER", RabbitMQUsername) + .WithEnvironment("RABBITMQ_DEFAULT_PASS", RabbitMQPassword) + .WithPortBinding(DefaultRabbitMQManagementPort, true) + .WithNetwork(network) + .WithNetworkAliases("rabbitmq") + .Build(); + + toxiproxyContainer = new ContainerBuilder() + .WithImage($"{ToxiproxyImageName}:{ToxiproxyImageTag}") + .WithPortBinding(DefaultToxiproxyPort, true) + .WithPortBinding(DefaultRabbitMQPort, true) + //.WithPortBinding(DefaultRabbitMQManagementPort, true) + .WithNetwork(network) + .Build(); + + await network.CreateAsync(); + await rabbitMQContainer!.StartAsync(); + await toxiproxyContainer!.StartAsync(); + + toxiproxyPort = toxiproxyContainer.GetMappedPublicPort(DefaultToxiproxyPort); + rabbitMQPort = toxiproxyContainer.GetMappedPublicPort(DefaultRabbitMQPort); + //rabbitMQManagementPort = toxiproxyContainer.GetMappedPublicPort(DefaultRabbitMQManagementPort); + rabbitMQManagementPort = rabbitMQContainer.GetMappedPublicPort(DefaultRabbitMQManagementPort); + + await InitializeProxy(); + } + - testcontainers = testcontainersBuilder.Build(); + /// + /// Acquires a connection to the RabbitMQ test Toxiproxy. Be sure to Dispose to prevent blocking other tests. + /// + /// + /// This method guarantees any Toxiproxy "toxics" do not carry over into other tests. + /// + public async Task AcquireProxy() + { + await acquireLimit.WaitAsync(); + await toxiproxyClient!.ResetAsync(); + + return new RabbitMQTestProxy(() => + { + acquireLimit.Release(); + }) + { + RabbitMQPort = rabbitMQPort, + RabbitMQManagementPort = rabbitMQManagementPort, + + RabbitMQProxy = rabbitMQProxy + //RabbitMQManagementProxy = rabbitMQManagementProxy + }; + } - await testcontainers!.StartAsync(); - RabbitMQPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQPort); - RabbitMQManagementPort = testcontainers.GetMappedPublicPort(DefaultRabbitMQManagementPort); + private async Task InitializeProxy() + { + toxiproxyConnection = new Toxiproxy.Net.Connection("127.0.0.1", toxiproxyPort); + toxiproxyClient = toxiproxyConnection.Client(); + + rabbitMQProxy = await toxiproxyClient.AddAsync(new Proxy + { + Name = "RabbitMQ", + Enabled = true, + Listen = $"0.0.0.0:{DefaultRabbitMQPort}", + Upstream = $"rabbitmq:{DefaultRabbitMQPort}" + }); + + /* + rabbitMQManagementProxy = await toxiproxyClient.AddAsync(new Proxy + { + Name = "RabbitMQManagement", + Enabled = true, + Listen = $"0.0.0.0:{DefaultRabbitMQManagementPort}", + Upstream = $"rabbitmq:{DefaultRabbitMQManagementPort}" + }); + */ } public async Task DisposeAsync() { - if (testcontainers != null) - await testcontainers.DisposeAsync(); + toxiproxyConnection?.Dispose(); + + if (toxiproxyContainer != null) + await toxiproxyContainer.DisposeAsync(); + + if (rabbitMQContainer != null) + await rabbitMQContainer.DisposeAsync(); + + if (network != null) + await network.DeleteAsync(); + } + + + public sealed class RabbitMQTestProxy : IDisposable + { + public ushort RabbitMQPort { get; init; } + public ushort RabbitMQManagementPort { get; init; } + + public Proxy? RabbitMQProxy { get; init; } + + // Disabled for now since proxying HTTP to RabbitMQ does not seem to work with only Toxiproxy + //public Proxy? RabbitMQManagementProxy { get; init; } + + private Action? OnDispose { get; } + + + public RabbitMQTestProxy(Action? onDispose) + { + OnDispose = onDispose; + } + + + public void Dispose() + { + OnDispose?.Invoke(); + } } } } \ No newline at end of file diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index 3a3159b..4cd1bf8 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -1,4 +1,5 @@ -using System.Collections.Generic; +using System; +using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -20,6 +21,7 @@ public class TapetiClientTests : IAsyncLifetime private readonly RabbitMQFixture fixture; private readonly MockDependencyResolver dependencyResolver = new(); + private RabbitMQFixture.RabbitMQTestProxy proxy = null!; private TapetiClient client = null!; @@ -31,17 +33,17 @@ public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHe } - public Task InitializeAsync() + public async Task InitializeAsync() { + proxy = await fixture.AcquireProxy(); client = CreateClient(); - - return Task.CompletedTask; } public async Task DisposeAsync() { await client.Close(); + proxy.Dispose(); } @@ -49,8 +51,8 @@ public async Task DisposeAsync() [Fact] public void Fixture() { - ((int)fixture.RabbitMQPort).ShouldBeGreaterThan(0); - ((int)fixture.RabbitMQManagementPort).ShouldBeGreaterThan(0); + ((int)proxy.RabbitMQPort).ShouldBeGreaterThan(0); + ((int)proxy.RabbitMQManagementPort).ShouldBeGreaterThan(0); } @@ -122,6 +124,13 @@ public async Task PublishHandleOverflow() } + [Fact] + public Task Reconnect() + { + throw new NotImplementedException(); + } + + // TODO test the other methods private RabbitMQ.Client.IConnection CreateRabbitMQClient() @@ -129,7 +138,7 @@ private RabbitMQ.Client.IConnection CreateRabbitMQClient() var connectionFactory = new ConnectionFactory { HostName = "127.0.0.1", - Port = fixture.RabbitMQPort, + Port = proxy.RabbitMQPort, UserName = RabbitMQFixture.RabbitMQUsername, Password = RabbitMQFixture.RabbitMQPassword, AutomaticRecoveryEnabled = false, @@ -147,8 +156,8 @@ private TapetiClient CreateClient() new TapetiConnectionParams { HostName = "127.0.0.1", - Port = fixture.RabbitMQPort, - ManagementPort = fixture.RabbitMQManagementPort, + Port = proxy.RabbitMQPort, + ManagementPort = proxy.RabbitMQManagementPort, Username = RabbitMQFixture.RabbitMQUsername, Password = RabbitMQFixture.RabbitMQPassword, PrefetchCount = 50 diff --git a/Tapeti.Tests/Config/QueueArgumentsTest.cs b/Tapeti.Tests/Config/QueueArgumentsTest.cs index 14664f7..d31cc88 100644 --- a/Tapeti.Tests/Config/QueueArgumentsTest.cs +++ b/Tapeti.Tests/Config/QueueArgumentsTest.cs @@ -4,7 +4,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -using Moq; +using NSubstitute; using Shouldly; using Tapeti.Config.Annotations; using Tapeti.Config; @@ -25,55 +25,54 @@ public static string AsUTF8String(this object value) public class QueueArgumentsTest : BaseControllerTest { - private static readonly MockRepository MoqRepository = new(MockBehavior.Strict); - - private readonly Mock client; + private readonly ITapetiClient client; private readonly Dictionary declaredQueues = new(); public QueueArgumentsTest() { - client = MoqRepository.Create(); - var routingKeyStrategy = MoqRepository.Create(); - var exchangeStrategy = MoqRepository.Create(); + client = Substitute.For(); + var routingKeyStrategy = Substitute.For(); + var exchangeStrategy = Substitute.For(); - DependencyResolver.Set(routingKeyStrategy.Object); - DependencyResolver.Set(exchangeStrategy.Object); + DependencyResolver.Set(routingKeyStrategy); + DependencyResolver.Set(exchangeStrategy); routingKeyStrategy - .Setup(s => s.GetRoutingKey(typeof(TestMessage1))) + .GetRoutingKey(typeof(TestMessage1)) .Returns("testmessage1"); routingKeyStrategy - .Setup(s => s.GetRoutingKey(typeof(TestMessage2))) + .GetRoutingKey(typeof(TestMessage2)) .Returns("testmessage2"); exchangeStrategy - .Setup(s => s.GetExchange(It.IsAny())) + .GetExchange(Arg.Any()) .Returns("exchange"); var queue = 0; client - .Setup(c => c.DynamicQueueDeclare(null, It.IsAny(), It.IsAny())) - .Callback((string _, IRabbitMQArguments arguments, CancellationToken _) => + .DynamicQueueDeclare(null, Arg.Any(), Arg.Any()) + .Returns(callInfo => { queue++; - declaredQueues.Add($"queue-{queue}", arguments); - }) - .ReturnsAsync(() => $"queue-{queue}"); + declaredQueues.Add($"queue-{queue}", callInfo.Arg()); + + return Task.FromResult($"queue-{queue}"); + }); client - .Setup(c => c.DurableQueueDeclare(It.IsAny(), It.IsAny>(), It.IsAny(), It.IsAny())) - .Callback((string queueName, IEnumerable _, IRabbitMQArguments arguments, CancellationToken _) => + .DurableQueueDeclare(Arg.Any(), Arg.Any>(), Arg.Any(), Arg.Any()) + .Returns(callInfo => { - declaredQueues.Add(queueName, arguments); - }) - .Returns(Task.CompletedTask); + declaredQueues.Add(callInfo.Arg(), callInfo.Arg()); + return Task.CompletedTask; + }); client - .Setup(c => c.DynamicQueueBind(It.IsAny(), It.IsAny(), It.IsAny())) + .DynamicQueueBind(Arg.Any(), Arg.Any(), Arg.Any()) .Returns(Task.CompletedTask); } @@ -91,7 +90,7 @@ public async Task SingleQueueArguments() - var subscriber = new TapetiSubscriber(() => client.Object, config); + var subscriber = new TapetiSubscriber(() => client, config); await subscriber.ApplyBindings(); @@ -112,7 +111,7 @@ public async Task ConflictingDynamicQueueArguments() { var config = GetControllerConfig(); - var subscriber = new TapetiSubscriber(() => client.Object, config); + var subscriber = new TapetiSubscriber(() => client, config); await subscriber.ApplyBindings(); declaredQueues.Count.ShouldBe(2); @@ -132,7 +131,7 @@ public async Task ConflictingDurableQueueArguments() var testApplyBindings = () => { - var subscriber = new TapetiSubscriber(() => client.Object, config); + var subscriber = new TapetiSubscriber(() => client, config); return subscriber.ApplyBindings(); }; diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 6290394..7387117 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -10,14 +10,15 @@ - - - + + + - - - - + + + + + all runtime; build; native; contentfiles; analyzers From e7883bf090e34c16557d7469b16f90474ea55d97 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 19 Feb 2025 09:43:26 +0100 Subject: [PATCH 3/8] Fixed unit tests - Previous issues was due to not waiting for the RabbitMQ container to start up completely - Reconnect test implemented for client --- Tapeti.Tests/Client/RabbitMQFixture.cs | 22 +++++----- Tapeti.Tests/Client/TapetiClientTests.cs | 51 ++++++++++++++++++++++-- 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/Tapeti.Tests/Client/RabbitMQFixture.cs b/Tapeti.Tests/Client/RabbitMQFixture.cs index 8578308..38e0ff9 100644 --- a/Tapeti.Tests/Client/RabbitMQFixture.cs +++ b/Tapeti.Tests/Client/RabbitMQFixture.cs @@ -32,7 +32,7 @@ public sealed class RabbitMQFixture : IAsyncLifetime private Toxiproxy.Net.Connection? toxiproxyConnection; private Toxiproxy.Net.Client? toxiproxyClient; private Proxy? rabbitMQProxy; - //private Proxy? rabbitMQManagementProxy; + private Proxy? rabbitMQManagementProxy; private readonly SemaphoreSlim acquireLimit = new(1, 1); @@ -60,7 +60,8 @@ public async Task InitializeAsync() .WithImage($"{RabbitMQImageName}:{RabbitMQImageTag}") .WithEnvironment("RABBITMQ_DEFAULT_USER", RabbitMQUsername) .WithEnvironment("RABBITMQ_DEFAULT_PASS", RabbitMQPassword) - .WithPortBinding(DefaultRabbitMQManagementPort, true) + //.WithPortBinding(DefaultRabbitMQManagementPort, true) + .WithWaitStrategy(Wait.ForUnixContainer().UntilMessageIsLogged("Server startup complete")) .WithNetwork(network) .WithNetworkAliases("rabbitmq") .Build(); @@ -69,7 +70,7 @@ public async Task InitializeAsync() .WithImage($"{ToxiproxyImageName}:{ToxiproxyImageTag}") .WithPortBinding(DefaultToxiproxyPort, true) .WithPortBinding(DefaultRabbitMQPort, true) - //.WithPortBinding(DefaultRabbitMQManagementPort, true) + .WithPortBinding(DefaultRabbitMQManagementPort, true) .WithNetwork(network) .Build(); @@ -79,8 +80,7 @@ public async Task InitializeAsync() toxiproxyPort = toxiproxyContainer.GetMappedPublicPort(DefaultToxiproxyPort); rabbitMQPort = toxiproxyContainer.GetMappedPublicPort(DefaultRabbitMQPort); - //rabbitMQManagementPort = toxiproxyContainer.GetMappedPublicPort(DefaultRabbitMQManagementPort); - rabbitMQManagementPort = rabbitMQContainer.GetMappedPublicPort(DefaultRabbitMQManagementPort); + rabbitMQManagementPort = toxiproxyContainer.GetMappedPublicPort(DefaultRabbitMQManagementPort); await InitializeProxy(); } @@ -105,8 +105,8 @@ public async Task AcquireProxy() RabbitMQPort = rabbitMQPort, RabbitMQManagementPort = rabbitMQManagementPort, - RabbitMQProxy = rabbitMQProxy - //RabbitMQManagementProxy = rabbitMQManagementProxy + RabbitMQProxy = rabbitMQProxy!, + RabbitMQManagementProxy = rabbitMQManagementProxy! }; } @@ -124,7 +124,6 @@ private async Task InitializeProxy() Upstream = $"rabbitmq:{DefaultRabbitMQPort}" }); - /* rabbitMQManagementProxy = await toxiproxyClient.AddAsync(new Proxy { Name = "RabbitMQManagement", @@ -132,7 +131,6 @@ private async Task InitializeProxy() Listen = $"0.0.0.0:{DefaultRabbitMQManagementPort}", Upstream = $"rabbitmq:{DefaultRabbitMQManagementPort}" }); - */ } @@ -156,10 +154,8 @@ public sealed class RabbitMQTestProxy : IDisposable public ushort RabbitMQPort { get; init; } public ushort RabbitMQManagementPort { get; init; } - public Proxy? RabbitMQProxy { get; init; } - - // Disabled for now since proxying HTTP to RabbitMQ does not seem to work with only Toxiproxy - //public Proxy? RabbitMQManagementProxy { get; init; } + public Proxy RabbitMQProxy { get; init; } = null!; + public Proxy RabbitMQManagementProxy { get; init; } = null!; private Action? OnDispose { get; } diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index 4cd1bf8..32d8a72 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -3,6 +3,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using NSubstitute; using RabbitMQ.Client; using Shouldly; using Tapeti.Connection; @@ -19,15 +20,18 @@ namespace Tapeti.Tests.Client public class TapetiClientTests : IAsyncLifetime { private readonly RabbitMQFixture fixture; + private readonly ITestOutputHelper testOutputHelper; private readonly MockDependencyResolver dependencyResolver = new(); private RabbitMQFixture.RabbitMQTestProxy proxy = null!; private TapetiClient client = null!; + private readonly IConnectionEventListener connectionEventListener = Substitute.For(); public TapetiClientTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) { this.fixture = fixture; + this.testOutputHelper = testOutputHelper; dependencyResolver.Set(new MockLogger(testOutputHelper)); } @@ -125,9 +129,50 @@ public async Task PublishHandleOverflow() [Fact] - public Task Reconnect() + public async Task Reconnect() { - throw new NotImplementedException(); + var disconnectedCompletion = new TaskCompletionSource(); + var reconnectedCompletion = new TaskCompletionSource(); + + connectionEventListener + .When(c => c.Disconnected(Arg.Any())) + .Do(_ => + { + testOutputHelper.WriteLine("Disconnected event triggered"); + disconnectedCompletion.TrySetResult(); + }); + + connectionEventListener + .When(c => c.Reconnected(Arg.Any())) + .Do(_ => + { + testOutputHelper.WriteLine("Reconnected event triggered"); + reconnectedCompletion.TrySetResult(); + }); + + // Trigger the connection to be established + await client.Publish(Encoding.UTF8.GetBytes("hello, void!"), new MessageProperties(), "nowhere", "nobody", false); + + + proxy.RabbitMQProxy.Enabled = false; + await proxy.RabbitMQProxy.UpdateAsync(); + + + await WithTimeout(disconnectedCompletion.Task, TimeSpan.FromSeconds(60)); + + proxy.RabbitMQProxy.Enabled = true; + await proxy.RabbitMQProxy.UpdateAsync(); + + await WithTimeout(reconnectedCompletion.Task, TimeSpan.FromSeconds(60)); + } + + + + private static async Task WithTimeout(Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(task, timeoutTask) == timeoutTask) + throw new TimeoutException("Task took too long to complete"); } @@ -162,7 +207,7 @@ private TapetiClient CreateClient() Password = RabbitMQFixture.RabbitMQPassword, PrefetchCount = 50 }, - null); + connectionEventListener); } } } \ No newline at end of file From ba8c8af02c3871315777e06e7c924281544499c9 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 19 Feb 2025 11:00:19 +0100 Subject: [PATCH 4/8] Added unit test for resubscribing to controllers on reconnect --- .../Client/Controller/ReconnectController.cs | 78 +++++++++++++++++++ Tapeti.Tests/Client/ControllerTests.cs | 63 ++++++++++++++- Tapeti.Tests/Client/TapetiClientTests.cs | 14 +--- Tapeti.Tests/Helpers/TaskExtensions.cs | 15 ++++ Tapeti.Tests/Tapeti.Tests.csproj | 3 +- 5 files changed, 160 insertions(+), 13 deletions(-) create mode 100644 Tapeti.Tests/Client/Controller/ReconnectController.cs create mode 100644 Tapeti.Tests/Helpers/TaskExtensions.cs diff --git a/Tapeti.Tests/Client/Controller/ReconnectController.cs b/Tapeti.Tests/Client/Controller/ReconnectController.cs new file mode 100644 index 0000000..160388d --- /dev/null +++ b/Tapeti.Tests/Client/Controller/ReconnectController.cs @@ -0,0 +1,78 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Nito.AsyncEx; +using Tapeti.Config.Annotations; +using Tapeti.Tests.Helpers; +using Xunit.Abstractions; +using Xunit.Sdk; + +namespace Tapeti.Tests.Client.Controller +{ + public class ReconnectDurableMessage + { + public int Number; + } + + + public class ReconnectDynamicMessage + { + public int Number; + } + + + [MessageController] + public class ReconnectController + { + private readonly ITestOutputHelper testOutputHelper; + private static bool durableBlock = true; + private static readonly AsyncAutoResetEvent DurableMessageReceived = new(); + private static readonly AsyncAutoResetEvent DynamicMessageReceived = new (); + + + [NoBinding] + public static void SetBlockDurableMessage(bool block) + { + durableBlock = block; + } + + + [NoBinding] + public static async Task WaitForDurableMessage() + { + await DurableMessageReceived.WaitAsync().WithTimeout(TimeSpan.FromSeconds(10)); + } + + + [NoBinding] + public static async Task WaitForDynamicMessage() + { + await DynamicMessageReceived.WaitAsync().WithTimeout(TimeSpan.FromSeconds(10)); + } + + + public ReconnectController(ITestOutputHelper testOutputHelper) + { + this.testOutputHelper = testOutputHelper; + } + + + [DurableQueue("reconnect.durable")] + public async Task DurableMessage(ReconnectDurableMessage message, CancellationToken cancellationToken) + { + testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue"); + DurableMessageReceived.Set(); + + if (durableBlock) + await Task.Delay(Timeout.Infinite, cancellationToken); + } + + + [DynamicQueue("reconnect.dynamic")] + public void NoWaitMessage(ReconnectDynamicMessage message) + { + testOutputHelper.WriteLine($"- Received message {message.Number} in dynamic queue"); + DynamicMessageReceived.Set(); + } + } +} diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs index 82e1745..f6df326 100644 --- a/Tapeti.Tests/Client/ControllerTests.cs +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -5,9 +5,11 @@ using Tapeti.Config; using Tapeti.SimpleInjector; using Tapeti.Tests.Client.Controller; +using Tapeti.Tests.Helpers; using Tapeti.Tests.Mock; using Xunit; using Xunit.Abstractions; +using Xunit.Sdk; namespace Tapeti.Tests.Client { @@ -16,6 +18,7 @@ namespace Tapeti.Tests.Client public class ControllerTests : IAsyncLifetime { private readonly RabbitMQFixture fixture; + private readonly ITestOutputHelper testOutputHelper; private readonly Container container = new(); private TapetiConnection? connection; @@ -25,6 +28,7 @@ public class ControllerTests : IAsyncLifetime public ControllerTests(RabbitMQFixture fixture, ITestOutputHelper testOutputHelper) { this.fixture = fixture; + this.testOutputHelper = testOutputHelper; container.RegisterInstance(new MockLogger(testOutputHelper)); container.RegisterInstance(testOutputHelper); @@ -81,7 +85,7 @@ public async Task DedicatedChannel() .RegisterController() .Build(); - connection = CreateConnection(config, 50, 2); + connection = CreateConnection(config); await connection!.Subscribe(); @@ -98,6 +102,63 @@ public async Task DedicatedChannel() } + [Fact] + public async Task Reconnect() + { + var config = new TapetiConfig(new SimpleInjectorDependencyResolver(container)) + .EnableDeclareDurableQueues() + .RegisterController() + .Build(); + + connection = CreateConnection(config); + + var disconnectedCompletion = new TaskCompletionSource(); + var reconnectedCompletion = new TaskCompletionSource(); + + connection.Disconnected += (_, _) => disconnectedCompletion.TrySetResult(); + connection.Reconnected += (_, _) => reconnectedCompletion.TrySetResult(); + + await connection.Subscribe(); + + + ReconnectController.SetBlockDurableMessage(true); + await connection.GetPublisher().Publish(new ReconnectDurableMessage { Number = 1 }); + await connection.GetPublisher().Publish(new ReconnectDynamicMessage { Number = 1 }); + + // Both messages should arrive. The message for the durable queue will not be acked. + testOutputHelper.WriteLine("> Waiting for initial messages"); + await Task.WhenAll(ReconnectController.WaitForDurableMessage(), ReconnectController.WaitForDynamicMessage()); + + + testOutputHelper.WriteLine("> Disabling proxy"); + proxy.RabbitMQProxy.Enabled = false; + await proxy.RabbitMQProxy.UpdateAsync(); + + await disconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60)); + + + testOutputHelper.WriteLine("> Re-enabling proxy"); + ReconnectController.SetBlockDurableMessage(false); + + + proxy.RabbitMQProxy.Enabled = true; + await proxy.RabbitMQProxy.UpdateAsync(); + + await reconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60)); + + + // Message in the durable queue should be delivered again + testOutputHelper.WriteLine("> Waiting for durable message redelivery"); + await ReconnectController.WaitForDurableMessage(); + + + // Dynamic queue is of course empty but should be recreated + testOutputHelper.WriteLine("> Sending and waiting for dynamic message"); + await connection.GetPublisher().Publish(new ReconnectDynamicMessage { Number = 2 }); + await ReconnectController.WaitForDynamicMessage(); + } + + private TapetiConnection CreateConnection(ITapetiConfig config, ushort prefetchCount = 1, int? consumerDispatchConcurrency = null) { return new TapetiConnection(config) diff --git a/Tapeti.Tests/Client/TapetiClientTests.cs b/Tapeti.Tests/Client/TapetiClientTests.cs index 32d8a72..4eabaf4 100644 --- a/Tapeti.Tests/Client/TapetiClientTests.cs +++ b/Tapeti.Tests/Client/TapetiClientTests.cs @@ -9,6 +9,7 @@ using Tapeti.Connection; using Tapeti.Default; using Tapeti.Exceptions; +using Tapeti.Tests.Helpers; using Tapeti.Tests.Mock; using Xunit; using Xunit.Abstractions; @@ -158,21 +159,12 @@ public async Task Reconnect() await proxy.RabbitMQProxy.UpdateAsync(); - await WithTimeout(disconnectedCompletion.Task, TimeSpan.FromSeconds(60)); + await disconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60)); proxy.RabbitMQProxy.Enabled = true; await proxy.RabbitMQProxy.UpdateAsync(); - await WithTimeout(reconnectedCompletion.Task, TimeSpan.FromSeconds(60)); - } - - - - private static async Task WithTimeout(Task task, TimeSpan timeout) - { - var timeoutTask = Task.Delay(timeout); - if (await Task.WhenAny(task, timeoutTask) == timeoutTask) - throw new TimeoutException("Task took too long to complete"); + await reconnectedCompletion.Task.WithTimeout(TimeSpan.FromSeconds(60)); } diff --git a/Tapeti.Tests/Helpers/TaskExtensions.cs b/Tapeti.Tests/Helpers/TaskExtensions.cs new file mode 100644 index 0000000..e3ef0da --- /dev/null +++ b/Tapeti.Tests/Helpers/TaskExtensions.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; +using System; + +namespace Tapeti.Tests.Helpers +{ + internal static class TaskExtensions + { + public static async Task WithTimeout(this Task task, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + if (await Task.WhenAny(task, timeoutTask) == timeoutTask) + throw new TimeoutException("Task took too long to complete"); + } + } +} diff --git a/Tapeti.Tests/Tapeti.Tests.csproj b/Tapeti.Tests/Tapeti.Tests.csproj index 7387117..3a1f38f 100644 --- a/Tapeti.Tests/Tapeti.Tests.csproj +++ b/Tapeti.Tests/Tapeti.Tests.csproj @@ -1,7 +1,7 @@ - net6.0;net7.0 + net6.0;net7.0;net8.0 enable @@ -12,6 +12,7 @@ + From aa75b0354752dd3bb8856991481a834611e32aad Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Fri, 21 Feb 2025 14:21:05 +0100 Subject: [PATCH 5/8] Added dedicated channel to Reconnect test --- .../Client/Controller/ReconnectController.cs | 28 +++++++++++++++++-- Tapeti.Tests/Client/ControllerTests.cs | 6 ++-- 2 files changed, 28 insertions(+), 6 deletions(-) diff --git a/Tapeti.Tests/Client/Controller/ReconnectController.cs b/Tapeti.Tests/Client/Controller/ReconnectController.cs index 160388d..9f5ab3d 100644 --- a/Tapeti.Tests/Client/Controller/ReconnectController.cs +++ b/Tapeti.Tests/Client/Controller/ReconnectController.cs @@ -15,6 +15,12 @@ public class ReconnectDurableMessage } + public class ReconnectDurableDedicatedMessage + { + public int Number; + } + + public class ReconnectDynamicMessage { public int Number; @@ -27,6 +33,7 @@ public class ReconnectController private readonly ITestOutputHelper testOutputHelper; private static bool durableBlock = true; private static readonly AsyncAutoResetEvent DurableMessageReceived = new(); + private static readonly AsyncAutoResetEvent DurableDedicatedMessageReceived = new(); private static readonly AsyncAutoResetEvent DynamicMessageReceived = new (); @@ -38,9 +45,12 @@ public static void SetBlockDurableMessage(bool block) [NoBinding] - public static async Task WaitForDurableMessage() + public static async Task WaitForDurableMessages() { - await DurableMessageReceived.WaitAsync().WithTimeout(TimeSpan.FromSeconds(10)); + await Task.WhenAll( + DurableMessageReceived.WaitAsync(), + DurableDedicatedMessageReceived.WaitAsync() + ).WithTimeout(TimeSpan.FromSeconds(10)); } @@ -67,7 +77,19 @@ public async Task DurableMessage(ReconnectDurableMessage message, CancellationTo await Task.Delay(Timeout.Infinite, cancellationToken); } - + + [DurableQueue("reconnect.durable.dedicated")] + [DedicatedChannel] + public async Task DurableMessage(ReconnectDurableDedicatedMessage message, CancellationToken cancellationToken) + { + testOutputHelper.WriteLine($"- Received message {message.Number} in durable queue on dedicated channel"); + DurableDedicatedMessageReceived.Set(); + + if (durableBlock) + await Task.Delay(Timeout.Infinite, cancellationToken); + } + + [DynamicQueue("reconnect.dynamic")] public void NoWaitMessage(ReconnectDynamicMessage message) { diff --git a/Tapeti.Tests/Client/ControllerTests.cs b/Tapeti.Tests/Client/ControllerTests.cs index f6df326..777e32a 100644 --- a/Tapeti.Tests/Client/ControllerTests.cs +++ b/Tapeti.Tests/Client/ControllerTests.cs @@ -9,7 +9,6 @@ using Tapeti.Tests.Mock; using Xunit; using Xunit.Abstractions; -using Xunit.Sdk; namespace Tapeti.Tests.Client { @@ -123,11 +122,12 @@ public async Task Reconnect() ReconnectController.SetBlockDurableMessage(true); await connection.GetPublisher().Publish(new ReconnectDurableMessage { Number = 1 }); + await connection.GetPublisher().Publish(new ReconnectDurableDedicatedMessage { Number = 1 }); await connection.GetPublisher().Publish(new ReconnectDynamicMessage { Number = 1 }); // Both messages should arrive. The message for the durable queue will not be acked. testOutputHelper.WriteLine("> Waiting for initial messages"); - await Task.WhenAll(ReconnectController.WaitForDurableMessage(), ReconnectController.WaitForDynamicMessage()); + await Task.WhenAll(ReconnectController.WaitForDurableMessages(), ReconnectController.WaitForDynamicMessage()); testOutputHelper.WriteLine("> Disabling proxy"); @@ -149,7 +149,7 @@ public async Task Reconnect() // Message in the durable queue should be delivered again testOutputHelper.WriteLine("> Waiting for durable message redelivery"); - await ReconnectController.WaitForDurableMessage(); + await ReconnectController.WaitForDurableMessages(); // Dynamic queue is of course empty but should be recreated From 2a0cb14cdea19a764551afa47d0c09d5dcffc6a9 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 24 Mar 2025 10:36:54 +0100 Subject: [PATCH 6/8] Fixed ReadTheDocs yaml --- .readthedocs.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 9db0418..29f1593 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,9 +1,9 @@ -version 2 +version: 2 -build - os ubuntu-22.04 - tools - python 3.12 +build: + os: ubuntu-22.04 + tools: + python: 3.12 -sphinx - configuration docs/conf.py \ No newline at end of file +sphinx: + configuration: docs/conf.py \ No newline at end of file From 572ef8e8436fe741a2345d242dc8f81eb7d73599 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Mon, 24 Mar 2025 10:53:04 +0100 Subject: [PATCH 7/8] Fixed ReadTheDocs yaml (for realsies) --- .readthedocs.yaml | 10 +++++++--- docs/conf.py | 4 +++- docs/requirements.txt | 1 + 3 files changed, 11 insertions(+), 4 deletions(-) create mode 100644 docs/requirements.txt diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 29f1593..da70fdb 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -1,9 +1,13 @@ version: 2 build: - os: ubuntu-22.04 + os: "ubuntu-22.04" tools: - python: 3.12 + python: "3.12" sphinx: - configuration: docs/conf.py \ No newline at end of file + configuration: "docs/conf.py" + +python: + install: + - requirements: "docs/requirements.txt" \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 17d1913..57a5767 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -30,7 +30,9 @@ # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = [] +extensions = [ + 'sphinx_rtd_theme' +] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/docs/requirements.txt b/docs/requirements.txt new file mode 100644 index 0000000..5614a3c --- /dev/null +++ b/docs/requirements.txt @@ -0,0 +1 @@ +sphinx_rtd_theme==3.0.2 \ No newline at end of file From ebf22e1dc77923fd80425ebc899c8d0010ece0f4 Mon Sep 17 00:00:00 2001 From: Mark van Renswoude Date: Wed, 26 Mar 2025 10:34:14 +0100 Subject: [PATCH 8/8] Documentation updates --- docs/README.md | 11 +++- docs/attributes.rst | 113 +++++++++++++++++++++++++++++++++++++++ docs/conf.py | 5 +- docs/docker-bootstrap.sh | 35 ++++++++++++ docs/docker-build.bat | 1 + docs/docker-run.bat | 1 + docs/dockerfile | 13 +++++ docs/gettingstarted.rst | 2 - docs/indepth.rst | 9 ++-- docs/index.rst | 1 + 10 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 docs/attributes.rst create mode 100644 docs/docker-bootstrap.sh create mode 100644 docs/docker-build.bat create mode 100644 docs/docker-run.bat create mode 100644 docs/dockerfile diff --git a/docs/README.md b/docs/README.md index ccea1cc..5989e26 100644 --- a/docs/README.md +++ b/docs/README.md @@ -11,4 +11,13 @@ To build the HTML output, run: To use the auto reloading server (rundev.bat), install the sphinx-autobuild package: -```pip install sphinx-autobuild``` \ No newline at end of file +```pip install sphinx-autobuild``` + + +Alternatively, use Docker. + +To build the image (only required the first time or when requirements.txt changes): +```docker-build.bat``` + +To run a local server: +```docker-run.bat``` \ No newline at end of file diff --git a/docs/attributes.rst b/docs/attributes.rst new file mode 100644 index 0000000..8601ff4 --- /dev/null +++ b/docs/attributes.rst @@ -0,0 +1,113 @@ +Attributes +========== + +As described in previous chapters, Tapeti uses attributes to handle configuration which can differ per controller, queue or message. This chapter provides a reference, as well as documentation for some of the more advanced attributes. + +There are two types of attributes used by Tapeti: message annotations and client side configuration. + +Message annotations +------------------- + +The attributes control how a message should be published or consumed. They are typically referenced in an assembly containing the message classes which is shared between the sending and receiving services. + +These annotations are part of the separate Tapeti.Annotations NuGet package. + +Mandatory +^^^^^^^^^ +Indicates the message must be handled by at least one queue. An exception will be thrown on the publishing side if there is no route for the message. + +See: :ref:`mandatory` + +:: + + [Mandatory] + public class SomeoneHandleMeMessage + { + } + +Request +^^^^^^^ +Indicates the message is a request which expects a response. An exception will be thrown on the consuming side if the message handler does not return the specified response type. + +See: :ref:`requestresponse` + +:: + + [Request(Response = typeof(BunnyCountResponseMessage))] + public class BunnyCountRequestMessage + { + public string ColorFilter { get; set; } + } + + public class BunnyCountResponseMessage + { + public int Count { get; set; } + } + + +RoutingKey +^^^^^^^^^^ +Modifies the default routing key that is generated for this message. Either specify the 'Full' property to override the routing key completely, or specify a 'Prefix' and/or 'Postfix'. + +.. note:: + If a non-default IRoutingKeyStrategy is used, it's implementation must explicitly support this attribute for it to have an effect, preferably by using Tapeti.Default.RoutingKeyHelper. + +:: + + [RoutingKey(Full = "modified.routing.key")] + public class SomeMessage + { + } + + +Client configuration +-------------------- + +These attributes control how the Tapeti client handles messages. + +DedicatedChannel +^^^^^^^^^^^^^^^^ +Requests a dedicated RabbitMQ Channel for consuming messages from the queue. + +The DedicatedChannel attribute can be applied to any controller or method and will apply to the queue +that is used in that context. It does not need be applied to all message handlers for that queue to have +an effect. + +The intended use case is for high-traffic message handlers, or message handlers which can block for either +a long time or indefinitely for throttling purposes. These can clog up the channel's workers and impact +other queues. + +DurableQueue +^^^^^^^^^^^^ +Binds to an existing durable queue to receive messages. Can be used on an entire MessageController class or on individual methods. + +DynamicQueue +^^^^^^^^^^^^ +Creates a non-durable auto-delete queue to receive messages. Can be used on an entire MessageController class or on individual methods. + +MessageController +^^^^^^^^^^^^^^^^^ +Attaching this attribute to a class includes it in the auto-discovery of message controllers +when using the RegisterAllControllers method. It is not required when manually registering a controller. + +NoBinding +^^^^^^^^^ +Indicates that the method is not a message handler and should not be bound by Tapeti. + +QueueArguments +^^^^^^^^^^^^^^ +Specifies the optional queue arguments (also known as 'x-arguments') used when declaring +the queue. + +The QueueArguments attribute can be applied to any controller or method and will affect the queue +that is used in that context. For durable queues, at most one QueueArguments attribute can be specified +per unique queue name. + +Also note that queue arguments can not be changed after a queue is declared. You should declare a new queue +and make the old one Obsolete to have Tapeti automatically removed it once it is empty. Tapeti will use the +existing queue, but log a warning at startup time. + +ResponseHandler +^^^^^^^^^^^^^^^ +Indicates that the method only handles response messages which are sent directly +to the queue. No binding will be created. \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index 57a5767..04af19e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -65,7 +65,7 @@ # # This is also used if you do content translation via gettext catalogs. # Usually you set "language" from the command line for these cases. -language = None +language = 'en' # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. @@ -86,7 +86,6 @@ # import sphinx_rtd_theme html_theme = 'sphinx_rtd_theme' -html_theme_path = [sphinx_rtd_theme.get_html_theme_path()] # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the @@ -97,7 +96,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = [] # -- Options for HTMLHelp output ------------------------------------------ diff --git a/docs/docker-bootstrap.sh b/docs/docker-bootstrap.sh new file mode 100644 index 0000000..f7c180d --- /dev/null +++ b/docs/docker-bootstrap.sh @@ -0,0 +1,35 @@ +# +# For use inside the Docker container +# + +# We can't bind to 0.0.0.0 since we want to bind to the same port as sphinx-autobuild +CONTAINER_IP=$(ip -4 -o addr show eth0 | awk '{print $4}' | cut -d'/' -f1) + +cat < /etc/nginx/sites-available/default.conf +map \$http_upgrade \$connection_upgrade { + default upgrade; + '' close; +} + +server { + listen $CONTAINER_IP:8000 default_server; + + location / { + proxy_pass http://127.0.0.1:8000; + proxy_http_version 1.1; + proxy_set_header Upgrade \$http_upgrade; + proxy_set_header Connection \$connection_upgrade; + } +} +EOF + +ln -s /etc/nginx/sites-available/default.conf /etc/nginx/sites-enabled/default.conf + +echo ">> /etc/nginx/sites-available/default.conf" +cat /etc/nginx/sites-available/default.conf +echo "<<" +echo "" + +nginx -t +service nginx start +sphinx-autobuild /docs ./_build/html -N \ No newline at end of file diff --git a/docs/docker-build.bat b/docs/docker-build.bat new file mode 100644 index 0000000..64cb9c4 --- /dev/null +++ b/docs/docker-build.bat @@ -0,0 +1 @@ +docker build -t tapetirtd . \ No newline at end of file diff --git a/docs/docker-run.bat b/docs/docker-run.bat new file mode 100644 index 0000000..472de7c --- /dev/null +++ b/docs/docker-run.bat @@ -0,0 +1 @@ +docker run -it --rm -v .:/docs -p 8000:8000 --name tapetirtd tapetirtd \ No newline at end of file diff --git a/docs/dockerfile b/docs/dockerfile new file mode 100644 index 0000000..2cbc00a --- /dev/null +++ b/docs/dockerfile @@ -0,0 +1,13 @@ +FROM python:3-bookworm + +WORKDIR /usr/src/app + +RUN pip install --no-cache-dir sphinx sphinx-autobuild +RUN apt-get update +RUN apt-get install -y nginx iproute2 + +COPY docker-bootstrap.sh ./ +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +CMD /usr/bin/bash -c /usr/src/app/docker-bootstrap.sh \ No newline at end of file diff --git a/docs/gettingstarted.rst b/docs/gettingstarted.rst index 98dbeb3..fe34412 100644 --- a/docs/gettingstarted.rst +++ b/docs/gettingstarted.rst @@ -15,8 +15,6 @@ You will need an integration package as well for your IoC (Inversion of Control) - `Autofac `_ (Tapeti.Autofac) - `Castle Windsor `_ (Tapeti.CastleWindsor) - `Ninject `_ (Tapeti.Ninject) -- `Unity `_ (Tapeti.UnityContainer) - SimpleInjector is used in all examples. The "01-PublishSubscribe" example included in the source shows how the other integration packages can be used. diff --git a/docs/indepth.rst b/docs/indepth.rst index 8b68d6b..d06e8ca 100644 --- a/docs/indepth.rst +++ b/docs/indepth.rst @@ -122,6 +122,7 @@ If all message handlers bound to a durable queue are marked as obsolete, includi If there are still messages in the queue it's pending removal will be logged but the consumers will run as normal to empty the queue. The queue will then remain until it is checked again when the application is restarted. +.. _requestresponse: Request - response ------------------ @@ -171,11 +172,13 @@ If you simply want to broadcast an event in response to a message, do not use th In practise your service may end up with the same message having two versions; one where a reply is expected and one where it's not. This is not considered a design flaw but a clear contract between services. It is common and recommended for the request message to inherit from the base non-request version, and implement two message handlers that internally perform the same logic. -While designing Tapeti this difference has been defined as `Transfer of responsibility`_ which is explained below. +While designing Tapeti this difference has been defined as :ref:`mandatory` which is explained below. -Transfer of responsibility --------------------------- +.. _mandatory: + +Transfer of responsibility (mandatory messages) +----------------------------------------------- When working with microservices there will be dependencies between services. Sometimes the dependency should be on the consumer side, which is the classic publish-subscribe pattern. For example, a reporting service will often listen in on status updates from various other services to compose a combined report. The services producing the events simply broadcast the message without concerning who if anyone is listening. diff --git a/docs/index.rst b/docs/index.rst index c88d375..d9fb361 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -9,6 +9,7 @@ Tapeti documentation gettingstarted compatibility indepth + attributes dataannotations flow transient