diff --git a/README.md b/README.md index c909b6d..2af935b 100644 --- a/README.md +++ b/README.md @@ -24,12 +24,13 @@ import { BlockEmitter, createNodeTransport } from "@substreams/node"; // auth API token // https://app.streamingfast.io/ +// https://app.pinax.network/ if (!process.env.SUBSTREAMS_API_TOKEN) { throw new Error("SUBSTREAMS_API_TOKEN is require"); } const token = process.env.SUBSTREAMS_API_TOKEN; -const baseUrl = "https://mainnet.eth.streamingfast.io:443"; +const baseUrl = "https://eth.substreams.pinax.network:443"; // User parameters const manifest = "https://github.com/pinax-network/subtivity-substreams/releases/download/v0.2.3/subtivity-ethereum-v0.2.3.spkg"; @@ -54,7 +55,6 @@ const request = createRequest({ outputModule, startBlockNum, stopBlockNum, - productionMode: true, }); // NodeJS Events @@ -72,6 +72,23 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); -await emitter.start(); -console.log("✅ done"); +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + +// Fatal Error +emitter.on("fatalError", (error) => { + console.error(error); +}); + +console.log("✅ start"); +console.time("🆗 close"); +const cancel = emitter.start(); + +// Cancel after 3 seconds +setTimeout(cancel, 3000); ``` \ No newline at end of file diff --git a/example.js b/example.js index 5d00436..d1d1f06 100644 --- a/example.js +++ b/example.js @@ -52,6 +52,22 @@ emitter.on("anyMessage", (message, cursor, clock) => { console.dir(clock); }); -console.time("✅ done"); -await emitter.start(); -console.timeEnd("✅ done"); +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + +// Fatal Error +emitter.on("fatalError", (error) => { + console.error(error); +}); + +console.log("✅ start"); +console.time("🆗 close"); +const cancel = emitter.start(); + +// Cancel after 3 seconds +setTimeout(cancel, 3000); diff --git a/package-lock.json b/package-lock.json index f96eff2..5a9afc5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "license": "MIT", "dependencies": { "@bufbuild/protobuf": "latest", diff --git a/package.json b/package.json index 42fdfc4..56c2bbd 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@substreams/node", - "version": "0.4.4", + "version": "0.5.0", "description": "Substreams for Node.js", "license": "MIT", "repository": "substreams-js/substreams-node", diff --git a/src/BlockEmitter.ts b/src/BlockEmitter.ts index 43d3635..e3c6bd6 100644 --- a/src/BlockEmitter.ts +++ b/src/BlockEmitter.ts @@ -1,16 +1,19 @@ +// import { type Request, type Response, Stream } from "@substreams/core/proto"; import { AnyMessage, IMessageTypeRegistry, JsonObject, Message } from "@bufbuild/protobuf"; -import type { CallOptions, Transport } from "@connectrpc/connect"; -import { isEmptyMessage, streamBlocks, unpackMapOutput } from "@substreams/core"; -import type { +import { type CallOptions, type ConnectError, type Transport, createCallbackClient } from "@connectrpc/connect"; +import { isEmptyMessage, unpackMapOutput } from "@substreams/core"; +import { BlockScopedData, BlockUndoSignal, Clock, + Error as FatalError, InitialSnapshotComplete, InitialSnapshotData, ModulesProgress, Request, Response, SessionInit, + Stream, } from "@substreams/core/proto"; import { EventEmitter } from "eventemitter3"; @@ -73,6 +76,10 @@ type LocalEventTypes = { clock: [clock: Clock]; output: [message: Message, cursor: string, clock: Clock]; anyMessage: [message: JsonObject, cursor: string, clock: Clock]; + + // error + close: [error?: ConnectError]; + fatalError: [error: FatalError]; }; export class BlockEmitter extends TypedEventEmitter { @@ -80,7 +87,6 @@ export class BlockEmitter extends TypedEventEmitter { public request: Request; public registry: IMessageTypeRegistry; public options?: CallOptions; - private stopped = false; constructor(transport: Transport, request: Request, registry: IMessageTypeRegistry, options?: CallOptions) { super(); @@ -90,23 +96,15 @@ export class BlockEmitter extends TypedEventEmitter { this.options = options; } - /** - * Stop streaming blocks - */ - public stop() { - this.stopped = true; - } - /** * Start streaming blocks */ - public async start() { - this.stopped = false; + public start() { + const closeCallback = (error?: ConnectError) => { + this.emit("close", error); + }; - for await (const response of streamBlocks(this.transport, this.request, this.options)) { - if (this.stopped) { - break; - } + const messageCallback = (response: Response) => { this.emit("response", response); switch (response.message.case) { @@ -147,7 +145,13 @@ export class BlockEmitter extends TypedEventEmitter { this.emit("debugSnapshotComplete", response.message.value); break; } + case "fatalError": { + this.emit("fatalError", response.message.value); + break; + } } - } + }; + const client = createCallbackClient(Stream, this.transport); + return client.blocks(this.request, messageCallback, closeCallback, this.options); } }