Skip to content

Commit

Permalink
fix: ignore null or empty kafka key in producer
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Aug 24, 2021
1 parent 8dcaef3 commit 584e448
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.2</BaseVersionSuffix>
<BaseVersionSuffix>-beta.3</BaseVersionSuffix>
<BaseVersion>3.3.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
Expand Down
3 changes: 2 additions & 1 deletion docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ uid: releases

# Releases

## [3.3.0-beta.2](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0-beta.2)
## [3.3.0-beta.3](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0-beta.3)

### What's new

Expand All @@ -15,6 +15,7 @@ uid: releases
### Fixes

* Invoke the Kafka partition EOF callback for all connected consumers
* Ignore null or empty Kafka key in producer

## [3.2.0](https://github.com/BEagle1984/silverback/releases/tag/v3.2.0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ internal static class KafkaKeyHelper
})
.ToList();

if (!keysDictionary.Any())
if (!keysDictionary.Any() ||
keysDictionary.All(keyValue => string.IsNullOrEmpty(keyValue.Value)))
{
return null;
}

return keysDictionary.Count == 1
? keysDictionary.First().Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public MessageHeader(string name, object? value)
public MessageHeader(string name, string? value)
{
_name = Check.NotNull(name, nameof(name));
Value = value ?? string.Empty;
Value = value;
}

[SuppressMessage("ReSharper", "UnusedMember.Local", Justification = "Used to be used to deserialize")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,42 @@ public async Task WithKafkaKey_MessageKeySet()
messages[1].Key.Should().BeEquivalentTo(Encoding.UTF8.GetBytes("two"));
messages[2].Key.Should().BeEquivalentTo(Encoding.UTF8.GetBytes("three"));
}

[Fact]
public async Task WithKafkaKey_Null_NullMessageKeySet()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(
options => options.AddMockedKafka(
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(1)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
endpoint => endpoint
.ProduceTo(DefaultTopicName)
.WithKafkaKey<TestEventOne>(_ => null)))
.AddIntegrationSpyAndSubscriber())
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();
await publisher.PublishAsync(new TestEventOne { Content = "one" });
await publisher.PublishAsync(new TestEventOne { Content = "two" });
await publisher.PublishAsync(new TestEventOne { Content = "three" });

var messages = DefaultTopic.GetAllMessages();
messages.Should().HaveCount(3);
messages[0].Key.Should().BeNull();
messages[1].Key.Should().BeNull();
messages[2].Key.Should().BeNull();
}
}
}
149 changes: 136 additions & 13 deletions tests/Silverback.Integration.Tests.E2E/Kafka/OutboundRoutingTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ public async Task OutboundRouting_AnyPartition_MessagesRoutedToRandomPartition()
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName))))
.Run();
Expand Down Expand Up @@ -74,7 +78,11 @@ public async Task OutboundRouting_SpecificPartition_MessagesRoutedToSpecifiedPar
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint
.ProduceTo(DefaultTopicName, 3))))
Expand Down Expand Up @@ -108,7 +116,11 @@ public async Task OutboundRouting_AnyPartitionWithPartitionKey_MessagesRoutedToP
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName))))
.Run();
Expand All @@ -128,6 +140,77 @@ public async Task OutboundRouting_AnyPartitionWithPartitionKey_MessagesRoutedToP
DefaultTopic.Partitions[4].Messages.Count.Should().Be(2);
}

[Fact]
public async Task OutboundRouting_AnyPartitionWithNullPartitionKey_MessagesRoutedToRandomPartition()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(
options => options.AddMockedKafka(
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName))))
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "1" });
await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "2" });
await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "3" });
await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "4" });
await publisher.PublishAsync(new TestEventWithKafkaKey { KafkaKey = null, Content = "5" });

DefaultTopic.Partitions
.Where(partition => partition.Messages.Count > 0)
.Should().HaveCountGreaterThan(1);
}

[Fact]
public async Task
OutboundRouting_AnyPartitionWithNullOrEmptyPartitionKey_MessagesRoutedToRandomPartition()
{
Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(
options => options.AddMockedKafka(
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName))))
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = null, Content = "1" });
await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "2" });
await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "3" });
await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = string.Empty, Content = "4" });
await publisher.PublishAsync(new TestEventWithStringKafkaKey { KafkaKey = null, Content = "5" });

DefaultTopic.Partitions
.Where(partition => partition.Messages.Count > 0)
.Should().HaveCountGreaterThan(1);
}

[Fact]
public async Task
OutboundRouting_SpecificPartitionWithPartitionKey_MessagesRoutedToSpecifiedPartition()
Expand All @@ -142,7 +225,11 @@ public async Task
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<IIntegrationEvent>(
endpoint => endpoint.ProduceTo(DefaultTopicName, 3))))
.Run();
Expand Down Expand Up @@ -173,7 +260,11 @@ public async Task DynamicRouting_NameFunction_MessagesRouted()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
endpoint => endpoint
.ProduceTo<TestEventOne>(
Expand Down Expand Up @@ -225,7 +316,11 @@ public async Task DynamicRouting_NameAndPartitionFunctions_MessagesRouted()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
endpoint => endpoint
.ProduceTo<TestEventOne>(
Expand Down Expand Up @@ -296,7 +391,11 @@ public async Task DynamicRouting_NameFormat_MessagesRouted()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
endpoint => endpoint
.ProduceTo<TestEventOne>(
Expand Down Expand Up @@ -350,7 +449,11 @@ public async Task DynamicRouting_CustomNameResolver_MessagesRouted()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
endpoint => endpoint
.UseEndpointNameResolver<TestEndpointNameResolver>())))
Expand Down Expand Up @@ -431,7 +534,11 @@ public async Task DynamicRouting_GenericSingleEndpointRouter_MessagesRouted()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
(message, _, endpointsDictionary) => message.Content switch
{
Expand Down Expand Up @@ -504,7 +611,11 @@ public async Task DynamicRouting_GenericSingleEndpointRouter_MessagesRoutedToPar
mockedKafkaOptions => mockedKafkaOptions.WithDefaultPartitionsCount(5)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
(message, _, endpointsDictionary) => message.Content switch
{
Expand Down Expand Up @@ -591,7 +702,11 @@ public async Task DynamicRouting_WithProducerPreloading_ProducersPreloaded()
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
(message, _, endpointsDictionary) => message.Content switch
{
Expand Down Expand Up @@ -662,7 +777,11 @@ public void DynamicRouting_WithoutProducerPreloading_ProducersLoadedAtFirstProdu
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
(message, _, endpointsDictionary) => message.Content switch
{
Expand Down Expand Up @@ -710,7 +829,11 @@ public async Task DynamicRouting_GenericMultipleEndpointRouter_ProducersPreloade
.WithConnectionToMessageBroker(options => options.AddMockedKafka())
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(config => { config.BootstrapServers = "PLAINTEXT://tests"; })
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://tests";
})
.AddOutbound<TestEventOne>(
(message, _, endpointsDictionary) => message.Content switch
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright (c) 2020 Sergio Aquilini
// This code is licensed under MIT license (see LICENSE file for details)

using Silverback.Messaging.Messages;

namespace Silverback.Tests.Integration.E2E.TestTypes.Messages
{
public class TestEventWithStringKafkaKey : IIntegrationEvent
{
[KafkaKeyMember]
public string? KafkaKey { get; set; }

public string? Content { get; set; }
}
}

0 comments on commit 584e448

Please sign in to comment.