From e7d1dc68a77af0703386116d562153214773277b Mon Sep 17 00:00:00 2001
From: Denis Carriere
Date: Fri, 1 Mar 2024 12:08:50 -0500
Subject: [PATCH] add logging for session

---
 .gitignore           |  1 +
 index.ts             | 36 +++++++++++++++++++++++++++++++++---
 src/parseFilename.ts |  6 ++++--
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1738eee..3628a81 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,6 +10,7 @@ lerna-debug.log*
 *.cursor
 *.clock
 *.spkg
+*.session
 
 # Diagnostic reports (https://nodejs.org/api/report.html)
 report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
diff --git a/index.ts b/index.ts
index 9988622..86fef6e 100644
--- a/index.ts
+++ b/index.ts
@@ -25,7 +25,7 @@ export async function action(options: CSVRunOptions ) {
 
     // Cursor
     const moduleHash = await getModuleHash(options);
-    const { name, cursorFile, clockFile } = parseFilename(moduleHash, options);
+    const { name, cursorFile, clockFile, sessionFile } = parseFilename(moduleHash, options);
     const startCursor = fs.existsSync(cursorFile) ? fs.readFileSync(cursorFile, "utf8") : '';
 
     // CSV writer (append)
@@ -45,9 +45,31 @@ export async function action(options: CSVRunOptions ) {
     // Block Emitter
     const { emitter } = await setup({ ...options, cursor: startCursor });
 
-    // stats
+    // log stats
     let rows = 0;
     let blocks = 0;
+    let last_block_num = 0;
+    let last_timestamp = "";
+    let totalBytesRead = 0;
+    let totalBytesWritten = 0;
+    let traceId = "";
+    let start_block = 0;
+    let workers = 0;
+
+    emitter.on("session", (session) => {
+        fs.writeFileSync(sessionFile, JSON.stringify(session, null, 2));
+        traceId = session.traceId;
+        start_block = Number(session.resolvedStartBlock);
+        workers = Number(session.maxParallelWorkers)
+    });
+
+    emitter.on("progress", (progress) => {
+        if ( progress.processedBytes ) {
+            totalBytesRead += Number(progress.processedBytes.totalBytesRead);
+            totalBytesWritten += Number(progress.processedBytes.totalBytesWritten);
+        }
+        log();
+    });
 
     emitter.on("clock", (clock) => {
         // write block to file
@@ -90,9 +112,17 @@ export async function action(options: CSVRunOptions ) {
         };
 
         // logging
-        logUpdate(`[substreams-sink-csv] block_num=${block_num} timestamp=${timestamp} blocks=${++blocks} rows=${rows}`);
+        blocks++;
+        log();
     });
 
+    function log() {
+        logUpdate(`[substreams-sink-csv]
+trace_id=${traceId} start_block=${start_block} module_hash=${moduleHash} workers=${workers}
+last_block_num=${last_block_num} last_timestamp=${last_timestamp} blocks=${blocks} rows=${rows} bytes_read=${totalBytesRead} bytes_written=${totalBytesWritten}
+`);
+    }
+
     fileCursor.onCursor(emitter, cursorFile);
     emitter.start();
 }
diff --git a/src/parseFilename.ts b/src/parseFilename.ts
index b11591a..0d44767 100644
--- a/src/parseFilename.ts
+++ b/src/parseFilename.ts
@@ -11,12 +11,14 @@ export function parseFilename(moduleHash: string, options: CSVRunOptions) {
         if ( !fs.existsSync(dirname) ) fs.mkdirSync(dirname, { recursive: true });
         const cursorFile = `${name}.cursor`;
         const clockFile = `${name}.clock`;
-        return { name, cursorFile, clockFile };
+        const sessionFile = `${name}.session`;
+        return { name, cursorFile, clockFile, sessionFile };
     }
     // auto-generate filename (<network>-<moduleHash>-<moduleName>.csv)
     const network = options.substreamsEndpoint.split(":")[0];
     const name = `${network}-${moduleHash}-${options.moduleName}`
     const cursorFile = `${name}.cursor`;
     const clockFile = `${name}.clock`;
-    return { name, cursorFile, clockFile };
+    const sessionFile = `${name}.session`;
+    return { name, cursorFile, clockFile, sessionFile };
 }
\ No newline at end of file