Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: Improve EventExecutor #393

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
201 changes: 100 additions & 101 deletions src/OpenFeature/EventExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand All @@ -10,26 +9,23 @@

namespace OpenFeature
{
internal delegate Task ShutdownDelegate(CancellationToken cancellationToken);

internal sealed partial class EventExecutor : IAsyncDisposable
{
private readonly object _lockObj = new object();
private readonly object _lockObj = new();
public readonly Channel<object> EventChannel = Channel.CreateBounded<object>(1);
private FeatureProvider? _defaultProvider;
private readonly Dictionary<string, FeatureProvider> _namedProviderReferences = new Dictionary<string, FeatureProvider>();
private readonly List<FeatureProvider> _activeSubscriptions = new List<FeatureProvider>();
private readonly Dictionary<string, FeatureProvider> _namedProviderReferences = [];
private readonly List<FeatureProvider> _activeSubscriptions = [];

private readonly Dictionary<ProviderEventTypes, List<EventHandlerDelegate>> _apiHandlers = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
private readonly Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>> _clientHandlers = new Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>>();
private readonly Dictionary<ProviderEventTypes, List<EventHandlerDelegate>> _apiHandlers = [];
private readonly Dictionary<string, Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>> _clientHandlers = [];

private ILogger _logger;

public EventExecutor()
{
this._logger = NullLogger<EventExecutor>.Instance;
var eventProcessing = new Thread(this.ProcessEventAsync);
eventProcessing.Start();
Task.Run(this.ProcessEventAsync);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have caught this earlier.

}

public ValueTask DisposeAsync() => new(this.ShutdownAsync());
Expand All @@ -42,7 +38,7 @@
{
if (!this._apiHandlers.TryGetValue(eventType, out var eventHandlers))
{
eventHandlers = new List<EventHandlerDelegate>();
eventHandlers = [];
this._apiHandlers[eventType] = eventHandlers;
}

Expand Down Expand Up @@ -70,13 +66,13 @@
// check if there is already a list of handlers for the given client and event type
if (!this._clientHandlers.TryGetValue(client, out var registry))
{
registry = new Dictionary<ProviderEventTypes, List<EventHandlerDelegate>>();
registry = [];
this._clientHandlers[client] = registry;
}

if (!this._clientHandlers[client].TryGetValue(eventType, out var eventHandlers))
{
eventHandlers = new List<EventHandlerDelegate>();
eventHandlers = [];
this._clientHandlers[client][eventType] = eventHandlers;
}

Expand Down Expand Up @@ -127,16 +123,15 @@
}
lock (this._lockObj)
{
var newProvider = provider;
FeatureProvider? oldProvider = null;
if (this._namedProviderReferences.TryGetValue(client, out var foundOldProvider))
{
oldProvider = foundOldProvider;
}

this._namedProviderReferences[client] = newProvider;
this._namedProviderReferences[client] = provider;

this.StartListeningAndShutdownOld(newProvider, oldProvider);
this.StartListeningAndShutdownOld(provider, oldProvider);
}
}

Expand All @@ -146,8 +141,7 @@
if (!this.IsProviderActive(newProvider))
{
this._activeSubscriptions.Add(newProvider);
var featureProviderEventProcessing = new Thread(this.ProcessFeatureProviderEventsAsync);
featureProviderEventProcessing.Start(newProvider);
Task.Run(() => this.ProcessFeatureProviderEventsAsync(newProvider));
}

if (oldProvider != null && !this.IsProviderBound(oldProvider))
Expand Down Expand Up @@ -186,42 +180,37 @@
}
var status = provider.Status;

var message = "";
if (status == ProviderStatus.Ready && eventType == ProviderEventTypes.ProviderReady)
{
message = "Provider is ready";
}
else if (status == ProviderStatus.Error && eventType == ProviderEventTypes.ProviderError)
var message = status switch
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice. 😎

