Skip to content

Commit

Permalink
fix: properly handle acknowledge in OutboxWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed May 19, 2023
1 parent 37f06ae commit eb5dd2a
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>4.3.1$(BaseVersionSuffix)</BaseVersion>
<BaseVersion>4.3.2$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
</PropertyGroup>
Expand Down
6 changes: 6 additions & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ uid: releases

# Releases

## [4.3.2](https://github.com/BEagle1984/silverback/releases/tag/v4.3.2)

### Fixes

* Fix potential message loss in `OutboxWorker`

## [4.3.1](https://github.com/BEagle1984/silverback/releases/tag/v4.3.1)

### Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected virtual Task ProduceMessageAsync(

private static async Task AcknowledgeAllAsync(
IOutboxReader outboxReader,
List<OutboxStoredMessage> messages,
IEnumerable<OutboxStoredMessage> messages,
ConcurrentBag<OutboxStoredMessage> failedMessages)
{
await outboxReader.RetryAsync(failedMessages).ConfigureAwait(false);
Expand All @@ -157,6 +157,8 @@ private async Task ProcessQueueAsync(
return;
}

var lastProduced = -1;

try
{
Interlocked.Add(ref _pendingProduceOperations, outboxMessages.Count);
Expand All @@ -165,34 +167,28 @@ private async Task ProcessQueueAsync(
{
_logger.LogProcessingOutboxStoredMessage(i + 1, outboxMessages.Count);

try
{
await ProcessMessageAsync(
outboxMessages[i],
failedMessages,
outboxReader,
serviceProvider)
.ConfigureAwait(false);
}
catch (Exception)
{
// Subtract the produce operations that will never be initiated
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
throw;
}
lastProduced = i;
await ProcessMessageAsync(
outboxMessages[i],
failedMessages,
outboxReader,
serviceProvider)
.ConfigureAwait(false);

if (stoppingToken.IsCancellationRequested)
{
// Subtract the produce operations that will never be initiated
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1));
break;
}
}
}
finally
{
if (lastProduced < outboxMessages.Count - 1)
{
// Subtract the produce operations that will never be initiated
Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - lastProduced - 1));
}

await WaitAllAsync().ConfigureAwait(false);
await AcknowledgeAllAsync(outboxReader, outboxMessages, failedMessages).ConfigureAwait(false);
await AcknowledgeAllAsync(outboxReader, outboxMessages.Take(lastProduced + 1), failedMessages).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,11 @@ await _outboxWriter.WriteAsync(

await _worker.ProcessQueueAsync(CancellationToken.None);

_broker.ProducedMessages.Should().HaveCount(3);
_broker.ProducedMessages.Should().HaveCount(4);
_broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1");
_broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2");
_broker.ProducedMessages[2].Endpoint.Name.Should().Be("topic1");
_broker.ProducedMessages[3].Endpoint.Name.Should().Be("topic1");
}
}
}

0 comments on commit eb5dd2a

Please sign in to comment.