From 584e448c23e2ad15341ecdb534c1c1efc806c5c5 Mon Sep 17 00:00:00 2001 From: BEagle1984 Date: Mon, 23 Aug 2021 16:18:16 +0200 Subject: [PATCH] fix: ignore null or empty kafka key in producer --- Directory.Build.props | 2 +- docs/releases.md | 3 +- .../Messaging/Messages/KafkaKeyHelper.cs | 5 +- .../Messaging/Messages/MessageHeader.cs | 2 +- .../Kafka/OutboundMessageEnrichmentTests.cs | 37 +++++ .../Kafka/OutboundRoutingTests.cs | 149 ++++++++++++++++-- .../Messages/TestEventWithStringKafkaKey.cs | 15 ++ 7 files changed, 196 insertions(+), 17 deletions(-) create mode 100644 tests/Silverback.Integration.Tests.E2E/TestTypes/Messages/TestEventWithStringKafkaKey.cs diff --git a/Directory.Build.props b/Directory.Build.props index 9ffe19ed3..d713fb214 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,6 +1,6 @@ - -beta.2 + -beta.3 3.3.0$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index 1995c11c1..b53754d9d 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,7 +4,7 @@ uid: releases # Releases -## [3.3.0-beta.2](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0-beta.2) +## [3.3.0-beta.3](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0-beta.3) ### What's new @@ -15,6 +15,7 @@ uid: releases ### Fixes * Invoke the Kafka partition EOF callback for all connected consumers +* Ignore null or empty Kafka key in producer ## [3.2.0](https://github.com/BEagle1984/silverback/releases/tag/v3.2.0) diff --git a/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyHelper.cs b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyHelper.cs index d7d907963..3f47a6a48 100644 --- a/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyHelper.cs +++ b/src/Silverback.Integration.Kafka/Messaging/Messages/KafkaKeyHelper.cs @@ -30,8 +30,11 @@ internal static class KafkaKeyHelper }) .ToList(); - if (!keysDictionary.Any()) + if (!keysDictionary.Any() || + keysDictionary.All(keyValue => string.IsNullOrEmpty(keyValue.Value))) + { return null; + } return keysDictionary.Count == 1 ? keysDictionary.First().Value diff --git a/src/Silverback.Integration/Messaging/Messages/MessageHeader.cs b/src/Silverback.Integration/Messaging/Messages/MessageHeader.cs index cf2664a4d..bfa65cc3e 100644 --- a/src/Silverback.Integration/Messaging/Messages/MessageHeader.cs +++ b/src/Silverback.Integration/Messaging/Messages/MessageHeader.cs @@ -43,7 +43,7 @@ public MessageHeader(string name, object? value) public MessageHeader(string name, string? value) { _name = Check.NotNull(name, nameof(name)); - Value = value ?? string.Empty; + Value = value; } [SuppressMessage("ReSharper", "UnusedMember.Local", Justification = "Used to be used to deserialize")] diff --git a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundMessageEnrichmentTests.cs b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundMessageEnrichmentTests.cs index 6ce104812..ddb568f5b 100644 --- a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundMessageEnrichmentTests.cs +++ b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundMessageEnrichmentTests.cs @@ -249,5 +249,42 @@ public async Task WithKafkaKey_MessageKeySet() messages[1].Key.Should().BeEquivalentTo(Encoding.UTF8.GetBytes("two")); messages[2].Key.Should().BeEquivalentTo(Encoding.UTF8.GetBytes("three")); } + + [Fact] + public async Task WithKafkaKey_Null_NullMessageKeySet() + { + Host.ConfigureServices( + services => services + .AddLogging() + .AddSilverback() + .UseModel() + .WithConnectionToMessageBroker( + options => options.AddMockedKafka( + mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(1))) + .AddKafkaEndpoints( + endpoints => endpoints + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) + .AddOutbound( + endpoint => endpoint + .ProduceTo(DefaultTopicName) + .WithKafkaKey(_ => null))) + .AddIntegrationSpyAndSubscriber()) + .Run(); + + var publisher = Host.ScopedServiceProvider.GetRequiredService(); + await publisher.PublishAsync(new TestEventOne { Content = "one" }); + await publisher.PublishAsync(new TestEventOne { Content = "two" }); + await publisher.PublishAsync(new TestEventOne { Content = "three" }); + + var messages = DefaultTopic.GetAllMessages(); + messages.Should().HaveCount(3); + messages[0].Key.Should().BeNull(); + messages[1].Key.Should().BeNull(); + messages[2].Key.Should().BeNull(); + } } } diff --git a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundRoutingTests.cs b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundRoutingTests.cs index 9dfd627f4..62dfa34d6 100644 --- a/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundRoutingTests.cs +++ b/tests/Silverback.Integration.Tests.E2E/Kafka/OutboundRoutingTests.cs @@ -43,7 +43,11 @@ public async Task OutboundRouting_AnyPartition_MessagesRoutedToRandomPartition() mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint.ProduceTo(DefaultTopicName)))) .Run(); @@ -74,7 +78,11 @@ public async Task OutboundRouting_SpecificPartition_MessagesRoutedToSpecifiedPar mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint .ProduceTo(DefaultTopicName, 3)))) @@ -108,7 +116,11 @@ public async Task OutboundRouting_AnyPartitionWithPartitionKey_MessagesRoutedToP mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint.ProduceTo(DefaultTopicName)))) .Run(); @@ -128,6 +140,77 @@ public async Task OutboundRouting_AnyPartitionWithPartitionKey_MessagesRoutedToP DefaultTopic.Partitions[4].Messages.Count.Should().Be(2); } + [Fact] + public async Task OutboundRouting_AnyPartitionWithNullPartitionKey_MessagesRoutedToRandomPartition() + { + Host.ConfigureServices( + services => services + .AddLogging() + .AddSilverback() + .UseModel() + .WithConnectionToMessageBroker( + options => options.AddMockedKafka( + mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) + .AddKafkaEndpoints( + endpoints => endpoints + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) + .AddOutbound( + endpoint => endpoint.ProduceTo(DefaultTopicName)))) + .Run(); + + var publisher = Host.ScopedServiceProvider.GetRequiredService(); + + await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "1" }); + await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "2" }); + await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "3" }); + await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "4" }); + await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "5" }); + + DefaultTopic.Partitions + .Where(partition => partition.Messages.Count > 0) + .Should().HaveCountGreaterThan(1); + } + + [Fact] + public async Task + OutboundRouting_AnyPartitionWithNullOrEmptyPartitionKey_MessagesRoutedToRandomPartition() + { + Host.ConfigureServices( + services => services + .AddLogging() + .AddSilverback() + .UseModel() + .WithConnectionToMessageBroker( + options => options.AddMockedKafka( + mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) + .AddKafkaEndpoints( + endpoints => endpoints + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) + .AddOutbound( + endpoint => endpoint.ProduceTo(DefaultTopicName)))) + .Run(); + + var publisher = Host.ScopedServiceProvider.GetRequiredService(); + + await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = null, Content = "1" }); + await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "2" }); + await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "3" }); + await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "4" }); + await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = null, Content = "5" }); + + DefaultTopic.Partitions + .Where(partition => partition.Messages.Count > 0) + .Should().HaveCountGreaterThan(1); + } + [Fact] public async Task OutboundRouting_SpecificPartitionWithPartitionKey_MessagesRoutedToSpecifiedPartition() @@ -142,7 +225,11 @@ public async Task mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint.ProduceTo(DefaultTopicName, 3)))) .Run(); @@ -173,7 +260,11 @@ public async Task DynamicRouting_NameFunction_MessagesRouted() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint .ProduceTo( @@ -225,7 +316,11 @@ public async Task DynamicRouting_NameAndPartitionFunctions_MessagesRouted() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint .ProduceTo( @@ -296,7 +391,11 @@ public async Task DynamicRouting_NameFormat_MessagesRouted() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint .ProduceTo( @@ -350,7 +449,11 @@ public async Task DynamicRouting_CustomNameResolver_MessagesRouted() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( endpoint => endpoint .UseEndpointNameResolver()))) @@ -431,7 +534,11 @@ public async Task DynamicRouting_GenericSingleEndpointRouter_MessagesRouted() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( (message, _, endpointsDictionary) => message.Content switch { @@ -504,7 +611,11 @@ public async Task DynamicRouting_GenericSingleEndpointRouter_MessagesRoutedToPar mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5))) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( (message, _, endpointsDictionary) => message.Content switch { @@ -591,7 +702,11 @@ public async Task DynamicRouting_WithProducerPreloading_ProducersPreloaded() .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( (message, _, endpointsDictionary) => message.Content switch { @@ -662,7 +777,11 @@ public void DynamicRouting_WithoutProducerPreloading_ProducersLoadedAtFirstProdu .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( (message, _, endpointsDictionary) => message.Content switch { @@ -710,7 +829,11 @@ public async Task DynamicRouting_GenericMultipleEndpointRouter_ProducersPreloade .WithConnectionToMessageBroker(options => options.AddMockedKafka()) .AddKafkaEndpoints( endpoints => endpoints - .Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; }) + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://tests"; + }) .AddOutbound( (message, _, endpointsDictionary) => message.Content switch { diff --git a/tests/Silverback.Integration.Tests.E2E/TestTypes/Messages/TestEventWithStringKafkaKey.cs b/tests/Silverback.Integration.Tests.E2E/TestTypes/Messages/TestEventWithStringKafkaKey.cs new file mode 100644 index 000000000..f57af585a --- /dev/null +++ b/tests/Silverback.Integration.Tests.E2E/TestTypes/Messages/TestEventWithStringKafkaKey.cs @@ -0,0 +1,15 @@ +// Copyright (c) 2020 Sergio Aquilini +// This code is licensed under MIT license (see LICENSE file for details) + +using Silverback.Messaging.Messages; + +namespace Silverback.Tests.Integration.E2E.TestTypes.Messages +{ + public class TestEventWithStringKafkaKey : IIntegrationEvent + { + [KafkaKeyMember] + public string? KafkaKey { get; set; } + + public string? Content { get; set; } + } +}