{
message = "Provider is in error state";
}
else if (status == ProviderStatus.Stale && eventType == ProviderEventTypes.ProviderStale)
ProviderStatus.Ready when eventType == ProviderEventTypes.ProviderReady => "Provider is ready",
ProviderStatus.Error when eventType == ProviderEventTypes.ProviderError => "Provider is in error state",
ProviderStatus.Stale when eventType == ProviderEventTypes.ProviderStale => "Provider is in stale state",
_ => string.Empty
};

if (string.IsNullOrWhiteSpace(message))
{
message = "Provider is in stale state";
return;
}

if (message != "")
try
{
try
handler.Invoke(new ProviderEventPayload
{
handler.Invoke(new ProviderEventPayload
{
ProviderName = provider.GetMetadata()?.Name,
Type = eventType,
Message = message
});
}
catch (Exception exc)
{
this.ErrorRunningHandler(exc);
}
ProviderName = provider.GetMetadata()?.Name,
Type = eventType,
Message = message
});
}
catch (Exception exc)
{
this.ErrorRunningHandler(exc);

Check warning on line 207 in src/OpenFeature/EventExecutor.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/EventExecutor.cs#L205-L207

Added lines #L205 - L207 were not covered by tests
}
}

private async void ProcessFeatureProviderEventsAsync(object? providerRef)
private async Task ProcessFeatureProviderEventsAsync(FeatureProvider provider)
{
var typedProviderRef = (FeatureProvider?)providerRef;
if (typedProviderRef?.GetEventChannel() is not { Reader: { } reader })
if (provider.GetEventChannel() is not { Reader: { } reader })
{
return;
}
Expand All @@ -234,82 +223,92 @@
switch (item)
{
case ProviderEventPayload eventPayload:
this.UpdateProviderStatus(typedProviderRef, eventPayload);
await this.EventChannel.Writer.WriteAsync(new Event { Provider = typedProviderRef, EventPayload = eventPayload }).ConfigureAwait(false);
UpdateProviderStatus(provider, eventPayload);
await this.EventChannel.Writer.WriteAsync(new Event { Provider = provider, EventPayload = eventPayload }).ConfigureAwait(false);
break;
}
}
}

