Skip to content

Commit

Permalink
feat: implement proper round-robin partitioner in mocked Kafka producer
Browse files Browse the repository at this point in the history
This should increase tests stability, removing the random part from the equation.
  • Loading branch information
BEagle1984 committed Aug 29, 2021
1 parent 98c8139 commit 4483fb2
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ namespace Silverback.Messaging.Broker.Kafka.Mocks
{
internal sealed class MockedConfluentProducer : IMockedConfluentProducer
{
private static readonly Random RandomInstance = new();

private readonly ProducerConfig _config;

private readonly IInMemoryTopicCollection _topics;

private readonly object _roundRobinLockObject = new();

private int _lastPushedPartition = -1;

public MockedConfluentProducer(ProducerConfig config, IInMemoryTopicCollection topics)
{
_config = Check.NotNull(config, nameof(config));
Expand Down Expand Up @@ -146,10 +148,10 @@ public void Dispose()
}

[SuppressMessage("", "CA5394", Justification = "Usecure randomness is fine here")]
private static int GetPartitionIndex(IInMemoryTopic topic, byte[]? messageKey)
private int GetPartitionIndex(IInMemoryTopic topic, byte[]? messageKey)
{
if (messageKey == null)
return RandomInstance.Next(0, topic.Partitions.Count);
return GetNextRoundRobinPartition(topic);

return messageKey.Last() % topic.Partitions.Count;
}
Expand All @@ -169,5 +171,16 @@ private int PushToTopic(
offset = inMemoryTopic.Push(partitionIndex, message);
return partitionIndex;
}

private int GetNextRoundRobinPartition(IInMemoryTopic topic)
{
lock (_roundRobinLockObject)
{
if (++_lastPushedPartition >= topic.Partitions.Count)
_lastPushedPartition = 0;

return _lastPushedPartition;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -592,9 +592,7 @@ public async Task Streaming_ProcessingPartitionsIndependently_PublishedStreamPer

await Helper.WaitUntilAllMessagesAreConsumedAsync();

if (receivedStreams.Count != 3)
receivedStreams.Should().HaveCount(3);

receivedStreams.Should().HaveCount(3);
receivedMessages.Should().HaveCount(15);
receivedMessages.Select(message => message.Content)
.Should().BeEquivalentTo(Enumerable.Range(1, 15).Select(i => $"{i}"));
Expand Down

0 comments on commit 4483fb2

Please sign in to comment.