-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexecutionSync.js
116 lines (100 loc) · 5.02 KB
/
executionSync.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
let lmdb = require("lmdb")
let consola = require("consola")
let { ensureCodeAvailability, fetchTxContent, wait } = require("./utils.js")
let execute = require("./execute.js");
module.exports = async function executionSync() {
await syncExecution()
await wait(Math.max(servedContractsIds.size * 300, 20000))
setInterval(syncExecution, 5000)
}
/**
 * Returns the interactions store of a contract, opening it under
 * "./db/interactions/<contractId>" and caching it on `databases.interactions`
 * the first time it is requested.
 *
 * @param {string} contractId
 * @returns {object} the store kept in `databases.interactions[contractId]`
 */
function getInteractionsDb(contractId) {
    if (!databases.interactions[contractId]) {
        databases.interactions[contractId] = lmdb.open("./db/interactions/" + contractId)
    }
    return databases.interactions[contractId]
}

/**
 * Reads the starting point of a contract from its instantiation transaction.
 *
 * The source id is the value of the "Contract-Src" tag. The initial state is
 * the JSON in the "Init-State" tag when that tag exists; otherwise it is the
 * JSON returned by `fetchTxContent(contractId)`.
 *
 * @param {string} contractId
 * @param {{tags: Array<{name: string, value: string}>}} contractInstantiateTx
 * @returns {Promise<{contractSrc: (string|undefined), state: *}>}
 * @throws when the JSON cannot be parsed or `fetchTxContent` rejects
 */
async function loadInitialContract(contractId, contractInstantiateTx) {
    const contractSrc = contractInstantiateTx.tags.find(tag => tag.name == "Contract-Src")?.value
    const initStateTag = contractInstantiateTx.tags.find(tag => tag.name == "Init-State")
    const state = initStateTag
        ? JSON.parse(initStateTag.value)
        : JSON.parse(await fetchTxContent(contractId))
    return { contractSrc, state }
}

/**
 * Brings the stored evaluation results of one contract up to date.
 *
 * Walks the contract's interaction index in order. Every interaction whose
 * result is not stored yet (or is stored for a different interaction id) is
 * executed on top of the previous result. Results are written to
 * `databases.evaluationResults` under `<contractId><index>` and
 * `<contractId>latest`. Finally the number of interactions is recorded in
 * `databases.isExecuted`, so an unchanged contract is skipped next time.
 *
 * @param {string} contractId
 * @returns {Promise<void>}
 */
async function executeContract(contractId) {
    const contractInteractions = await databases.indexes.get(contractId)
    if (!contractInteractions) {
        consola.error(contractId, "No interactions in index")
        return
    }
    const amountOfInteractions = contractInteractions.length
    const isExecuted = await databases.isExecuted.get(contractId)
    if (typeof isExecuted === "number" && isExecuted === amountOfInteractions) {
        // Nothing new since the last completed pass.
        return
    }
    const contractInstantiateTx = await databases.contracts.get(contractId)
    if (!contractInstantiateTx) {
        return
    }

    for (const [interactionIndex, interactionId] of contractInteractions.entries()) {
        const interaction = await getInteractionsDb(contractId).get(interactionId)
        const resultKey = contractId + interactionIndex
        const alreadyIndexed = await databases.evaluationResults.get(resultKey)
        if (alreadyIndexed && alreadyIndexed.id === interaction.id) {
            continue
        }

        let state, contractSrc
        if (interactionIndex === 0) {
            // Syncing from the ground up: needs the initial state and code.
            try {
                const initial = await loadInitialContract(contractId, contractInstantiateTx)
                state = initial.state
                contractSrc = initial.contractSrc
            } catch (e) {
                // Skip only this contract; it is not marked as executed, so it
                // is retried on the next pass.
                consola.error("[" + contractId + "] Unable to load initial state", e)
                return
            }
        } else {
            const previous = await databases.evaluationResults.get(contractId + (interactionIndex - 1))
            state = previous.state
            // A state carrying `evolve` switches the contract to that source.
            contractSrc = state.evolve ? state.evolve : previous.contractSrc
        }
        await ensureCodeAvailability(contractSrc)

        // On a failed interaction the previous state is carried forward as is.
        let nextState = state
        try {
            const result = await execute(contractSrc, state, interaction, contractInstantiateTx)
            nextState = result?.state || state
        } catch (e) {
            if (e.name == "UncacheableError") {
                // NOTE(review): the contract is still marked as executed below
                // even though the remaining interactions were not evaluated —
                // confirm this is intended.
                break
            }
            consola.error("[" + contractId + "] (" + interaction.id + ")", e)
        }

        const evaluation = {
            state: nextState,
            contractSrc: contractSrc,
            id: interaction.id,
            timestamp: interaction.timestamp
        }
        await databases.evaluationResults.put(resultKey, evaluation)
        await databases.evaluationResults.put(contractId + "latest", evaluation)
    }

    if (amountOfInteractions === 0) {
        // No interactions yet: publish the initial state as the latest result.
        let initial
        try {
            initial = await loadInitialContract(contractId, contractInstantiateTx)
        } catch (e) {
            consola.error("[" + contractId + "] Unable to load initial state", e)
            return
        }
        await databases.evaluationResults.put(contractId + "latest", {
            state: initial.state,
            contractSrc: initial.contractSrc,
            id: "Init",
            timestamp: 0
        })
    }

    await databases.isExecuted.put(contractId, amountOfInteractions)
    consola.info("Executed contract " + contractId + ", " + amountOfInteractions + " interactions")
}

/**
 * Runs one sync pass over every served contract.
 *
 * Step 1 rebuilds, for each id in the global `servedContractsIds`, the list of
 * interaction ids ordered by ascending timestamp and stores it in
 * `databases.indexes`.
 * Step 2 evaluates each contract's pending interactions. A contract whose
 * initial state cannot be loaded is logged and skipped, so it cannot block the
 * contracts after it.
 *
 * @returns {Promise<void>}
 */
async function syncExecution() {
    console.log("Executing contracts")
    for (const contract of servedContractsIds) {
        const orderedIds = [...getInteractionsDb(contract).getRange().map(({ value }) => ({ id: value.id, timestamp: value.timestamp }))]
            .sort((a, b) => a.timestamp - b.timestamp)
            .map(i => i.id)
        await databases.indexes.put(contract, orderedIds)
    }
    // Iterate over a snapshot: the set may change while a pass is awaiting.
    for (const contractId of [...global.servedContractsIds]) {
        await executeContract(contractId)
    }
}