-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Neptune Analytics Support & Logging #23
Closed
Closed
Changes from 29 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
3395a29
Update getEdgesDirections queries
Cole-Greer 6f1afeb
Neptune Analytics - Logger
AndreaNassisi c102a84
Merge remote-tracking branch 'origin/updateGetEdgesDirections' into N…
AndreaNassisi 5e54131
Update getEdgesDirections queries
Cole-Greer 83307b6
cherry pick 'Neptune Analytics - Logger' from #19
AndreaNassisi 9ac1bfa
cherry pick 'Merge remote-tracking branch...' from #19
sophiadt 936614f
Test milestone
AndreaNassisi ebb2875
Test package
AndreaNassisi 615569c
change consoleOut.error to console.error
sophiadt b95c4d3
Skipped retrieving neptune cluster info for analytics.
andreachild 4254af8
added in AndreaC's fixes to todo schema, commas and incorrect orderin…
sophiadt 90cffc8
reverted endpoint config in unit tests
sophiadt 8573166
Revert "Update getEdgesDirections queries"
sophiadt b5e80c2
fixed error message typo
sophiadt bbbd7b6
Modified logger method to accept flag which dictates if given text sh…
andreachild 71791fa
Reverted a change which would have too verbose output to console.
andreachild d59383d
change if statement to check if endpoint has exactly 2 parts
sophiadt 63ee6a4
Merge branch 'sophiadt/Neptune-Analytics---Logger' into achild/Neptun…
sophiadt 6c27318
Merge pull request #2 from Bit-Quill/achild/Neptune-Analytics---Logger
sophiadt a5dbb3a
Removed default region and changed parsing of region from URL to diff…
andreachild 8623a70
removed code for dual lamba option and read-only endpoints
sophiadt 12691dd
added host and port placeholders
sophiadt d60e36b
Changed queryNeptune and queryNeptuneSdk to take optional parameters …
andreachild f1326f5
Merge branch 'main' into sophiadt/Neptune-Analytics---Logger
sophiadt f4b7335
use parameterized query for getEdgeProperties
sophiadt c162b62
use parameterized queries for the rest of the functions and added typ…
sophiadt 16c431a
merge 'Update getEdgesDirections queries' from #16
Cole-Greer 77794f0
add sanitize placeholder function and reverted queries to use labels
sophiadt dbecfb8
remove params in function call for queries that aren't parameterized
sophiadt dc987c0
Replaced file and console logging with the pino logger library for ea…
andreachild 8f63bb9
Remove duplicated sanitize function.
andreachild 88385cb
Integrate usage of neptune graph SDK for schema queries and lambda lo…
andreachild a6e89ac
Fixed pino dependencies and updated tarball with latest.
andreachild 121a8ae
Fixed packaging to include the new analytics sdk template.
andreachild ee28939
Fixed lambda environment variables to include neptune type which is r…
andreachild 70293fc
fixed typo in README.md
sophiadt 38ad71c
fixed typos
sophiadt 477d819
Refactor console output and introduce file log levels (#6)
andreachild 98f4a05
fixed typos in README.md
sophiadt 20bd7e7
fixed typos and added some more details in README.md
sophiadt cda89f1
Merge branch 'main' into sophiadt/Neptune-Analytics---Logger
andreachild 563c2da
Merge branch 'main' into sophiadt/Neptune-Analytics---Logger
andreachild 4c06de9
Merge branch 'main' into sophiadt/Neptune-Analytics---Logger
andreachild 79069b3
Merge branch 'main' into sophiadt/Neptune-Analytics---Logger
andreachild d737463
Cleanup of changes that shouldn't have been included when merging fro…
andreachild File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,4 @@ type Todo { | |
|
||
type Comment { | ||
content: String | ||
} | ||
|
||
input Options { | ||
limit: Int | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,15 +14,17 @@ import axios from "axios"; | |
import { aws4Interceptor } from "aws4-axios"; | ||
import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; | ||
import { NeptunedataClient, ExecuteOpenCypherQueryCommand } from "@aws-sdk/client-neptunedata"; | ||
import { loggerLog } from "./logger.js"; | ||
|
||
let HOST = ''; | ||
let PORT = 8182; | ||
let REGION = '' | ||
let SAMPLE = 5000; | ||
let VERBOSE = false; | ||
let VERBOSE = false; | ||
let NEPTUNE_TYPE = 'neptune-db'; | ||
let language = 'openCypher'; | ||
let useSDK = false; | ||
|
||
let msg = ''; | ||
|
||
async function getAWSCredentials() { | ||
const credentialProvider = fromNodeProviderChain(); | ||
|
@@ -31,7 +33,7 @@ async function getAWSCredentials() { | |
const interceptor = aws4Interceptor({ | ||
options: { | ||
region: REGION, | ||
service: "neptune-db", | ||
service: NEPTUNE_TYPE, | ||
}, | ||
credentials: cred | ||
}); | ||
|
@@ -52,46 +54,68 @@ function yellow(text) { | |
|
||
|
||
function consoleOut(text) { | ||
if (VERBOSE) { | ||
console.log(text); | ||
} | ||
loggerLog(text, VERBOSE); | ||
} | ||
|
||
function sanitize(text) { | ||
// TODO implement sanitization logic | ||
return text; | ||
} | ||
|
||
async function queryNeptune(q) { | ||
/** | ||
* Executes a neptune query | ||
* @param query the query to execute | ||
* @param params optional query params | ||
* @returns {Promise<ExecuteOpenCypherQueryCommandOutput|any>} | ||
*/ | ||
async function queryNeptune(query, params) { | ||
if (useSDK) { | ||
const response = await queryNeptuneSDK(q); | ||
return response; | ||
const response = await queryNeptuneSDK(query, params); | ||
return response; | ||
} else { | ||
try { | ||
const response = await axios.post(`https://${HOST}:${PORT}/${language}`, `query=${encodeURIComponent(q)}`); | ||
return response.data; | ||
try { | ||
let data = { | ||
...{query: query}, | ||
...(params) && {parameters: params} | ||
}; | ||
const response = await axios.post(`https://${HOST}:${PORT}/${language}`, data); | ||
return response.data; | ||
} catch (error) { | ||
console.error("Http query request failed: ", error.message); | ||
consoleOut("Trying with the AWS SDK"); | ||
const response = await queryNeptuneSDK(q); | ||
useSDK = true; | ||
return response; | ||
msg = `Http query request failed: ${error.message}`; | ||
console.error(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(error)); | ||
|
||
if (NEPTUNE_TYPE == 'neptune-db') { | ||
consoleOut("Trying with the AWS SDK"); | ||
const response = await queryNeptuneSDK(query, params); | ||
useSDK = true; | ||
return response; | ||
} | ||
|
||
throw new Error('AWS SDK for Neptune Analytics is not available, yet.'); | ||
} | ||
} | ||
} | ||
|
||
|
||
async function queryNeptuneSDK(q) { | ||
async function queryNeptuneSDK(query, params) { | ||
try { | ||
const config = { | ||
endpoint: `https://${HOST}:${PORT}` | ||
}; | ||
const client = new NeptunedataClient(config); | ||
const input = { | ||
openCypherQuery: q | ||
...{openCypherQuery: query}, | ||
...(params) && {parameters: params} | ||
}; | ||
const command = new ExecuteOpenCypherQueryCommand(input); | ||
const response = await client.send(command); | ||
return response; | ||
|
||
} catch (error) { | ||
console.error("SDK query request failed: ", error.message); | ||
msg = `SDK query request failed: ${error.message}`; | ||
console.error(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(error)); | ||
process.exit(1); | ||
} | ||
} | ||
|
@@ -100,6 +124,7 @@ async function queryNeptuneSDK(q) { | |
async function getNodesNames() { | ||
let query = `MATCH (a) RETURN labels(a), count(a)`; | ||
let response = await queryNeptune(query); | ||
loggerLog('Getting nodes names'); | ||
|
||
try { | ||
response.results.forEach(result => { | ||
|
@@ -108,7 +133,9 @@ async function getNodesNames() { | |
}); | ||
} | ||
catch (e) { | ||
consoleOut(" No nodes found"); | ||
msg = " No nodes found"; | ||
consoleOut(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(e)); | ||
return; | ||
} | ||
} | ||
|
@@ -117,6 +144,7 @@ async function getNodesNames() { | |
async function getEdgesNames() { | ||
let query = `MATCH ()-[e]->() RETURN type(e), count(e)`; | ||
let response = await queryNeptune(query); | ||
loggerLog('Getting edges names'); | ||
|
||
try { | ||
response.results.forEach(result => { | ||
|
@@ -125,35 +153,31 @@ async function getEdgesNames() { | |
}); | ||
} | ||
catch (e) { | ||
consoleOut(" No edges found"); | ||
msg = " No edges found"; | ||
consoleOut(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(e)); | ||
return; | ||
} | ||
|
||
} | ||
|
||
|
||
async function checkEdgeDirection(direction) { | ||
let query = `MATCH (from:${direction.from})-[r:${direction.edge.label}]->(to:${direction.to}) RETURN r as edge LIMIT 1`; | ||
async function findFromAndToLabels(edgeStructure) { | ||
let query = `MATCH (from)-[r:${sanitize(edgeStructure.label)}]->(to) RETURN DISTINCT labels(from) as fromLabel, labels(to) as toLabel`; | ||
let response = await queryNeptune(query); | ||
let result = response.results[0]; | ||
if (result !== undefined) { | ||
direction.edge.directions.push({from:direction.from, to:direction.to}); | ||
consoleOut(' Found edge: ' + yellow(direction.edge.label) + ' direction: ' + yellow(direction.from) + ' -> ' + yellow(direction.to)); | ||
for (let result of response.results) { | ||
for (let fromLabel of result.fromLabel) { | ||
for (let toLabel of result.toLabel) { | ||
edgeStructure.directions.push({from:fromLabel, to:toLabel}); | ||
consoleOut(' Found edge: ' + yellow(edgeStructure.label) + ' direction: ' + yellow(fromLabel) + ' -> ' + yellow(toLabel)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the expectation that calls to |
||
} | ||
} | ||
} | ||
} | ||
|
||
|
||
async function getEdgesDirections() { | ||
let possibleDirections = []; | ||
for (const edge of schema.edgeStructures) { | ||
for (const fromNode of schema.nodeStructures) { | ||
for (const toNode of schema.nodeStructures) { | ||
possibleDirections.push({edge:edge, from:fromNode.label, to:toNode.label}); | ||
} | ||
} | ||
} | ||
|
||
await Promise.all(possibleDirections.map(checkEdgeDirection)) | ||
await Promise.all(schema.edgeStructures.map(findFromAndToLabels)) | ||
} | ||
|
||
|
||
|
@@ -196,9 +220,10 @@ function addUpdateEdgeProperty(edgeName, name, value) { | |
|
||
|
||
async function getEdgeProperties(edge) { | ||
let query = `MATCH ()-[n:${edge.label}]->() RETURN properties(n) as properties LIMIT ${SAMPLE}`; | ||
let query = `MATCH ()-[n:${sanitize(edge.label)}]->() RETURN properties(n) as properties LIMIT $sample`; | ||
let parameters = `{"sample": ${SAMPLE}}`; | ||
try { | ||
let response = await queryNeptune(query); | ||
let response = await queryNeptune(query, parameters); | ||
let result = response.results; | ||
result.forEach(e => { | ||
Object.keys(e.properties).forEach(key => { | ||
|
@@ -207,7 +232,9 @@ async function getEdgeProperties(edge) { | |
}); | ||
} | ||
catch (e) { | ||
consoleOut(" No properties found for edge: " + edge.label); | ||
msg = " No properties found for edge: " + edge.label; | ||
consoleOut(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(e)); | ||
} | ||
} | ||
|
||
|
@@ -220,9 +247,10 @@ async function getEdgesProperties() { | |
|
||
|
||
async function getNodeProperties(node) { | ||
let query = `MATCH (n:${node.label}) RETURN properties(n) as properties LIMIT ${SAMPLE}`; | ||
let query = `MATCH (n:${sanitize(node.label)}) RETURN properties(n) as properties LIMIT $sample`; | ||
let parameters = `{"sample": ${SAMPLE}}`; | ||
try { | ||
let response = await queryNeptune(query); | ||
let response = await queryNeptune(query, parameters); | ||
let result = response.results; | ||
result.forEach(e => { | ||
Object.keys(e.properties).forEach(key => { | ||
|
@@ -231,7 +259,9 @@ async function getNodeProperties(node) { | |
}); | ||
} | ||
catch (e) { | ||
consoleOut(" No properties found for node: " + node.label); | ||
msg = " No properties found for node: " + node.label; | ||
consoleOut(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(e)); | ||
} | ||
} | ||
|
||
|
@@ -244,10 +274,10 @@ async function getNodesProperties() { | |
|
||
|
||
async function checkEdgeDirectionCardinality(d) { | ||
let queryFrom = `MATCH (from:${d.from})-[r:${d.edge.label}]->(to:${d.to}) WITH to, count(from) as rels WHERE rels > 1 RETURN rels LIMIT 1`; | ||
let queryFrom = `MATCH (from:${sanitize(d.from)})-[r:${sanitize(d.edge.label)}]->(to:${sanitize(d.to)}) WITH to, count(from) as rels WHERE rels > 1 RETURN rels LIMIT 1`; | ||
let responseFrom = await queryNeptune(queryFrom); | ||
let resultFrom = responseFrom.results[0]; | ||
let queryTo = `MATCH (from:${d.from})-[r:${d.edge.label}]->(to:${d.to}) WITH from, count(to) as rels WHERE rels > 1 RETURN rels LIMIT 1`; | ||
let queryTo = `MATCH (from:${sanitize(d.from)})-[r:${sanitize(d.edge.label)}]->(to:${sanitize(d.to)}) WITH from, count(to) as rels WHERE rels > 1 RETURN rels LIMIT 1`; | ||
let responseTo = await queryNeptune(queryTo); | ||
let resultTo = responseTo.results[0]; | ||
let c = ''; | ||
|
@@ -283,11 +313,12 @@ async function getEdgesDirectionsCardinality() { | |
} | ||
|
||
|
||
function setGetNeptuneSchemaParameters(host, port, region, verbose = false) { | ||
function setGetNeptuneSchemaParameters(host, port, region, verbose = false, neptuneType) { | ||
HOST = host; | ||
PORT = port; | ||
REGION = region; | ||
VERBOSE = verbose; | ||
NEPTUNE_TYPE = neptuneType; | ||
} | ||
|
||
|
||
|
@@ -307,6 +338,7 @@ async function getSchemaViaSummaryAPI() { | |
return true; | ||
|
||
} catch (error) { | ||
loggerLog(`Getting the schema via Neptune Summary API failed: ${JSON.stringify(error)}`); | ||
return false; | ||
} | ||
} | ||
|
@@ -318,8 +350,10 @@ async function getNeptuneSchema(quiet) { | |
|
||
try { | ||
await getAWSCredentials(); | ||
} catch (error) { | ||
consoleOut("There are no AWS credetials configured. \nGetting the schema from an Amazon Neptune database with IAM authentication works only with AWS credentials."); | ||
} catch (error) { | ||
msg = "There are no AWS credentials configured. \nGetting the schema from an Amazon Neptune database with IAM authentication works only with AWS credentials."; | ||
consoleOut(msg); | ||
loggerLog(msg + ': ' + JSON.stringify(error)); | ||
} | ||
|
||
if (await getSchemaViaSummaryAPI()) { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this be coming in a future PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - still need to talk with AWS about the logic.