Skip to content

Commit

Permalink
feat: Implement AddInbound with message type parameter
Browse files Browse the repository at this point in the history
This allows a more compact configuration for the typed deserializer
  • Loading branch information
BEagle1984 committed Aug 29, 2021
1 parent 584e448 commit 94f83bc
Show file tree
Hide file tree
Showing 35 changed files with 579 additions and 97 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup Label="Package information">
<BaseVersionSuffix>-beta.3</BaseVersionSuffix>
<BaseVersionSuffix></BaseVersionSuffix>
<BaseVersion>3.3.0$(BaseVersionSuffix)</BaseVersion>
<DatabasePackagesRevision>1</DatabasePackagesRevision>
<DatabasePackagesVersionSuffix>$(BaseVersionSuffix)</DatabasePackagesVersionSuffix>
Expand Down
1 change: 0 additions & 1 deletion docs/concepts/broker/inbound.md
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ public class MyEndpointsConfigurator : IEndpointsConfigurator
}
```


### Publishing events

Messages can be published when a policy is applied, in order to execute custom code.
Expand Down
10 changes: 9 additions & 1 deletion docs/concepts/broker/serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,22 @@ public class MyEndpointsConfigurator : IEndpointsConfigurator
.ProduceTo("inventory-events")
.SerializeAsJson(serializer => serializer
.UseFixedType<InventoryEvent>()))
// The following configurations are equivalent, the second
// one being more implicit
.AddInbound(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})
.DeserializeJson(serializer => serializer
.UseFixedType<OrderEvent>())));
.UseFixedType<OrderEvent>()))
.AddInbound<OrderEvent>(endpoint => endpoint
.ConsumeFrom("order-events")
.Configure(config =>
{
config.GroupId = "my-consumer";
})));
}
```
# [Legacy](#tab/json-fixed-type-legacy)
Expand Down
3 changes: 2 additions & 1 deletion docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ uid: releases

# Releases

## [3.3.0-beta.3](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0-beta.3)
## [3.3.0](https://github.com/BEagle1984/silverback/releases/tag/v3.3.0)

### What's new

* Optimize in-memory mocked Kafka (avoid spawning too many threads)
* Support multiple brokers (with overlapping topic names) in mocked Kafka and MQTT
* Add message validation for both producer and consumer (see <xref:validation>)
* Add new `AddInbound` overloads specifying message type for a more compact configuration when using the typed deserializer (see <xref:serialization>)

### Fixes

Expand Down
12 changes: 4 additions & 8 deletions samples/MQTT/Basic.ConsumerV3/EndpointsConfigurator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,15 @@ public void Configure(IEndpointsConfigurationBuilder builder)
.ProduceTo("samples/testaments")))

