-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathDefaultGoogleCloudPubSubPersistentConnection.cs
124 lines (106 loc) · 5.14 KB
/
DefaultGoogleCloudPubSubPersistentConnection.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace EventBus.GoogleCloudPubSub
{
public class DefaultGoogleCloudPubSubPersistentConnection : IGoogleCloudPubSubPersistentConnection
{
private readonly ILogger<DefaultGoogleCloudPubSubPersistentConnection> _logger;
private readonly string _projectId;
private readonly string _topicId;
private readonly string _subscriptionId;
private readonly string _pubsubEmulatorHost;
bool _disposed;
public DefaultGoogleCloudPubSubPersistentConnection(ILogger<DefaultGoogleCloudPubSubPersistentConnection> logger,
IDictionary<string, string> config)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_projectId = config["projectid"];
_topicId = config["topicid"];
_subscriptionId = config["subscriptionid"];
_pubsubEmulatorHost = config["pubsub.emulator.host"];
Initial().Wait();
}
public async Task Pub(Action<PublisherClient> publisheHandler)
{
TopicName topicName = new TopicName(_projectId, _topicId);
// Publish a message to the topic using PublisherClient.
PublisherClient publisher = await CreatePublisher(topicName);
publisheHandler(publisher);
}
public async Task Sub(Func<SubscriberClient, Task> subscribeHandler)
{
SubscriptionName subscriptionName = new SubscriptionName(_projectId, _subscriptionId);
// Pull messages from the subscription using SubscriberClient.
SubscriberClient subscriber = await CreateSubscriber(subscriptionName);
await subscribeHandler(subscriber);
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
}
private async Task Initial()
{
var topicName = new TopicName(_projectId, _topicId);
await CreateTopic(topicName);
var subscriptionName = new SubscriptionName(_projectId, _subscriptionId);
await CreateSubscription(subscriptionName, topicName);
}
private async Task CreateTopic(TopicName topicName)
{
var publisherServiceApiClient = string.IsNullOrEmpty(_pubsubEmulatorHost)
? await PublisherServiceApiClient.CreateAsync()
: await new PublisherServiceApiClientBuilder { Endpoint = _pubsubEmulatorHost, ChannelCredentials = ChannelCredentials.Insecure }.BuildAsync();
try
{
await publisherServiceApiClient.CreateTopicAsync(topicName);
}
catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
{
_logger.LogError(e, e.Message);
///await publisherServiceApiClient.GetTopicAsync(topicName);
}
catch (Exception ex)
{
throw new Exception(ex.Message, ex);
}
}
private async Task CreateSubscription(SubscriptionName subscriptionName, TopicName topicName)
{
var subscriberServiceApiClient = string.IsNullOrEmpty(_pubsubEmulatorHost)
? await SubscriberServiceApiClient.CreateAsync()
: await new SubscriberServiceApiClientBuilder { Endpoint = _pubsubEmulatorHost, ChannelCredentials = ChannelCredentials.Insecure }.BuildAsync();
try
{
await subscriberServiceApiClient.CreateSubscriptionAsync(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
}
catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
{
_logger.LogError(e, e.Message);
///await subscriberServiceApiClient.GetSubscriptionAsync(subscriptionName);
}
catch (Exception ex)
{
throw new Exception(ex.Message, ex);
}
}
private async Task<PublisherClient> CreatePublisher(TopicName topicName)
{
// Publish a message to the topic using PublisherClient.
return string.IsNullOrEmpty(_pubsubEmulatorHost)
? await PublisherClient.CreateAsync(topicName)
: await PublisherClient.CreateAsync(topicName, clientCreationSettings: new PublisherClient.ClientCreationSettings(credentials: ChannelCredentials.Insecure, serviceEndpoint: _pubsubEmulatorHost));
}
private async Task<SubscriberClient> CreateSubscriber(SubscriptionName subscriptionName)
{
// Pull messages from the subscription using SubscriberClient.
return string.IsNullOrEmpty(_pubsubEmulatorHost)
? await SubscriberClient.CreateAsync(subscriptionName)
: await SubscriberClient.CreateAsync(subscriptionName, clientCreationSettings: new SubscriberClient.ClientCreationSettings(credentials: ChannelCredentials.Insecure, serviceEndpoint: _pubsubEmulatorHost));
}
}
}