// Method to process events
private async void ProcessEventAsync()
private async Task ProcessEventAsync()
{
while (await this.EventChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
{
if (!this.EventChannel.Reader.TryRead(out var item))
{

Check warning on line 239 in src/OpenFeature/EventExecutor.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/EventExecutor.cs#L239

Added line #L239 was not covered by tests
continue;
}

switch (item)
if (item is not Event e)
{
case Event e:
lock (this._lockObj)
{
if (e.EventPayload?.Type != null && this._apiHandlers.TryGetValue(e.EventPayload.Type, out var eventHandlers))
{
foreach (var eventHandler in eventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}

// look for client handlers and call invoke method there
foreach (var keyAndValue in this._namedProviderReferences)
{
if (keyAndValue.Value == e.Provider && keyAndValue.Key != null)
{
if (this._clientHandlers.TryGetValue(keyAndValue.Key, out var clientRegistry))
{
if (e.EventPayload?.Type != null && clientRegistry.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}
}
}
}

if (e.Provider != this._defaultProvider)
{
break;
}
// handling the default provider - invoke event handlers for clients which are not bound
// to a particular feature provider
foreach (var keyAndValues in this._clientHandlers)
{
if (this._namedProviderReferences.TryGetValue(keyAndValues.Key, out _))
{
// if there is an association for the client to a specific feature provider, then continue
continue;
}
if (e.EventPayload?.Type != null && keyAndValues.Value.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}
}
}
break;
continue;

Check warning on line 245 in src/OpenFeature/EventExecutor.cs

View check run for this annotation

Codecov / codecov/patch

src/OpenFeature/EventExecutor.cs#L245

Added line #L245 was not covered by tests
}

lock (this._lockObj)
{
this.ProcessApiHandlers(e);
this.ProcessClientHandlers(e);
this.ProcessDefaultProviderHandlers(e);
}
}
}

private void ProcessApiHandlers(Event e)
{
if (e.EventPayload?.Type != null && this._apiHandlers.TryGetValue(e.EventPayload.Type, out var eventHandlers))
{
foreach (var eventHandler in eventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}
}

private void ProcessClientHandlers(Event e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed a comment, but I guess the method name makes it clear what's happening now, so I'm fine with it.

{
foreach (var keyAndValue in this._namedProviderReferences)
{
if (keyAndValue.Value == e.Provider
&& this._clientHandlers.TryGetValue(keyAndValue.Key, out var clientRegistry)
&& e.EventPayload?.Type != null
&& clientRegistry.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}
}
}

private void ProcessDefaultProviderHandlers(Event e)
{
if (e.Provider != this._defaultProvider)
{
return;
}

foreach (var keyAndValues in this._clientHandlers)
{
if (this._namedProviderReferences.ContainsKey(keyAndValues.Key))
{
continue;
}

if (e.EventPayload?.Type != null && keyAndValues.Value.TryGetValue(e.EventPayload.Type, out var clientEventHandlers))
{
foreach (var eventHandler in clientEventHandlers)
{
this.InvokeEventHandler(eventHandler, e);
}
}
}
}


// map events to provider status as per spec: https://openfeature.dev/specification/sections/events/#requirement-535
private void UpdateProviderStatus(FeatureProvider provider, ProviderEventPayload eventPayload)
private static void UpdateProviderStatus(FeatureProvider provider, ProviderEventPayload eventPayload)
{
switch (eventPayload.Type)
{
Expand Down
4 changes: 2 additions & 2 deletions src/OpenFeature/FeatureProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public abstract class FeatureProvider
/// <summary>
/// The event channel of the provider.
/// </summary>
protected readonly Channel<object> EventChannel = Channel.CreateBounded<object>(1);
protected readonly Channel<object>? EventChannel = Channel.CreateBounded<object>(1);

/// <summary>
/// Metadata describing the provider.
Expand Down Expand Up @@ -140,7 +140,7 @@ public virtual Task ShutdownAsync(CancellationToken cancellationToken = default)
/// Returns the event channel of the provider.
/// </summary>
/// <returns>The event channel of the provider</returns>
public virtual Channel<object> GetEventChannel() => this.EventChannel;
public virtual Channel<object>? GetEventChannel() => this.EventChannel;

/// <summary>
/// Track a user action or application state, usually representing a business objective or outcome. The implementation of this method is optional.
Expand Down
6 changes: 5 additions & 1 deletion src/OpenFeature/Providers/Memory/InMemoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public async Task UpdateFlagsAsync(IDictionary<string, Flag>? flags = null)
FlagsChanged = changed, // emit all
Message = "flags changed",
};
await this.EventChannel.Writer.WriteAsync(@event).ConfigureAwait(false);

if (this.EventChannel != null)
{
await this.EventChannel.Writer.WriteAsync(@event).ConfigureAwait(false);
}
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ await provider.UpdateFlagsAsync(new Dictionary<string, Flag>(){
)
}});

var res = await provider.GetEventChannel().Reader.ReadAsync() as ProviderEventPayload;
var res = await provider.GetEventChannel()!.Reader.ReadAsync() as ProviderEventPayload;
Assert.Equal(ProviderEventTypes.ProviderConfigurationChanged, res?.Type);

await Assert.ThrowsAsync<FlagNotFoundException>(() => provider.ResolveBooleanValueAsync("old-flag", false, EvaluationContext.Empty));
Expand Down
7 changes: 1 addition & 6 deletions test/OpenFeature.Tests/TestImplementations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,7 @@ public override void Track(string trackingEventName, EvaluationContext? evaluati

internal ValueTask SendEventAsync(ProviderEventTypes eventType, CancellationToken cancellationToken = default)
{
return this.EventChannel.Writer.WriteAsync(new ProviderEventPayload { Type = eventType, ProviderName = this.GetMetadata().Name, }, cancellationToken);
}

internal ValueTask SendEventAsync(ProviderEventPayload payload, CancellationToken cancellationToken = default)
{
return this.EventChannel.Writer.WriteAsync(payload, cancellationToken);
return this.EventChannel!.Writer.WriteAsync(new ProviderEventPayload { Type = eventType, ProviderName = this.GetMetadata().Name, }, cancellationToken);
}
}
}