Skip to content

Commit

Permalink
feat: improve application shutdown handling
Browse files Browse the repository at this point in the history
Don't block into ApplicationStopping to let other registered callbacks to be called immediately and await broker disconnect in ApplicationStopped.
  • Loading branch information
BEagle1984 committed Jun 26, 2021
1 parent 13edf6e commit ca2c6b0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 47 deletions.
100 changes: 54 additions & 46 deletions src/Silverback.Integration/Messaging/Broker/BrokerConnectorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Silverback.Diagnostics;
using Silverback.Messaging.Configuration;
Expand All @@ -19,22 +18,19 @@ namespace Silverback.Messaging.Broker
/// </summary>
public class BrokerConnectorService : BackgroundService
{
private readonly IServiceScopeFactory _serviceScopeFactory;

private readonly IHostApplicationLifetime _applicationLifetime;

private readonly IBrokerCollection _brokerCollection;

private readonly BrokerConnectionOptions _connectionOptions;

private readonly ISilverbackLogger<BrokerConnectorService> _logger;

private readonly CancellationToken _applicationStoppingToken;

private readonly TaskCompletionSource<bool> _brokerDisconnectedTaskCompletionSource = new();

/// <summary>
/// Initializes a new instance of the <see cref="BrokerConnectorService" /> class.
/// </summary>
/// <param name="serviceScopeFactory">
/// The <see cref="IServiceScopeFactory" />.
/// </param>
/// <param name="applicationLifetime">
/// The <see cref="IHostApplicationLifetime" />.
/// </param>
Expand All @@ -48,56 +44,51 @@ public class BrokerConnectorService : BackgroundService
/// The <see cref="ISilverbackLogger" />.
/// </param>
public BrokerConnectorService(
IServiceScopeFactory serviceScopeFactory,
IHostApplicationLifetime applicationLifetime,
IBrokerCollection brokersCollection,
BrokerConnectionOptions connectionOptions,
ISilverbackLogger<BrokerConnectorService> logger)
{
_serviceScopeFactory = Check.NotNull(serviceScopeFactory, nameof(serviceScopeFactory));
_applicationLifetime = Check.NotNull(applicationLifetime, nameof(applicationLifetime));
_brokerCollection = Check.NotNull(brokersCollection, nameof(brokersCollection));
_connectionOptions = Check.NotNull(connectionOptions, nameof(connectionOptions));
_logger = Check.NotNull(logger, nameof(logger));

Check.NotNull(applicationLifetime, nameof(applicationLifetime));
applicationLifetime.ApplicationStarted.Register(OnApplicationStarted);
applicationLifetime.ApplicationStopping.Register(OnApplicationStopping);
applicationLifetime.ApplicationStopped.Register(OnApplicationStopped);

_applicationStoppingToken = applicationLifetime.ApplicationStopping;
}

/// <inheritdoc cref="BackgroundService.ExecuteAsync" />
[SuppressMessage("", "VSTHRD101", Justification = "All exceptions are catched")]
[SuppressMessage("", "CA1031", Justification = "Catch all to avoid crashes")]
protected override Task ExecuteAsync(CancellationToken stoppingToken)
protected override Task ExecuteAsync(CancellationToken stoppingToken) =>
_connectionOptions.Mode == BrokerConnectionMode.Startup
? ConnectAsync()
: Task.CompletedTask;

private void OnApplicationStarted()
{
using var scope = _serviceScopeFactory.CreateScope();
if (_connectionOptions.Mode == BrokerConnectionMode.AfterStartup)
ConnectAsync().FireAndForget();
}

_applicationLifetime.ApplicationStopping.Register(
() => AsyncHelper.RunSynchronously(() => _brokerCollection.DisconnectAsync()));
private void OnApplicationStopping() =>
Task.Run(
async () =>
{
await _brokerCollection.DisconnectAsync().ConfigureAwait(false);
_brokerDisconnectedTaskCompletionSource.SetResult(true);
})
.FireAndForget();

switch (_connectionOptions.Mode)
{
case BrokerConnectionMode.Startup:
return ConnectAsync(stoppingToken);
case BrokerConnectionMode.AfterStartup:
_applicationLifetime.ApplicationStarted.Register(
async () =>
{
try
{
await ConnectAsync(stoppingToken).ConfigureAwait(false);
}
catch
{
// Swallow everything to avoid crashing the process
}
});
return Task.CompletedTask;
default:
return Task.CompletedTask;
}
}
private void OnApplicationStopped() =>
_brokerDisconnectedTaskCompletionSource.Task.Wait();

[SuppressMessage("", "CA1031", Justification = Justifications.ExceptionLogged)]
private async Task ConnectAsync(CancellationToken stoppingToken)
private async Task ConnectAsync()
{
while (!stoppingToken.IsCancellationRequested)
while (!_applicationStoppingToken.IsCancellationRequested)
{
try
{
Expand All @@ -110,12 +101,29 @@ private async Task ConnectAsync(CancellationToken stoppingToken)

if (!_connectionOptions.RetryOnFailure)
break;

if (_connectionOptions.Mode == BrokerConnectionMode.Startup)
Thread.Sleep(_connectionOptions.RetryInterval);
else
await Task.Delay(_connectionOptions.RetryInterval, stoppingToken).ConfigureAwait(false);
}

await DelayRetryAsync().ConfigureAwait(false);
}
}

private async Task DelayRetryAsync()
{
if (_connectionOptions.Mode == BrokerConnectionMode.Startup)
{
// We have to synchronously wait, to ensure the connection is awaited before starting
Thread.Sleep(_connectionOptions.RetryInterval);
return;
}

try
{
await Task.Delay(_connectionOptions.RetryInterval, _applicationStoppingToken)
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Ignore, the application is just shutting down
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using Confluent.Kafka;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Silverback.Messaging.Broker;
using Silverback.Messaging.Publishing;
using Silverback.Tests.Integration.E2E.TestHost;
using Silverback.Tests.Integration.E2E.TestTypes.Messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,10 @@ public async Task StartAsync_ApplicationStopping_BrokerGracefullyDisconnectedReg
BrokerConnectionMode mode)
{
var appStoppingTokenSource = new CancellationTokenSource();
var appStoppedTokenSource = new CancellationTokenSource();
var lifetimeEvents = Substitute.For<IHostApplicationLifetime>();
lifetimeEvents.ApplicationStopping.Returns(appStoppingTokenSource.Token);
lifetimeEvents.ApplicationStopped.Returns(appStoppedTokenSource.Token);

var serviceProvider = ServiceProviderHelper.GetServiceProvider(
services => services
Expand All @@ -194,6 +196,7 @@ public async Task StartAsync_ApplicationStopping_BrokerGracefullyDisconnectedReg
testBroker.IsConnected.Should().BeTrue();

appStoppingTokenSource.Cancel();
appStoppedTokenSource.Cancel();

testBroker.IsConnected.Should().BeFalse();
}
Expand Down

0 comments on commit ca2c6b0

Please sign in to comment.