Skip to content

Commit 9b28534

Browse files
committed
Fixed Queue Conflicts
1 parent 275e384 commit 9b28534

File tree

8 files changed

+72
-23
lines changed

8 files changed

+72
-23
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- liquibase formatted sql
2+
-- changeset ahmedyasser:1711445550
3+
CREATE EXTENSION IF NOT EXISTS hstore;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"mappings": {
3+
"properties": {
4+
"chatId": {
5+
"type": "keyword"
6+
},
7+
"timestamp": {
8+
"type": "keyword"
9+
}
10+
}
11+
}
12+
}

DSL/OpenSearch/init.sh

+4
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,7 @@ curl -XDELETE 'http://localhost:9200/*' -u admin:admin --insecure
44
#notifications
55
curl -H "Content-Type: application/x-ndjson" -X PUT "http://localhost:9200/notifications" -ku admin:admin --data-binary "@fieldMappings/notifications.json"
66
curl -H "Content-Type: application/x-ndjson" -X PUT "http://localhost:9200/notifications/_bulk" -ku admin:admin --data-binary "@mock/notifications.json"
7+
8+
#chatqueue
9+
curl -H "Content-Type: application/x-ndjson" -X PUT "http://localhost:9200/chatqueue" -ku admin:admin --data-binary "@fieldMappings/chatqueue.json"
10+
curl -H "Content-Type: application/x-ndjson" -X PUT "http://localhost:9200/chatqueue/_bulk" -ku admin:admin --data-binary "@mock/chatqueue.json"

DSL/OpenSearch/mock/chatqueue.json

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{"index":{"_id":"1"}}
2+
{"chatId": "c307cb64-f2c0-4465-879c-762c26666122","timestamp": "1801371325497"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
generate_cookie:
2+
call: http.post
3+
args:
4+
url: "[#CHATBOT_TIM]/jwt/custom-jwt-generate"
5+
body:
6+
JWTName: "JWTTOKEN"
7+
expirationInMinutes: 120
8+
content: {
9+
"personalCode": "EE30303039917",
10+
"idCode": "EE30303039917",
11+
"login": "EE30303039917",
12+
"displayName": "David",
13+
"firstName": "David",
14+
"lastName": "Shawn",
15+
"csaEmail": "d.shawn@gmail.com",
16+
"csaTitle": "David User",
17+
"authorities": [
18+
"ROLE_ADMINISTRATOR"
19+
],
20+
"authMethod": "smart-id",
21+
"fullName": "David Shawn"
22+
}
23+
result: cookie_result
24+
25+
return_result:
26+
return: ${cookie_result.response.body.token}

migrate.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
#!/bin/bash
2-
docker run --rm --network bykstack -v `pwd`/DSL/Liquibase/changelog:/liquibase/changelog -v `pwd`/DSL/Liquibase/master.yml:/liquibase/master.yml -v `pwd`/DSL/Liquibase/data:/liquibase/data liquibase/liquibase --defaultsFile=/liquibase/changelog/liquibase.properties --changelog-file=master.yml --url=jdbc:postgresql://users_db:5432/byk?user=byk --password=01234 update
2+
docker run --rm --network bykstack -v `pwd`/DSL/Liquibase/changelog:/liquibase/changelog -v `pwd`/DSL/Liquibase/master.yml:/liquibase/master.yml -v `pwd`/DSL/Liquibase/data:/liquibase/data liquibase/liquibase --defaultsFile=/liquibase/changelog/liquibase.properties --changelog-file=master.yml --url=jdbc:postgresql://171.22.247.13:5433/byk?user=byk --password=2nH09n6Gly update

notification-server/src/openSearch.js

+11-12
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ async function markAsSent({ _index, _id }, connectionId) {
5151
}
5252

5353
async function enqueueChatId(chatId) {
54-
if(await findChatId(chatId)) return;
55-
56-
await client.index({
54+
if (await findChatId(chatId)) return;
55+
56+
await client.index({
5757
index: openSearchConfig.chatQueueIndex,
5858
body: {
5959
chatId,
@@ -76,13 +76,13 @@ async function dequeueChatId(chatId) {
7676
},
7777
},
7878
refresh: true,
79+
conflicts: "proceed",
7980
});
8081
}
8182

8283
async function findChatId(chatId) {
8384
const found = await isQueueIndexExists();
84-
if(!found)
85-
return null;
85+
if (!found) return null;
8686

8787
const response = await client.search({
8888
index: openSearchConfig.chatQueueIndex,
@@ -97,24 +97,23 @@ async function findChatId(chatId) {
9797
},
9898
});
9999

100-
if(response.body.hits.hits.length == 0)
101-
return null;
100+
if (response.body.hits.hits.length == 0) return null;
102101

103102
return response.body.hits.hits[0]._source;
104-
};
103+
}
105104

106105
async function isQueueIndexExists() {
107106
const res = await client.indices.exists({
108-
index: openSearchConfig.chatQueueIndex
107+
index: openSearchConfig.chatQueueIndex,
109108
});
110109

111110
return res.body;
112111
}
113112

114113
async function findChatIdOrder(chatId) {
115114
const found = await findChatId(chatId);
116-
if(!found) return 0;
117-
115+
if (!found) return 0;
116+
118117
const response = await client.search({
119118
index: openSearchConfig.chatQueueIndex,
120119
body: {
@@ -130,7 +129,7 @@ async function findChatIdOrder(chatId) {
130129
});
131130

132131
return response.body.hits.total.value + 1;
133-
};
132+
}
134133

135134
module.exports = {
136135
searchNotification,

notification-server/src/server.js

+13-10
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ const express = require("express");
22
const cors = require("cors");
33
const { buildSSEResponse } = require("./sseUtil");
44
const { serverConfig } = require("./config");
5-
const { buildNotificationSearchInterval, buildQueueCounter } = require("./addOns");
6-
const { enqueueChatId, dequeueChatId } = require('./openSearch');
5+
const {
6+
buildNotificationSearchInterval,
7+
buildQueueCounter,
8+
} = require("./addOns");
9+
const { enqueueChatId, dequeueChatId } = require("./openSearch");
710

811
const app = express();
912

@@ -21,21 +24,21 @@ app.get("/sse/notifications/:channelId", (req, res) => {
2124

2225
app.get("/sse/queue/:id", (req, res) => {
2326
const { id } = req.params;
24-
buildSSEResponse({
27+
buildSSEResponse({
2528
req,
2629
res,
2730
buildCallbackFunction: buildQueueCounter({ id }),
28-
});
31+
});
2932
});
3033

31-
app.post("/enqueue", (req, res) => {
32-
enqueueChatId(req.body.id);
33-
res.sendStatus(200);
34+
app.post("/enqueue", async (req, res) => {
35+
await enqueueChatId(req.body.id);
36+
res.status(200).json({ response: `enqueued successfully` });
3437
});
3538

36-
app.post("/dequeue", (req, res) => {
37-
dequeueChatId(req.body.id);
38-
res.sendStatus(200);
39+
app.post("/dequeue", async (req, res) => {
40+
await dequeueChatId(req.body.id);
41+
res.status(200).json({ response: `dequeued successfully` });
3942
});
4043

4144
const server = app.listen(serverConfig.port, () => {

0 commit comments

Comments
 (0)