Skip to content

Commit 983c338

Browse files
authored
Cap concurrent requests to get Neptune schema (#58)
Previously, for a graph with `n` nodes or edges, there would be `n` concurrent requests made to Neptune via `queryNeptune`. Depending on your instance class or VPC settings, you could hit errors such as `MemoryLimitExceededException` or connection limits/sockets abruptly closing, with increasing likelihood as the size of the graph increased. This was initially observed as abrupt socket closing, just like #20. Now, the concurrent request Promises are resolved in batches using `mapAll`. For now, the batch size is hardcoded to 20. For very small instance classes (e.g. from the "Development and testing" template), `MemoryLimitExceededException`s are still likely. Further improvements could include:
- relating the batch size to the instance class
- allowing the user to specify the batch size in the process args
- exponential/dynamic backoff on transient errors as indicated in https://docs.aws.amazon.com/neptune/latest/userguide/errors-engine-codes.html#errors-query
1 parent f1d1498 commit 983c338

File tree

3 files changed

+111
-10
lines changed

3 files changed

+111
-10
lines changed

src/NeptuneSchema.js

+5-10
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { aws4Interceptor } from "aws4-axios";
1515
import { fromNodeProviderChain } from "@aws-sdk/credential-providers";
1616
import { NeptunedataClient, ExecuteOpenCypherQueryCommand } from "@aws-sdk/client-neptunedata";
1717
import { loggerDebug, loggerError, loggerInfo, yellow } from "./logger.js";
18+
import { mapAll } from "./util-promise.js";
1819
import { ExecuteQueryCommand, NeptuneGraphClient } from "@aws-sdk/client-neptune-graph";
1920

2021
const NEPTUNE_DB = 'neptune-db';
@@ -191,7 +192,7 @@ async function findFromAndToLabels(edgeStructure) {
191192

192193

193194
async function getEdgesDirections() {
194-
await Promise.all(schema.edgeStructures.map(findFromAndToLabels))
195+
await mapAll(schema.edgeStructures, findFromAndToLabels);
195196
}
196197

197198

@@ -254,9 +255,7 @@ async function getEdgeProperties(edge) {
254255

255256
async function getEdgesProperties() {
256257
loggerInfo('Retrieving edge properties');
257-
await Promise.all(schema.edgeStructures.map(async (edge) => {
258-
await getEdgeProperties(edge);
259-
}));
258+
await mapAll(schema.edgeStructures, getEdgeProperties);
260259
}
261260

262261

@@ -281,9 +280,7 @@ async function getNodeProperties(node) {
281280

282281
async function getNodesProperties() {
283282
loggerInfo('Retrieving node properties');
284-
await Promise.all(schema.nodeStructures.map(async (node) => {
285-
await getNodeProperties(node);
286-
}));
283+
await mapAll(schema.nodeStructures, getNodeProperties);
287284
}
288285

289286

@@ -323,9 +320,7 @@ async function getEdgesDirectionsCardinality() {
323320
}
324321
}
325322

326-
await Promise.all(possibleDirections.map(async (d) => {
327-
await checkEdgeDirectionCardinality(d);
328-
}));
323+
await mapAll(possibleDirections, checkEdgeDirectionCardinality);
329324
}
330325

331326

src/test/util-promise.test.js

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { mapAll } from '../util-promise.js'
2+
3+
describe('mapAll', () => {
    // Declared before the helpers and tests that reference it, so nothing depends on the test
    // callbacks only running after the whole describe body has been evaluated.
    class OddNumberError extends Error {}

    /** Resolves to twice the given number. */
    function double(i) {
        return Promise.resolve(i * 2);
    }

    /**
     * Resolves to the given value unchanged. Used instead of passing `Promise.resolve` directly:
     * as a bare callback that method loses its `this` and throws a TypeError if it is ever invoked.
     */
    function identity(value) {
        return Promise.resolve(value);
    }

    /**
     * Builds a counter whose `fn` resolves to its argument for the first `n` calls (incrementing
     * `count` each time) and throws synchronously on every call after that.
     */
    function makeBatchCounter(n) {
        return {
            count: 0,
            fn: function (i) {
                if (this.count >= n) {
                    throw new Error();
                }
                this.count++;
                return Promise.resolve(i);
            }
        };
    }

    /** Resolves to the given number when it is even; throws OddNumberError otherwise. */
    function assertEven(i) {
        if (i % 2 !== 0) {
            throw new OddNumberError();
        }

        return Promise.resolve(i);
    }

    test('should map each element, resolve each Promise, and resolve to the mapped values', async () => {
        await expect(mapAll([1, 2, 3, 4, 5, 6, 7, 8], double))
            .resolves
            .toEqual([2, 4, 6, 8, 10, 12, 14, 16]);
    });

    test('should map each element in batches', async () => {
        const count = 100;
        const batchSize = 50;
        const batchCounter = makeBatchCounter(count);

        // The input has count + 1 elements, so the callback throws on the single element of the
        // third batch: mapAll must reject after exactly `count` successful calls.
        await expect(mapAll(Array(count + 1).fill(0), (i) => batchCounter.fn(i), batchSize))
            .rejects
            .toThrow(Error);
        expect(batchCounter.count).toEqual(count);
    });

    test('should reject with the same error as a rejected mapped Promise', async () => {
        await expect(mapAll([2, 4, 6, 3, 8, 10], assertEven))
            .rejects
            .toThrow(OddNumberError);
    });

    test('should resolve to an empty array for empty input arrays', async () => {
        await expect(mapAll([], identity)).resolves.toEqual([]);
    });

    test('should resolve to an empty array for non-positive batch sizes', async () => {
        await expect(mapAll([1, 2, 3], identity, -1)).resolves.toEqual([]);
        await expect(mapAll([1, 2, 3], identity, 0)).resolves.toEqual([]);
    });
});

src/util-promise.js

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
const BATCH_SIZE = 20;
2+
3+
/**
4+
* @callback MapCallback
5+
* @param {T} value
6+
* @param {number} index
7+
* @param {T[]} array
8+
* @returns {U}
9+
* @template T, U
10+
*/
11+
12+
/**
 * Calls a Promise-returning callback function on each element of an array and creates a Promise
 * that is resolved with an array of results, in input order, when all of the returned Promises
 * resolve, or rejected when any Promise is rejected.
 *
 * The elements of the array are mapped and then resolved in batches of size `batchSize`: the
 * callbacks of a batch are only invoked after every Promise of the previous batch has resolved,
 * so no more than `batchSize` of the Promises created here are pending at one time. This is
 * useful for e.g. avoiding too many simultaneous web requests. Once a batch rejects, the
 * remaining batches are never started.
 *
 * @param {T[]} array an array to map over
 * @param {MapCallback<T, Promise<U>>} callbackfn a function that accepts up to three arguments:
 * the element, its index in `array`, and `array` itself. The mapAll function calls the callbackfn
 * function one time for each element in the array
 * @param {number} batchSize the number of Promises to concurrently resolve. When it is zero or
 * negative, callbackfn is never called and the result is an empty array
 * @returns {Promise<U[]>}
 * @template T, U
 */
async function mapAll(array, callbackfn, batchSize = BATCH_SIZE) {
    if (batchSize <= 0) {
        return [];
    }

    const results = [];

    for (let start = 0; start < array.length; start += batchSize) {
        const batch = array.slice(start, start + batchSize);
        // `slice` truncates fractional bounds, so the batch really begins at the truncated offset.
        const firstIndex = Math.trunc(start);
        // Forward the position in the input array and the input array itself rather than the
        // position within the batch and the batch slice, as the MapCallback contract describes.
        const promises = batch.map((value, offset) => callbackfn(value, firstIndex + offset, array));
        const batchResults = await Promise.all(promises);

        // Append one by one: spreading a very large batch into push() can exceed the engine's
        // argument-count limit.
        for (const result of batchResults) {
            results.push(result);
        }
    }

    return results;
}
43+
44+
export { mapAll };

0 commit comments

Comments
 (0)