// Consume the samples/basic topic
.AddInbound(
// Note: It is mandatory to specify the message type, since
// MQTT 3 doesn't support message headers (aka user
// properties)
.AddInbound<SampleMessage>(
endpoint => endpoint
.ConsumeFrom("samples/basic")
.WithQualityOfServiceLevel(
MqttQualityOfServiceLevel.AtLeastOnce)

// It is mandatory to specify the message type, since
// MQTT 3 doesn't support message headers (aka user
// properties)
.DeserializeJson(
serializer => serializer
.UseFixedType<SampleMessage>())

// Silently skip the messages in case of exception
.OnError(policy => policy.Skip())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,5 +193,62 @@ public IKafkaEndpointsConfigurationBuilder AddOutbound(
public IKafkaEndpointsConfigurationBuilder AddInbound(
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1);

/// <summary>
/// Adds an inbound endpoint and instantiates a <see cref="KafkaConsumer" /> to consume from a Kafka topic.
/// </summary>
/// <remarks>
/// Multiple calls to this methods will cause multiple consumers to be instantiated, which means
/// multiple connections being issues and more resources being used. The <see cref="KafkaConsumerEndpoint" />
/// allows to define multiple topics to be consumed, to efficiently instantiate a single consumer for all of
/// them.
/// </remarks>
/// <typeparam name="TMessage">
/// The type of the messages that will be consumed from this endpoint. Specifying the message type will
/// usually automatically switch to the typed message serializer and deserialize this specific type,
/// regardless of the message headers.
/// </typeparam>
/// <param name="endpointBuilderAction">
/// An <see cref="Action{T}" /> that takes the <see cref="IKafkaConsumerEndpointBuilder" /> and configures
/// it.
/// </param>
/// <param name="consumersCount">
/// The number of consumers to be instantiated. The default is 1.
/// </param>
/// <returns>
/// The <see cref="IKafkaEndpointsConfigurationBuilder" /> so that additional calls can be chained.
/// </returns>
public IKafkaEndpointsConfigurationBuilder AddInbound<TMessage>(
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1);

/// <summary>
/// Adds an inbound endpoint and instantiates a <see cref="KafkaConsumer" /> to consume from a Kafka topic.
/// </summary>
/// <remarks>
/// Multiple calls to this methods will cause multiple consumers to be instantiated, which means
/// multiple connections being issues and more resources being used. The <see cref="KafkaConsumerEndpoint" />
/// allows to define multiple topics to be consumed, to efficiently instantiate a single consumer for all of
/// them.
/// </remarks>
/// <param name="messageType">
/// The type of the messages that will be consumed from this endpoint. Specifying the message type will
/// usually automatically switch to the typed message serializer and deserialize this specific type,
/// regardless of the message headers.
/// </param>
/// <param name="endpointBuilderAction">
/// An <see cref="Action{T}" /> that takes the <see cref="IKafkaConsumerEndpointBuilder" /> and configures
/// it.
/// </param>
/// <param name="consumersCount">
/// The number of consumers to be instantiated. The default is 1.
/// </param>
/// <returns>
/// The <see cref="IKafkaEndpointsConfigurationBuilder" /> so that additional calls can be chained.
/// </returns>
public IKafkaEndpointsConfigurationBuilder AddInbound(
Type? messageType,
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ private Func<IReadOnlyCollection<TopicPartition>, IEnumerable<TopicPartitionOffs
/// The <see cref="KafkaClientConfig" /> to be used to initialize the
/// <see cref="KafkaConsumerConfig" />.
/// </param>
/// <param name="messageType">
/// Gets the type of the message being consumed.
/// </param>
/// <param name="endpointsConfigurationBuilder">
/// The optional reference to the <see cref="IEndpointsConfigurationBuilder" /> that instantiated the
/// builder.
/// </param>
public KafkaConsumerEndpointBuilder(
KafkaClientConfig? clientConfig = null,
Type? messageType = null,
IEndpointsConfigurationBuilder? endpointsConfigurationBuilder = null)
: base(endpointsConfigurationBuilder)
: base(messageType, endpointsConfigurationBuilder)
{
_clientConfig = clientConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,25 @@ public IKafkaEndpointsConfigurationBuilder AddOutbound(
}

public IKafkaEndpointsConfigurationBuilder AddInbound(
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1) =>
AddInbound(null, endpointBuilderAction, consumersCount);

public IKafkaEndpointsConfigurationBuilder AddInbound<TMessage>(
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1) =>
AddInbound(typeof(TMessage), endpointBuilderAction, consumersCount);

public IKafkaEndpointsConfigurationBuilder AddInbound(
Type? messageType,
Action<IKafkaConsumerEndpointBuilder> endpointBuilderAction,
int consumersCount = 1)
{
Check.NotNull(endpointBuilderAction, nameof(endpointBuilderAction));

var builder = new KafkaConsumerEndpointBuilder(ClientConfig, this);
var builder = new KafkaConsumerEndpointBuilder(ClientConfig, messageType, this);
builder.DeserializeJson();

endpointBuilderAction.Invoke(builder);

_endpointsConfigurationBuilder.AddInbound(builder.Build(), consumersCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,54 @@ public IMqttEndpointsConfigurationBuilder AddOutbound(
/// </returns>
public IMqttEndpointsConfigurationBuilder AddInbound(
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction);

/// <summary>
/// Adds an inbound endpoint and instantiates a <see cref="MqttConsumer"/> to consume from a Mqtt topic.
/// </summary>
/// <remarks>
/// Multiple calls to this methods will cause multiple consumers to be instantiated, which means
/// multiple connections being issues and more resources being used. The <see cref="MqttConsumerEndpoint" />
/// allows to define multiple topics to be consumed, to efficiently instantiate a single consumer for all of
/// them.
/// </remarks>
/// <typeparam name="TMessage">
/// The type of the messages that will be consumed from this endpoint. Specifying the message type will
/// usually automatically switch to the typed message serializer and deserialize this specific type,
/// regardless of the message headers.
/// </typeparam>
/// <param name="endpointBuilderAction">
/// An <see cref="Action{T}" /> that takes the <see cref="IMqttConsumerEndpointBuilder" /> and configures
/// it.
/// </param>
/// <returns>
/// The <see cref="IMqttEndpointsConfigurationBuilder" /> so that additional calls can be chained.
/// </returns>
public IMqttEndpointsConfigurationBuilder AddInbound<TMessage>(
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction);

/// <summary>
/// Adds an inbound endpoint and instantiates a <see cref="MqttConsumer"/> to consume from a Mqtt topic.
/// </summary>
/// <remarks>
/// Multiple calls to this methods will cause multiple consumers to be instantiated, which means
/// multiple connections being issues and more resources being used. The <see cref="MqttConsumerEndpoint" />
/// allows to define multiple topics to be consumed, to efficiently instantiate a single consumer for all of
/// them.
/// </remarks>
/// <param name="messageType">
/// The type of the messages that will be consumed from this endpoint. Specifying the message type will
/// usually automatically switch to the typed message serializer and deserialize this specific type,
/// regardless of the message headers.
/// </param>
/// <param name="endpointBuilderAction">
/// An <see cref="Action{T}" /> that takes the <see cref="IMqttConsumerEndpointBuilder" /> and configures
/// it.
/// </param>
/// <returns>
/// The <see cref="IMqttEndpointsConfigurationBuilder" /> so that additional calls can be chained.
/// </returns>
public IMqttEndpointsConfigurationBuilder AddInbound(
Type? messageType,
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,18 @@ public class MqttConsumerEndpointBuilder
/// <param name="clientConfig">
/// The <see cref="MqttClientConfig" />.
/// </param>
/// <param name="messageType">
/// Gets the type of the message being consumed.
/// </param>
/// <param name="endpointsConfigurationBuilder">
/// The optional reference to the <see cref="IEndpointsConfigurationBuilder" /> that instantiated the
/// builder.
/// </param>
public MqttConsumerEndpointBuilder(
MqttClientConfig clientConfig,
Type? messageType = null,
IEndpointsConfigurationBuilder? endpointsConfigurationBuilder = null)
: base(endpointsConfigurationBuilder)
: base(messageType, endpointsConfigurationBuilder)
{
_clientConfig = clientConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,22 @@ public IMqttEndpointsConfigurationBuilder AddOutbound(
}

public IMqttEndpointsConfigurationBuilder AddInbound(
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction) =>
AddInbound(null, endpointBuilderAction);

public IMqttEndpointsConfigurationBuilder AddInbound<TMessage>(
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction) =>
AddInbound(typeof(TMessage), endpointBuilderAction);

public IMqttEndpointsConfigurationBuilder AddInbound(
Type? messageType,
Action<IMqttConsumerEndpointBuilder> endpointBuilderAction)
{
Check.NotNull(endpointBuilderAction, nameof(endpointBuilderAction));

var builder = new MqttConsumerEndpointBuilder(ClientConfig, this);
var builder = new MqttConsumerEndpointBuilder(ClientConfig, messageType, this);
builder.DeserializeJson();

endpointBuilderAction.Invoke(builder);

_endpointsConfigurationBuilder.AddInbound(builder.Build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public static TBuilder DeserializeJsonUsingNewtonsoft<TBuilder>(
Check.NotNull(endpointBuilder, nameof(endpointBuilder));

var serializerBuilder = new NewtonsoftJsonMessageSerializerBuilder();

if (endpointBuilder.MessageType != null)
serializerBuilder.UseFixedType(endpointBuilder.MessageType);

serializerBuilderAction?.Invoke(serializerBuilder);
endpointBuilder.DeserializeUsing(serializerBuilder.Build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ public interface INewtonsoftJsonMessageSerializerBuilder
/// </returns>
INewtonsoftJsonMessageSerializerBuilder UseFixedType<TMessage>();

/// <summary>
/// Specifies a fixed message type. This will prevent the message type header to be written when
/// serializing and the header will be ignored when deserializing.
/// </summary>
/// <param name="messageType">
/// The type of the message to serialize or deserialize.
/// </param>
/// <returns>
/// The <see cref="JsonMessageSerializerBuilder" /> so that additional calls can be chained.
/// </returns>
INewtonsoftJsonMessageSerializerBuilder UseFixedType(Type messageType);

/// <summary>
/// Configures the <see cref="JsonSerializerSettings" />.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,17 @@ public INewtonsoftJsonMessageSerializerBuilder UseFixedType<TMessage>()
return this;
}

/// <inheritdoc cref="INewtonsoftJsonMessageSerializerBuilder.UseFixedType(Type)"/>
public INewtonsoftJsonMessageSerializerBuilder UseFixedType(Type messageType)
{
var serializerType = typeof(NewtonsoftJsonMessageSerializer<>).MakeGenericType(messageType);
_serializer = (NewtonsoftJsonMessageSerializerBase)Activator.CreateInstance(serializerType);
return this;
}

/// <inheritdoc cref="INewtonsoftJsonMessageSerializerBuilder.Configure" />
public INewtonsoftJsonMessageSerializerBuilder Configure(Action<JsonSerializerSettings> configureAction)
public INewtonsoftJsonMessageSerializerBuilder Configure(
Action<JsonSerializerSettings> configureAction)
{
Check.NotNull(configureAction, nameof(configureAction));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public static TBuilder DeserializeJson<TBuilder>(
Check.NotNull(endpointBuilder, nameof(endpointBuilder));

var serializerBuilder = new JsonMessageSerializerBuilder();

if (endpointBuilder.MessageType != null)
serializerBuilder.UseFixedType(endpointBuilder.MessageType);

serializerBuilderAction?.Invoke(serializerBuilder);
endpointBuilder.DeserializeUsing(serializerBuilder.Build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,24 @@ public abstract class ConsumerEndpointBuilder<TEndpoint, TBuilder>
/// <summary>
/// Initializes a new instance of the <see cref="ConsumerEndpointBuilder{TEndpoint,TBuilder}" /> class.
/// </summary>
/// <param name="messageType">
/// Gets the type of the message being consumed.
/// </param>
/// <param name="endpointsConfigurationBuilder">
/// The optional reference to the <see cref="IEndpointsConfigurationBuilder" /> that instantiated the
/// builder.
/// </param>
protected ConsumerEndpointBuilder(IEndpointsConfigurationBuilder? endpointsConfigurationBuilder = null)
protected ConsumerEndpointBuilder(
Type? messageType = null,
IEndpointsConfigurationBuilder? endpointsConfigurationBuilder = null)
: base(endpointsConfigurationBuilder)
{
MessageType = messageType;
}

/// <inheritdoc cref="IConsumerEndpointBuilder{TBuilder}.MessageType" />
public Type? MessageType { get; }

/// <inheritdoc cref="IConsumerEndpointBuilder{TBuilder}.DeserializeUsing" />
public TBuilder DeserializeUsing(IMessageSerializer serializer) =>
UseSerializer(Check.NotNull(serializer, nameof(serializer)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ namespace Silverback.Messaging.Configuration
public interface IConsumerEndpointBuilder<out TBuilder> : IEndpointBuilder<TBuilder>
where TBuilder : IConsumerEndpointBuilder<TBuilder>
{
/// <summary>
/// Gets the type of the message being consumed.
/// </summary>
/// <remarks>
/// This value might be used during the configuration to automatically determine some configurations (e.g. the
/// correct deserializer to be used) without having to specify the message type once again.
/// </remarks>
Type? MessageType { get; }

/// <summary>
/// Specifies the <see cref="IMessageSerializer" /> to be used to deserialize the messages.
/// </summary>
Expand Down
Loading

0 comments on commit 94f83bc

Please sign in to comment.