From eb5dd2ae30dced9c6f192fb0d47793e5ebe68091 Mon Sep 17 00:00:00 2001 From: BEagle1984 Date: Fri, 19 May 2023 12:10:30 +0200 Subject: [PATCH] fix: properly handle acknowledge in OutboxWorker --- Directory.Build.props | 2 +- docs/releases.md | 6 +++ .../TransactionalOutbox/OutboxWorker.cs | 38 +++++++++---------- .../TransactionalOutbox/OutboxWorkerTests.cs | 3 +- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/Directory.Build.props b/Directory.Build.props index d2701737e..56a936810 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 4.3.1$(BaseVersionSuffix) + 4.3.2$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index 8fd76d87d..4dad15a22 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,6 +4,12 @@ uid: releases # Releases +## [4.3.2](https://github.com/BEagle1984/silverback/releases/tag/v4.3.2) + +### Fixes + +* Fix potential message loss in `OutboxWorker` + ## [4.3.1](https://github.com/BEagle1984/silverback/releases/tag/v4.3.1) ### Fixes diff --git a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs index 4bf9a58d1..f7f33b456 100644 --- a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs +++ b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs @@ -130,7 +130,7 @@ protected virtual Task ProduceMessageAsync( private static async Task AcknowledgeAllAsync( IOutboxReader outboxReader, - List messages, + IEnumerable messages, ConcurrentBag failedMessages) { await outboxReader.RetryAsync(failedMessages).ConfigureAwait(false); @@ -157,6 +157,8 @@ private async Task ProcessQueueAsync( return; } + var lastProduced = -1; + try { Interlocked.Add(ref _pendingProduceOperations, outboxMessages.Count); @@ -165,34 +167,28 @@ private async Task ProcessQueueAsync( { _logger.LogProcessingOutboxStoredMessage(i + 1, outboxMessages.Count); - try - { - await ProcessMessageAsync( - outboxMessages[i], - failedMessages, - outboxReader, - serviceProvider) - .ConfigureAwait(false); - } - catch (Exception) - { - // Subtract the produce operations that will never be initiated - Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1)); - throw; - } + lastProduced = i; + await ProcessMessageAsync( + outboxMessages[i], + failedMessages, + outboxReader, + serviceProvider) + .ConfigureAwait(false); if (stoppingToken.IsCancellationRequested) - { - // Subtract the produce operations that will never be initiated - Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1)); break; - } } } finally { + if (lastProduced < outboxMessages.Count - 1) + { + // Subtract the produce operations that will never be initiated + Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - lastProduced - 1)); + } + await WaitAllAsync().ConfigureAwait(false); - await AcknowledgeAllAsync(outboxReader, outboxMessages, failedMessages).ConfigureAwait(false); + await AcknowledgeAllAsync(outboxReader, outboxMessages.Take(lastProduced + 1), failedMessages).ConfigureAwait(false); } } diff --git a/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs b/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs index a4058a125..f5f526f58 100644 --- a/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs +++ b/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs @@ -213,10 +213,11 @@ await _outboxWriter.WriteAsync( await _worker.ProcessQueueAsync(CancellationToken.None); - _broker.ProducedMessages.Should().HaveCount(3); + _broker.ProducedMessages.Should().HaveCount(4); _broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1"); _broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2"); _broker.ProducedMessages[2].Endpoint.Name.Should().Be("topic1"); + _broker.ProducedMessages[3].Endpoint.Name.Should().Be("topic1"); } } }