Skip to content

Commit 57c5491

Browse files
committed
Merge branch 'hotfix/2.5.1'
2 parents 21a47ea + bbfb580 commit 57c5491

File tree

4 files changed

+17
-18
lines changed

4 files changed

+17
-18
lines changed

Tapeti/Connection/TapetiBasicConsumer.cs

+10-13
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Tapeti.Connection
99
/// <summary>
1010
/// Implements the bridge between the RabbitMQ Client consumer and a Tapeti Consumer
1111
/// </summary>
12-
internal class TapetiBasicConsumer : DefaultBasicConsumer
12+
internal class TapetiBasicConsumer : AsyncDefaultBasicConsumer
1313
{
1414
private readonly IConsumer consumer;
1515
private readonly Func<ulong, ConsumeResult, Task> onRespond;
@@ -24,26 +24,23 @@ public TapetiBasicConsumer(IConsumer consumer, Func<ulong, ConsumeResult, Task>
2424

2525

2626
/// <inheritdoc />
27-
public override void HandleBasicDeliver(string consumerTag,
27+
public override async Task HandleBasicDeliver(string consumerTag,
2828
ulong deliveryTag,
2929
bool redelivered,
3030
string exchange,
3131
string routingKey,
3232
IBasicProperties properties,
3333
ReadOnlyMemory<byte> body)
3434
{
35-
Task.Run(async () =>
35+
try
3636
{
37-
try
38-
{
39-
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body.ToArray());
40-
await onRespond(deliveryTag, response);
41-
}
42-
catch
43-
{
44-
await onRespond(deliveryTag, ConsumeResult.Error);
45-
}
46-
});
37+
var response = await consumer.Consume(exchange, routingKey, new RabbitMQMessageProperties(properties), body);
38+
await onRespond(deliveryTag, response);
39+
}
40+
catch
41+
{
42+
await onRespond(deliveryTag, ConsumeResult.Error);
43+
}
4744
}
4845
}
4946
}

Tapeti/Connection/TapetiClient.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -656,7 +656,8 @@ private IModel GetChannel()
656656
Password = connectionParams.Password,
657657
AutomaticRecoveryEnabled = false,
658658
TopologyRecoveryEnabled = false,
659-
RequestedHeartbeat = TimeSpan.FromSeconds(30)
659+
RequestedHeartbeat = TimeSpan.FromSeconds(30),
660+
DispatchConsumersAsync = true
660661
};
661662

662663
if (connectionParams.ClientProperties != null)

Tapeti/Connection/TapetiConsumer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ public TapetiConsumer(CancellationToken cancellationToken, ITapetiConfig config,
3939

4040

4141
/// <inheritdoc />
42-
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body)
42+
public async Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory<byte> body)
4343
{
4444
object message = null;
4545
try
4646
{
47-
message = messageSerializer.Deserialize(body, properties);
47+
message = messageSerializer.Deserialize(body.ToArray(), properties);
4848
if (message == null)
4949
throw new ArgumentException("Message body could not be deserialized into a message object", nameof(body));
5050

Tapeti/IConsumer.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Threading.Tasks;
1+
using System;
2+
using System.Threading.Tasks;
23
using Tapeti.Config;
34

45
namespace Tapeti
@@ -16,6 +17,6 @@ public interface IConsumer
1617
/// <param name="properties">Metadata included in the message</param>
1718
/// <param name="body">The raw body of the message</param>
1819
/// <returns></returns>
19-
Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, byte[] body);
20+
Task<ConsumeResult> Consume(string exchange, string routingKey, IMessageProperties properties, ReadOnlyMemory<byte> body);
2021
}
2122
}

0 commit comments

Comments
 (0)