From e8dcd8789e82c9a16d7b01b7976acde1a2ca80d5 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Thu, 7 Dec 2023 21:41:01 -0500 Subject: [PATCH 1/4] feature: Refactor to use `createCallbackClient` --- README.md | 18 ++++++++++++++---- example.js | 15 ++++++++++++--- src/BlockEmitter.ts | 40 ++++++++++++++++++++++------------------ 3 files changed, 48 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index c909b6d..5c1fd7e 100644 --- a/README.md +++ b/README.md @@ -24,12 +24,13 @@ import { BlockEmitter, createNodeTransport } from "@substreams/node"; // auth API token // https://app.streamingfast.io/ +// https://app.pinax.network/ if (!process.env.SUBSTREAMS_API_TOKEN) { throw new Error("SUBSTREAMS_API_TOKEN is require"); } const token = process.env.SUBSTREAMS_API_TOKEN; -const baseUrl = "https://mainnet.eth.streamingfast.io:443"; +const baseUrl = "https://eth.substreams.pinax.network:443"; // User parameters const manifest = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.2.3/subtivity-ethereum-v0.2.3.spkg"; @@ -54,7 +55,6 @@ const request = createRequest({ outputModule, startBlockNum, stopBlockNum, - productionMode: true, }); // NodeJS Events @@ -72,6 +72,16 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); -await emitter.start(); -console.log("✅ done"); +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + +console.log("✅ start"); +console.time("🆗 close"); +const cancel = emitter.start(); +// cancel() to exit substreams session ``` \ No newline at end of file diff --git a/example.js b/example.js index 5d00436..eb39e6d 100644 --- a/example.js +++ b/example.js @@ -52,6 +52,15 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); -console.time("✅ done"); -await emitter.start(); -console.timeEnd("✅ done"); +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + +console.log("✅ start"); +console.time("🆗 close"); +const cancel = emitter.start(); +// cancel() to exit substreams session diff --git a/src/BlockEmitter.ts b/src/BlockEmitter.ts index 43d3635..e3c6bd6 100644 --- a/src/BlockEmitter.ts +++ b/src/BlockEmitter.ts @@ -1,16 +1,19 @@ +// import { type Request, type Response, Stream } from "@substreams/core/proto"; import { AnyMessage, IMessageTypeRegistry, JsonObject, Message } from "@bufbuild/protobuf"; -import type { CallOptions, Transport } from "@connectrpc/connect"; -import { isEmptyMessage, streamBlocks, unpackMapOutput } from "@substreams/core"; -import type { +import { type CallOptions, type ConnectError, type Transport, createCallbackClient } from "@connectrpc/connect"; +import { isEmptyMessage, unpackMapOutput } from "@substreams/core"; +import { BlockScopedData, BlockUndoSignal, Clock, + Error as FatalError, InitialSnapshotComplete, InitialSnapshotData, ModulesProgress, Request, Response, SessionInit, + Stream, } from "@substreams/core/proto"; import { EventEmitter } from "eventemitter3"; @@ -73,6 +76,10 @@ type LocalEventTypes = { clock: [clock: Clock]; output: [message: Message, cursor: string, clock: Clock]; anyMessage: [message: JsonObject, cursor: string, clock: Clock]; + + // error + close: [error?: ConnectError]; + fatalError: [error: FatalError]; }; export class BlockEmitter extends TypedEventEmitter { @@ -80,7 +87,6 @@ export class BlockEmitter extends TypedEventEmitter { public request: Request; public registry: IMessageTypeRegistry; public options?: CallOptions; - private stopped = false; constructor(transport: Transport, request: Request, registry: IMessageTypeRegistry, options?: CallOptions) { super(); @@ -90,23 +96,15 @@ export class BlockEmitter extends TypedEventEmitter { this.options = options; } - /** - * Stop streaming blocks - */ - public stop() { - this.stopped = true; - } - /** * Start streaming blocks */ - public async start() { - this.stopped = false; + public start() { + const closeCallback = (error?: ConnectError) => { + this.emit("close", error); + }; - for await (const response of streamBlocks(this.transport, this.request, this.options)) { - if (this.stopped) { - break; - } + const messageCallback = (response: Response) => { this.emit("response", response); switch (response.message.case) { @@ -147,7 +145,13 @@ export class BlockEmitter extends TypedEventEmitter { this.emit("debugSnapshotComplete", response.message.value); break; } + case "fatalError": { + this.emit("fatalError", response.message.value); + break; + } } - } + }; + const client = createCallbackClient(Stream, this.transport); + return client.blocks(this.request, messageCallback, closeCallback, this.options); } } From 5cc4c1791bd6d7854040fd640526528e91bf8041 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Thu, 7 Dec 2023 21:41:35 -0500 Subject: [PATCH 2/4] bump minor release --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index f96eff2..5a9afc5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "license": "MIT", "dependencies": { "@bufbuild/protobuf": "latest", diff --git a/package.json b/package.json index 42fdfc4..56c2bbd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "description": "Substreams for Node.js", "license": "MIT", "repository": "substreams-js/substreams-node", From 3ee1e098cd35c825a8768e181c75e444c3b16567 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Thu, 7 Dec 2023 21:44:27 -0500 Subject: [PATCH 3/4] add example --- README.md | 5 +++++ example.js | 9 +++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 5c1fd7e..ea8f3e4 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,11 @@ emitter.on("close", (error) => { console.timeEnd("🆗 close"); }); +// Fatal Error +emitter.on("fatalError", (error) => { + console.error(error); +}); + console.log("✅ start"); console.time("🆗 close"); const cancel = emitter.start(); diff --git a/example.js b/example.js index eb39e6d..740e5ff 100644 --- a/example.js +++ b/example.js @@ -52,12 +52,9 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); -// End of Stream -emitter.on("close", (error) => { - if (error) { - console.error(error); - } - console.timeEnd("🆗 close"); +// Fatal Error +emitter.on("fatalError", (error) => { + console.error(error); }); console.log("✅ start"); From 88d40b96ce8188a22c63d1859b3ecd1ec319ae46 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Thu, 7 Dec 2023 21:51:32 -0500 Subject: [PATCH 4/4] update example --- README.md | 4 +++- example.js | 12 +++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ea8f3e4..2af935b 100644 --- a/README.md +++ b/README.md @@ -88,5 +88,7 @@ emitter.on("fatalError", (error) => { console.log("✅ start"); console.time("🆗 close"); const cancel = emitter.start(); -// cancel() to exit substreams session + +// Cancel after 3 seconds +setTimeout(cancel, 3000); ``` \ No newline at end of file diff --git a/example.js b/example.js index 740e5ff..d1d1f06 100644 --- a/example.js +++ b/example.js @@ -52,6 +52,14 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + // Fatal Error emitter.on("fatalError", (error) => { console.error(error); @@ -60,4 +68,6 @@ emitter.on("fatalError", (error) => { console.log("✅ start"); console.time("🆗 close"); const cancel = emitter.start(); -// cancel() to exit substreams session + +// Cancel after 3 seconds +setTimeout(cancel, 3000);