Skip to content

Commit

Permalink
Update SQL schema
Browse files Browse the repository at this point in the history
  • Loading branch information
DenisCarriere committed Feb 26, 2025
1 parent 0cd2b33 commit c449fcf
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 25 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: CI
on: [push, pull_request]

jobs:
check:
name: Check
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Run cargo check
run: cargo check

test:
name: Test Suite
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Run cargo test
run: cargo test

lints:
name: Lints
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Run cargo fmt
run: cargo fmt --all -- --check

- name: Run cargo clippy
run: cargo clippy -- -D warnings
14 changes: 13 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
.PHONY: all
all:
make build
substreams info

.PHONY: build
build:
cargo build --target wasm32-unknown-unknown --release
substreams pack
substreams info

.PHONY: protogen
protogen:
Expand All @@ -16,6 +20,14 @@ run: build
gui: build
substreams gui substreams.yaml balance_change_stats -e mainnet.eth.streamingfast.io:443 --production-mode --network eth -s 21841000 -t +1

.PHONY: sql
sql: build
substreams-sink-sql run clickhouse://default:default@localhost:9000/default substreams.yaml -e eth.substreams.pinax.network:443 21529220:21529235 --final-blocks-only --undo-buffer-size 1 --on-module-hash-mistmatch=warn --batch-block-flush-interval 1 --development-mode

.PHONY: sql-setup
sql-setup: build
substreams-sink-sql setup clickhouse://default:default@localhost:9000/default substreams.yaml

.PHONY: parquet
parquet:
rm -f state.yaml && substreams-sink-files run eth.substreams.pinax.network:443 substreams.yaml map_events "./out" 21529220:21529235 --encoder parquet --file-block-count 1 --development-mode
103 changes: 89 additions & 14 deletions schema.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,93 @@
CREATE TABLE IF NOT EXISTS balance_changes (
-------------------------------------------------
-- Meta tables to store Substreams information --
-------------------------------------------------
CREATE TABLE IF NOT EXISTS cursors
(
id String,
cursor String,
block_num Int64,
block_id String
)
ENGINE = ReplacingMergeTree()
PRIMARY KEY (id)
ORDER BY (id);

-------------------------------------------------
-- Balance changes events --
-------------------------------------------------
CREATE TABLE IF NOT EXISTS balance_changes (
-- block --
block_num UInt32,
block_hash FixedString(64),
timestamp DateTime(0, 'UTC'),
date Date,

-- transaction --
-- storage --
transaction_id FixedString(64),

-- call --
call_index UInt32,

-- log --
log_index UInt32,
log_block_index UInt32,
log_ordinal UInt64,

-- storage change --
storage_key FixedString(64),
storage_ordinal UInt64,

-- balance change --
"contract" TEXT NOT NULL,
"owner" TEXT NOT NULL,
"amount" NUMERIC NOT NULL,
"old_balance" NUMERIC NOT NULL,
"new_balance" NUMERIC NOT NULL,
"transaction_id" TEXT NOT NULL,
"block_num" INT NOT NULL,
"timestamp" TEXT NOT NULL,
"change_type" INT NOT NULL,
"call_index" INT NOT NULL,
PRIMARY KEY ("block_num", "transaction_id", "call_index")
);
contract FixedString(40),
owner FixedString(40),
old_balance UInt256,
new_balance UInt256,
amount UInt256,

-- transfer --
`from` FixedString(40),
`to` FixedString(40),
value UInt256,

-- indexing --
version UInt64,

-- debug --
balance_change_type Int32
)
ENGINE = ReplacingMergeTree PRIMARY KEY (transaction_id, storage_ordinal)
ORDER BY (transaction_id, storage_ordinal);

