From 7beec65d0ec321cbefb93b514784812b79e9bb82 Mon Sep 17 00:00:00 2001 From: Denis Carriere Date: Wed, 7 Feb 2024 09:37:25 -0500 Subject: [PATCH] add ethereum erc-20 balance chnages example --- examples/ethereum-erc20-balance-changes.mjs | 69 +++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 examples/ethereum-erc20-balance-changes.mjs diff --git a/examples/ethereum-erc20-balance-changes.mjs b/examples/ethereum-erc20-balance-changes.mjs new file mode 100644 index 0000000..968af8c --- /dev/null +++ b/examples/ethereum-erc20-balance-changes.mjs @@ -0,0 +1,69 @@ +import { createModuleHashHex, createRegistry, createRequest } from "@substreams/core"; +import { readPackage } from "@substreams/manifest"; +import { BlockEmitter, createNodeTransport } from "@substreams/node"; + +// auth API token +// https://app.streamingfast.io/ +// https://app.pinax.network/ +if (!process.env.SUBSTREAMS_API_TOKEN) { + throw new Error("SUBSTREAMS_API_TOKEN is require"); +} + +const token = process.env.SUBSTREAMS_API_TOKEN; +const baseUrl = "https://eth.substreams.pinax.network:443"; + +// User parameters +const manifest = "https://github.com/pinax-network/substreams-erc20-balance-changes/releases/download/v1.2.0/erc20-balance-changes-mainnet-v1.2.0.spkg"; +const outputModule = "map_balance_changes"; +const startBlockNum = -1000; + +// Read Substream +const substreamPackage = await readPackage(manifest); +if (!substreamPackage.modules) { + throw new Error("No modules found in substream package"); +} +const moduleHash = await createModuleHashHex(substreamPackage.modules, outputModule); +console.log({ moduleHash }); + +// Connect Transport +const headers = new Headers({ "User-Agent": "@substreams/node" }); +const registry = createRegistry(substreamPackage); +const transport = createNodeTransport(baseUrl, token, registry, headers); +const request = createRequest({ + substreamPackage, + outputModule, + startBlockNum, +}); + +// NodeJS Events +const emitter = new BlockEmitter(transport, request, registry); + +// Session Trace ID +emitter.on("session", (session) => { + console.dir(session); +}); + +// Stream Blocks +emitter.on("anyMessage", (message, cursor, clock) => { + for ( const balanceChange of message.balanceChanges ?? []) { + console.dir(balanceChange); + } + console.dir({cursor, ...clock}); +}); + +// End of Stream +emitter.on("close", (error) => { + if (error) { + console.error(error); + } + console.timeEnd("🆗 close"); +}); + +// Fatal Error +emitter.on("fatalError", (error) => { + console.error(error); +}); + +console.log("✅ start"); +console.time("🆗 close"); +emitter.start();