-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
0157dc1
commit 8657064
Showing
2 changed files
with
168 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
using System; | ||
using System.Data.Common; | ||
using System.Linq; | ||
using System.Net; | ||
using System.Net.Sockets; | ||
using System.Threading; | ||
using Newtonsoft.Json; | ||
using Npgsql; | ||
using StackExchange.Redis; | ||
|
||
namespace Worker | ||
{ | ||
public class Program | ||
{ | ||
public static int Main(string[] args) | ||
{ | ||
try | ||
{ | ||
var pgsql = OpenDbConnection("Server=db;Username=postgres;Password=postgres;"); | ||
var redisConn = OpenRedisConnection("redis"); | ||
var redis = redisConn.GetDatabase(); | ||
|
||
// Keep alive is not implemented in Npgsql yet. This workaround was recommended: | ||
// https://github.com/npgsql/npgsql/issues/1214#issuecomment-235828359 | ||
var keepAliveCommand = pgsql.CreateCommand(); | ||
keepAliveCommand.CommandText = "SELECT 1"; | ||
|
||
var definition = new { vote = "", voter_id = "" }; | ||
while (true) | ||
{ | ||
// Slow down to prevent CPU spike, only query each 100ms | ||
Thread.Sleep(100); | ||
|
||
// Reconnect redis if down | ||
if (redisConn == null || !redisConn.IsConnected) { | ||
Console.WriteLine("Reconnecting Redis"); | ||
redisConn = OpenRedisConnection("redis"); | ||
redis = redisConn.GetDatabase(); | ||
} | ||
string json = redis.ListLeftPopAsync("votes").Result; | ||
if (json != null) | ||
{ | ||
var vote = JsonConvert.DeserializeAnonymousType(json, definition); | ||
Console.WriteLine($"Processing vote for '{vote.vote}' by '{vote.voter_id}'"); | ||
// Reconnect DB if down | ||
if (!pgsql.State.Equals(System.Data.ConnectionState.Open)) | ||
{ | ||
Console.WriteLine("Reconnecting DB"); | ||
pgsql = OpenDbConnection("Server=db;Username=postgres;Password=postgres;"); | ||
} | ||
else | ||
{ // Normal +1 vote requested | ||
UpdateVote(pgsql, vote.voter_id, vote.vote); | ||
} | ||
} | ||
else | ||
{ | ||
keepAliveCommand.ExecuteNonQuery(); | ||
} | ||
} | ||
} | ||
catch (Exception ex) | ||
{ | ||
Console.Error.WriteLine(ex.ToString()); | ||
return 1; | ||
} | ||
} | ||
|
||
private static NpgsqlConnection OpenDbConnection(string connectionString) | ||
{ | ||
NpgsqlConnection connection; | ||
|
||
while (true) | ||
{ | ||
try | ||
{ | ||
connection = new NpgsqlConnection(connectionString); | ||
connection.Open(); | ||
break; | ||
} | ||
catch (SocketException) | ||
{ | ||
Console.Error.WriteLine("Waiting for db"); | ||
Thread.Sleep(1000); | ||
} | ||
catch (DbException) | ||
{ | ||
Console.Error.WriteLine("Waiting for db"); | ||
Thread.Sleep(1000); | ||
} | ||
} | ||
|
||
Console.Error.WriteLine("Connected to db"); | ||
|
||
var command = connection.CreateCommand(); | ||
command.CommandText = @"CREATE TABLE IF NOT EXISTS votes ( | ||
id VARCHAR(255) NOT NULL UNIQUE, | ||
vote VARCHAR(255) NOT NULL | ||
)"; | ||
command.ExecuteNonQuery(); | ||
|
||
return connection; | ||
} | ||
|
||
private static ConnectionMultiplexer OpenRedisConnection(string hostname) | ||
{ | ||
// Use IP address to workaround https://github.com/StackExchange/StackExchange.Redis/issues/410 | ||
var ipAddress = GetIp(hostname); | ||
Console.WriteLine($"Found redis at {ipAddress}"); | ||
|
||
while (true) | ||
{ | ||
try | ||
{ | ||
Console.Error.WriteLine("Connecting to redis"); | ||
return ConnectionMultiplexer.Connect(ipAddress); | ||
} | ||
catch (RedisConnectionException) | ||
{ | ||
Console.Error.WriteLine("Waiting for redis"); | ||
Thread.Sleep(1000); | ||
} | ||
} | ||
} | ||
|
||
private static string GetIp(string hostname) | ||
=> Dns.GetHostEntryAsync(hostname) | ||
.Result | ||
.AddressList | ||
.First(a => a.AddressFamily == AddressFamily.InterNetwork) | ||
.ToString(); | ||
|
||
private static void UpdateVote(NpgsqlConnection connection, string voterId, string vote) | ||
{ | ||
var command = connection.CreateCommand(); | ||
try | ||
{ | ||
command.CommandText = "INSERT INTO votes (id, vote) VALUES (@id, @vote)"; | ||
command.Parameters.AddWithValue("@id", voterId); | ||
command.Parameters.AddWithValue("@vote", vote); | ||
command.ExecuteNonQuery(); | ||
} | ||
catch (DbException) | ||
{ | ||
command.CommandText = "UPDATE votes SET vote = @vote WHERE id = @id"; | ||
command.ExecuteNonQuery(); | ||
} | ||
finally | ||
{ | ||
command.Dispose(); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>net7.0</TargetFramework> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<PackageReference Include="StackExchange.Redis" Version="2.2.4" /> | ||
<PackageReference Include="Npgsql" Version="4.1.9" /> | ||
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> | ||
</ItemGroup> | ||
|
||
</Project> |