diff --git a/index.ts b/index.ts index 35fa1f1..fe55aa0 100644 --- a/index.ts +++ b/index.ts @@ -96,11 +96,11 @@ export async function action(options: CSVRunOptions ) { // Stream Messages emitter.on("anyMessage", async (data, cursor, clock) => { const { block_number, timestamp, seconds } = parseClock(clock); + blocks += last_block_number ? block_number - last_block_number : 1; last_block_number = block_number; last_timestamp = timestamp; last_seconds = seconds; - if (dataType == OutputType.EntityChanges) { for ( const entityChange of EntityChanges.parse(data).entityChanges ) { const writer = writers.get(entityChange.entity); @@ -150,17 +150,15 @@ export async function action(options: CSVRunOptions ) { } // logging - blocks++; log(); }); function log() { const now = Math.floor(Date.now() / 1000); - if ( last_update != now) { - last_update = now; - const blocksPerSecond = Math.floor(blocks / (last_update - start)); - logUpdate(JSON.stringify({last_block_number, last_timestamp, blocks, rows, blocksPerSecond, totalBytesRead, totalBytesWritten, runningJobs})); - } + if ( last_update == now ) return; + last_update = now; + const blocksPerSecond = Math.floor(blocks / (last_update - start)); + logUpdate(JSON.stringify({last_block_number, last_timestamp, blocks, rows, blocksPerSecond, totalBytesRead, totalBytesWritten, runningJobs})); } fileCursor.onCursor(emitter, cursorFile); diff --git a/package.json b/package.json index 5610165..926620f 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "version": "0.3.2", + "version": "0.3.3", "name": "substreams-sink-csv", "description": "Substreams Sink CSV", "type": "module",