-------------------------------------------------
-- Transfer events --
-------------------------------------------------
CREATE TABLE IF NOT EXISTS transfers (
-- block --
block_num UInt32,
block_hash FixedString(64),
timestamp DateTime(0, 'UTC'),
date Date,

-- transaction --
transaction_id FixedString(64),

-- call --
call_index UInt32,
call_address FixedString(40),

-- log --
log_index UInt32,
log_block_index UInt32,
log_ordinal UInt64,

-- transfer --
contract FixedString(40), -- log.address
`from` FixedString(40),
`to` FixedString(40),
value UInt256,

-- debug --
transfer_type Int32
)
ENGINE = ReplacingMergeTree PRIMARY KEY (transaction_id, log_index)
ORDER BY (transaction_id, log_index);
12 changes: 12 additions & 0 deletions schema.views.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- latest balances --
CREATE MATERIALIZED VIEW balances
ENGINE = ReplacingMergeTree(version)
ORDER BY (owner, contract)
POPULATE
AS
SELECT
owner,
new_balance AS balance,
contract,
version
FROM balance_changes;
4 changes: 2 additions & 2 deletions src/algorithms/fishing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub fn is_fishing_transfers<'a>(trx: &'a TransactionTrace, call: &'a Call) -> bo
count += storage_changes.count();
if count == 0 {
info!("ignoring fishing token transfer trx: {:?}", trx.hash);
return false;
return true;
}
true
false
}
14 changes: 10 additions & 4 deletions src/sinks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub fn db_out(events: Events) -> Result<DatabaseChanges, Error> {
// -- block --
.set("block_num", balance_change.block_num.to_string())
.set("block_hash", balance_change.block_hash)
.set("timestamp", balance_change.timestamp.expect("missing timestamp"))
.set("timestamp", balance_change.timestamp.expect("missing timestamp").seconds.to_string())
.set("date", balance_change.date)
// -- transaction --
.set("transaction_id", balance_change.transaction_id)
Expand All @@ -111,16 +111,19 @@ pub fn db_out(events: Events) -> Result<DatabaseChanges, Error> {
.set("log_ordinal", balance_change.log_ordinal.to_string())
// -- storage --
.set("storage_key", balance_change.storage_key)
.set("storage_ordinal", balance_change.storage_ordinal)
.set("storage_ordinal", balance_change.storage_ordinal.to_string())
// -- balance change --
.set("contract", balance_change.contract)
.set("owner", balance_change.owner)
.set("old_balance", balance_change.old_balance)
.set("new_balance", balance_change.new_balance)
.set("amount", balance_change.amount)
// -- transfer --
.set("from", balance_change.from)
.set("to", balance_change.to)
.set("value", balance_change.value)
// -- indexing --
.set("version", balance_change.version.to_string())
// -- debug --
.set("balance_change_type", balance_change.balance_change_type.to_string());
}
Expand All @@ -137,12 +140,13 @@ pub fn db_out(events: Events) -> Result<DatabaseChanges, Error> {
// -- block --
.set("block_num", transfer.block_num.to_string())
.set("block_hash", transfer.block_hash)
.set("timestamp", transfer.timestamp.expect("missing timestamp"))
.set("timestamp", transfer.timestamp.expect("missing timestamp").seconds.to_string())
.set("date", transfer.date)
// -- transaction --
.set("transaction_id", transfer.transaction_id)
// -- call --
.set("call_index", transfer.call_index.to_string())
.set("call_address", transfer.call_address.to_string())
// -- log --
.set("log_index", transfer.log_index.to_string())
.set("log_block_index", transfer.log_block_index.to_string())
Expand All @@ -151,7 +155,9 @@ pub fn db_out(events: Events) -> Result<DatabaseChanges, Error> {
.set("contract", transfer.contract)
.set("from", transfer.from)
.set("to", transfer.to)
.set("value", transfer.value);
.set("value", transfer.value)
// -- debug --
.set("transfer_type", transfer.transfer_type.to_string());
}

Ok(tables.to_database_changes())
Expand Down
4 changes: 0 additions & 4 deletions substreams.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ modules:
- name: db_out
kind: map
inputs:
- source: sf.substreams.v1.Clock
- map: map_events
output:
type: proto:sf.substreams.sink.database.v1.DatabaseChanges

- name: graph_out
kind: map
inputs:
- source: sf.substreams.v1.Clock
- map: map_events
output:
type: proto:sf.substreams.sink.entity.v1.EntityChanges
Expand All @@ -67,8 +65,6 @@ modules:
output:
type: proto:erc20.types.v1.BalanceChangeStats

network: mainnet

sink:
module: db_out
type: sf.substreams.sink.sql.v1.Service
Expand Down

0 comments on commit c449fcf

Please sign in to comment.