From 90d5fd080632d3f4f7fa5b2a769d2d528fa18359 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 2 Dec 2024 10:25:38 -0500 Subject: [PATCH] fix: remove Scalar support --- Cargo.lock | 45 +---- Cargo.toml | 10 +- src/client_query.rs | 38 +--- src/config.rs | 2 - src/indexer_client.rs | 5 +- src/json.rs | 20 --- src/main.rs | 30 ---- src/metrics.rs | 9 - src/network/internal/snapshot.rs | 17 +- src/receipts.rs | 296 ++----------------------------- src/reports.rs | 10 +- src/vouchers.rs | 195 -------------------- 12 files changed, 38 insertions(+), 639 deletions(-) delete mode 100644 src/json.rs delete mode 100644 src/vouchers.rs diff --git a/Cargo.lock b/Cargo.lock index dbf658776..94259d10f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -2135,14 +2135,11 @@ dependencies = [ "ordered-float", "parking_lot", "pin-project 1.1.7", - "primitive-types", "prometheus", "prost", "rand", "rdkafka", - "receipts", "reqwest", - "secp256k1", "semver 1.0.23", "serde", "serde_json", @@ -2831,15 +2828,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" @@ -3789,19 +3777,6 @@ dependencies = [ "sasl2-sys", ] -[[package]] -name = "receipts" -version = "0.1.0" -source = "git+https://github.com/edgeandnode/receipts?rev=e94e0f1#e94e0f11c41632328b349394b15c3d93fbda1d4b" -dependencies = [ - "itertools 0.12.1", - "lazy_static", - "primitive-types", - "rand", - "secp256k1", - "tiny-keccak", -] - [[package]] name = "recvmsg" version = "1.0.0" @@ -4185,24 +4160,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "secp256k1" -version = "0.29.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" -dependencies = [ - "secp256k1-sys", -] - -[[package]] -name = "secp256k1-sys" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4387882333d3aa8cb20530a17c69a3752e97837832f34f6dccc760e715001d9" -dependencies = [ - "cc", -] - [[package]] name = "secret-vault-value" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 445b5c839..3657a4834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,25 +33,27 @@ lazy_static = "1.4.0" ordered-float = "4.2.0" parking_lot = "0.12.3" pin-project = "1.1.5" -primitive-types = "0.12.2" prometheus = { version = "0.13", default-features = false } prost = "0.13.1" rand = { version = "0.8", features = ["small_rng"] } rdkafka = { version = "0.36.2", features = ["gssapi", "tracing"] } -receipts = { git = "https://github.com/edgeandnode/receipts", rev = "e94e0f1" } reqwest = { version = "0.12", default-features = false, features = [ "json", "default-tls", "gzip", ] } -secp256k1 = { version = "0.29", default-features = false } semver = { version = "1.0", features = ["serde"] } serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0.116", features = ["raw_value"] } serde_with = "3.8.1" snmalloc-rs = "0.3" tap_core = { git = "https://github.com/semiotic-ai/timeline-aggregation-protocol", rev = "f680f4c" } -thegraph-core = { version = "0.8.5", features = ["alloy-contract", "alloy-signer-local", "attestation", "serde"] } +thegraph-core = { version = "0.8.5", features = [ + "alloy-contract", + "alloy-signer-local", + "attestation", + "serde", +] } thegraph-graphql-http = { version = "0.2.1", features = [ "http-client-reqwest", ] } diff --git a/src/client_query.rs b/src/client_query.rs index f2510afe4..b08c0e3f9 100644 --- a/src/client_query.rs +++ b/src/client_query.rs @@ -37,7 +37,6 @@ use crate::{ metrics::{with_metric, METRICS}, middleware::RequestId, network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError}, - receipts::ReceiptStatus, reports, }; @@ -289,26 +288,20 @@ async fn run_indexer_queries( let largest_allocation = selection.data.largest_allocation; let url = selection.data.url.clone(); let seconds_behind = selection.seconds_behind; - let legacy_scalar = !selection.data.tap_support; let subgraph_chain = subgraph.chain.clone(); // over-pay indexers to hit target let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections.len() as f64; let indexer_fee = selection.fee.as_f64() * budget as f64; let fee = indexer_fee.max(min_fee) as u128; - let receipt = match if legacy_scalar { - ctx.receipt_signer - .create_legacy_receipt(largest_allocation, fee) - } else { - ctx.receipt_signer.create_receipt(largest_allocation, fee) - } { + let receipt = match ctx.receipt_signer.create_receipt(largest_allocation, fee) { Ok(receipt) => receipt, Err(err) => { tracing::error!(?indexer, %deployment, error=?err, "failed to create receipt"); continue; } }; - debug_assert!(fee == receipt.grt_value()); + debug_assert!(fee == receipt.value()); let blocks_behind = blocks_behind(seconds_behind, blocks_per_minute); let indexer_client = ctx.indexer_client.clone(); @@ -328,7 +321,6 @@ async fn run_indexer_queries( let report = reports::IndexerRequest { indexer, deployment, - largest_allocation, url: url.to_string(), receipt, subgraph_chain, @@ -358,17 +350,6 @@ async fn run_indexer_queries( } } - let receipt_status = match &report.result { - Ok(_) => ReceiptStatus::Success, - Err(IndexerError::Timeout) => ReceiptStatus::Unknown, - Err(_) => ReceiptStatus::Failure, - }; - ctx.receipt_signer.record_receipt( - &report.largest_allocation, - &report.receipt, - receipt_status, - ); - indexer_requests.push(report); } @@ -399,7 +380,7 @@ async fn run_indexer_queries( let total_fees_grt: f64 = indexer_requests .iter() - .map(|i| i.receipt.grt_value() as f64 * 1e-18) + .map(|i| i.receipt.value() as f64 * 1e-18) .sum(); let total_fees_usd = USD(NotNan::new(total_fees_grt / *grt_per_usd).unwrap()); let _ = ctx.budgeter.feedback.send(total_fees_usd); @@ -445,7 +426,7 @@ async fn run_indexer_queries( result = ?indexer_request.result.as_ref().map(|_| ()), response_time_ms = indexer_request.response_time_ms, seconds_behind = indexer_request.seconds_behind, - fee = indexer_request.receipt.grt_value() as f64 * 1e-18, + fee = indexer_request.receipt.value() as f64 * 1e-18, "indexer_request" ); tracing::trace!(indexer_request = indexer_request.request); @@ -485,7 +466,6 @@ struct CandidateMetadata { #[debug(with = std::fmt::Display::fmt)] url: Url, largest_allocation: AllocationId, - tap_support: bool, } /// Given a list of indexings, build a list of candidates that are within the required block range @@ -594,7 +574,6 @@ fn build_candidates_list( deployment, url: indexing.indexer.url.clone(), largest_allocation: indexing.largest_allocation, - tap_support: indexing.indexer.tap_support, }, perf: perf.response, fee: Normalized::new(indexing.fee as f64 / budget as f64).unwrap_or(Normalized::ONE), @@ -733,11 +712,7 @@ pub async fn handle_indexer_query( let fee = *(ctx.budgeter.query_fees_target.0 * grt_per_usd * one_grt) as u128; let allocation = indexing.largest_allocation; - let receipt = match if indexing.indexer.tap_support { - ctx.receipt_signer.create_receipt(allocation, fee) - } else { - ctx.receipt_signer.create_legacy_receipt(allocation, fee) - } { + let receipt = match ctx.receipt_signer.create_receipt(allocation, fee) { Ok(receipt) => receipt, Err(err) => { return Err(Error::Internal(anyhow!("failed to create receipt: {err}"))); @@ -762,7 +737,6 @@ pub async fn handle_indexer_query( let indexer_request = reports::IndexerRequest { indexer: indexing_id.indexer, deployment: indexing_id.deployment, - largest_allocation: allocation, url: indexing.indexer.url.to_string(), receipt, subgraph_chain: subgraph.chain, @@ -805,7 +779,7 @@ pub async fn handle_indexer_query( result = ?indexer_request.result.as_ref().map(|_| ()), response_time_ms = indexer_request.response_time_ms, seconds_behind = indexer_request.seconds_behind, - fee = indexer_request.receipt.grt_value() as f64 * 1e-18, + fee = indexer_request.receipt.value() as f64 * 1e-18, "indexer_request" ); diff --git a/src/config.rs b/src/config.rs index 2234bc703..960373b39 100644 --- a/src/config.rs +++ b/src/config.rs @@ -165,8 +165,6 @@ impl From for rdkafka::config::ClientConfig { pub struct Receipts { /// TAP verifier contract chain pub chain_id: U256, - /// Secret key for legacy voucher signing (Scalar) - pub legacy_signer: Option, /// TAP signer key pub signer: B256, /// TAP verifier contract address diff --git a/src/indexer_client.rs b/src/indexer_client.rs index 257e0fa8f..e17d0a463 100644 --- a/src/indexer_client.rs +++ b/src/indexer_client.rs @@ -1,3 +1,4 @@ +use http::header::CONTENT_TYPE; use reqwest::header::AUTHORIZATION; use serde::{Deserialize, Serialize}; use thegraph_core::{ @@ -47,14 +48,14 @@ impl IndexerClient { query: &str, ) -> Result { let (auth_key, auth_value) = match auth { - IndexerAuth::Paid(receipt, _) => (receipt.header_name(), receipt.serialize()), + IndexerAuth::Paid(receipt, _) => ("Tap-Receipt", receipt.serialize()), IndexerAuth::Free(token) => (AUTHORIZATION.as_str(), format!("Bearer {token}")), }; let result = self .client .post(deployment_url) - .header("Content-Type", "application/json") + .header(CONTENT_TYPE.as_str(), "application/json") .header(auth_key, auth_value) .body(query.to_string()) .send() diff --git a/src/json.rs b/src/json.rs deleted file mode 100644 index c82bf011d..000000000 --- a/src/json.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::iter; - -use axum::Json; -use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; - -pub type JsonResponse = (HeaderMap, Json); - -pub fn json_response(headers: H, payload: serde_json::Value) -> JsonResponse -where - H: IntoIterator, -{ - let headers = HeaderMap::from_iter( - iter::once(( - header::CONTENT_TYPE, - HeaderValue::from_static("application/json"), - )) - .chain(headers), - ); - (headers, Json(payload)) -} diff --git a/src/main.rs b/src/main.rs index 7189062bf..1a0764e98 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,6 @@ mod http_ext; mod indexer_client; mod indexers; mod indexing_performance; -mod json; mod metrics; mod middleware; mod network; @@ -25,7 +24,6 @@ mod time; #[allow(dead_code)] mod ttl_hash_map; mod unattestable_errors; -mod vouchers; use std::{ collections::HashSet, @@ -39,7 +37,6 @@ use std::{ use auth::AuthContext; use axum::{ - extract::DefaultBodyLimit, http::{self, status::StatusCode}, routing, Router, }; @@ -128,21 +125,10 @@ async fn main() { let indexing_perf = IndexingPerformance::new(network.clone()); network.wait_until_ready().await; - let legacy_signer: &'static secp256k1::SecretKey = Box::leak(Box::new( - secp256k1::SecretKey::from_slice( - conf.receipts - .legacy_signer - .as_ref() - .map(|s| s.0.as_slice()) - .unwrap_or(receipt_signer.to_bytes().as_slice()), - ) - .expect("invalid legacy signer key"), - )); let receipt_signer: &'static ReceiptSigner = Box::leak(Box::new(ReceiptSigner::new( receipt_signer, conf.receipts.chain_id, conf.receipts.verifier, - legacy_signer, ))); // Initialize the auth service @@ -241,22 +227,6 @@ async fn main() { .route("/", routing::get(|| async { "Ready to roll!" })) // This path is required by NGINX ingress controller .route("/ready", routing::get(|| async { "Ready" })) - .route( - "/collect-receipts", - routing::post(vouchers::handle_collect_receipts) - .with_state(legacy_signer) - .layer(DefaultBodyLimit::max(3_000_000)), - ) - .route( - "/partial-voucher", - routing::post(vouchers::handle_partial_voucher) - .with_state(legacy_signer) - .layer(DefaultBodyLimit::max(3_000_000)), - ) - .route( - "/voucher", - routing::post(vouchers::handle_voucher).with_state(legacy_signer), - ) .route( "/blocklist", routing::get(move || async move { diff --git a/src/metrics.rs b/src/metrics.rs index dd9f73487..afcc74409 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -14,9 +14,6 @@ pub struct Metrics { pub client_query: ResponseMetrics, pub avg_query_fees: Gauge, pub indexer_query: ResponseMetricVecs, - pub collect_receipts: ResponseMetrics, - pub partial_voucher: ResponseMetrics, - pub voucher: ResponseMetrics, pub blocks_per_minute: IntGaugeVec, } @@ -34,12 +31,6 @@ impl Metrics { "indexer query", &["deployment", "indexer"], ), - collect_receipts: ResponseMetrics::new( - "gw_collect_receipts", - "collect-receipts request", - ), - partial_voucher: ResponseMetrics::new("gw_partial_voucher", "partial-voucher request"), - voucher: ResponseMetrics::new("gw_voucher", "requests for voucher"), blocks_per_minute: register_int_gauge_vec!( "gw_blocks_per_minute", "chain blocks per minute", diff --git a/src/network/internal/snapshot.rs b/src/network/internal/snapshot.rs index 6b4c4c937..bcad301e7 100644 --- a/src/network/internal/snapshot.rs +++ b/src/network/internal/snapshot.rs @@ -3,7 +3,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::Debug, - sync::{Arc, OnceLock}, + sync::Arc, }; use custom_debug::CustomDebug; @@ -19,12 +19,6 @@ use crate::network::{ internal::indexer_processing::ResolvedIndexerInfo, }; -/// The minimum indexer service version required to support Scalar TAP. -fn min_required_indexer_service_version_tap_support() -> &'static Version { - static VERSION: OnceLock = OnceLock::new(); - VERSION.get_or_init(|| "1.0.0-alpha".parse().expect("valid version")) -} - /// The [`IndexingId`] struct represents the unique identifier of an indexing. #[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)] pub struct IndexingId { @@ -97,9 +91,6 @@ pub struct Indexer { /// The indexer's "graph node" version. pub graph_node_version: Version, - /// Whether the indexer supports TAP payments. - pub tap_support: bool, - /// The indexer's staked tokens. pub staked_tokens: u128, } @@ -168,17 +159,11 @@ pub fn new_from( ( indexer_id, indexer.map(|info| { - // The indexer service version must be greater than or equal to the minimum - // required version to support Scalar TAP. - let indexer_tap_support = &info.indexer_service_version - >= min_required_indexer_service_version_tap_support(); - let indexer = Indexer { id: info.id, url: info.url.clone(), indexer_service_version: info.indexer_service_version.clone(), graph_node_version: info.graph_node_version.clone(), - tap_support: indexer_tap_support, staked_tokens: info.staked_tokens, }; diff --git a/src/receipts.rs b/src/receipts.rs index 5aa28fa07..512f710f6 100644 --- a/src/receipts.rs +++ b/src/receipts.rs @@ -1,73 +1,39 @@ -use std::{collections::HashMap, sync::Arc, time::SystemTime}; +use std::time::SystemTime; -use parking_lot::{Mutex, RwLock}; use rand::RngCore; -pub use receipts::QueryStatus as ReceiptStatus; -use receipts::ReceiptPool; -use secp256k1::SecretKey; use tap_core::{receipt::Receipt as TapReceipt, signed_message::EIP712SignedMessage}; use thegraph_core::{ alloy::{ dyn_abi::Eip712Domain, - hex, primitives::{Address, U256}, signers::local::PrivateKeySigner, }, AllocationId, }; -/// A receipt for an indexer request. -#[derive(Debug, Clone)] -pub enum Receipt { - Legacy(u128, Vec), - Tap(EIP712SignedMessage), -} +pub struct Receipt(EIP712SignedMessage); impl Receipt { - /// Returns the value of the receipt. - pub fn grt_value(&self) -> u128 { - match self { - Receipt::Legacy(value, _) => *value, - Receipt::Tap(receipt) => receipt.message.value, - } + pub fn value(&self) -> u128 { + self.0.message.value } - /// Returns the allocation ID of the receipt. pub fn allocation(&self) -> Address { - match self { - Receipt::Legacy(_, receipt) => Address::from_slice(&receipt[0..20]), - Receipt::Tap(receipt) => receipt.message.allocation_id, - } + self.0.message.allocation_id } - /// Serializes the receipt to a string. - // TODO: Move to a typed header. This code should be agnostic from the serialization format. pub fn serialize(&self) -> String { - match self { - Receipt::Legacy(_, receipt) => hex::encode(&receipt[..(receipt.len() - 32)]), - Receipt::Tap(receipt) => serde_json::to_string(&receipt).unwrap(), - } - } - - /// Returns the header name for the receipt. - // TODO: Move to a typed header. This code should be agnostic from the http headers. - pub fn header_name(&self) -> &'static str { - match self { - Receipt::Legacy(_, _) => "Scalar-Receipt", - Receipt::Tap(_) => "Tap-Receipt", - } + serde_json::to_string(&self.0).unwrap() } } -/// Scalar TAP signer. -struct TapSigner { +pub struct ReceiptSigner { signer: PrivateKeySigner, domain: Eip712Domain, } -impl TapSigner { - /// Creates a new `TapSigner`. - fn new(signer: PrivateKeySigner, chain_id: U256, verifying_contract: Address) -> Self { +impl ReceiptSigner { + pub fn new(signer: PrivateKeySigner, chain_id: U256, verifying_contract: Address) -> Self { Self { signer, domain: Eip712Domain { @@ -80,12 +46,7 @@ impl TapSigner { } } - /// Creates a new receipt for the given allocation and fee. - fn create_receipt( - &self, - allocation: AllocationId, - fee: u128, - ) -> anyhow::Result> { + pub fn create_receipt(&self, allocation: AllocationId, fee: u128) -> anyhow::Result { // Nonce generated with CSPRNG (ChaCha12), to avoid collision with receipts generated by // other gateway processes. // See https://docs.rs/rand/latest/rand/rngs/index.html#our-generators. @@ -107,112 +68,7 @@ impl TapSigner { let signed = EIP712SignedMessage::new(&self.domain, receipt, &self.signer) .map_err(|e| anyhow::anyhow!("failed to sign receipt: {:?}", e))?; - Ok(signed) - } -} - -/// Legacy Scalar signer. -struct LegacySigner { - secret_key: &'static SecretKey, - // Note: We are holding on to receipt pools indefinitely. This is acceptable, since the memory - // cost is minor and the typical duration of an allocation is 28 days. - receipt_pools: RwLock>>>, -} - -impl LegacySigner { - /// Creates a new `LegacySigner`. - fn new(secret_key: &'static SecretKey) -> Self { - Self { - secret_key, - receipt_pools: RwLock::default(), - } - } - - /// Creates a new receipt for the given allocation and fee. - fn create_receipt( - &self, - allocation: AllocationId, - fee: u128, - ) -> anyhow::Result<(u128, Vec)> { - // Get the pool for the allocation - let receipt_pool = self.receipt_pools.read().get(&allocation).cloned(); - - // If the pool for the allocation exists, use it. Otherwise, create a new pool. - let receipt = match receipt_pool { - Some(pool) => { - let mut pool = pool.lock(); - pool.commit(self.secret_key, fee.into()) - } - None => { - let mut pool = ReceiptPool::new(allocation.0 .0); - let receipt = pool.commit(self.secret_key, fee.into()); - - let mut write_guard = self.receipt_pools.write(); - write_guard.insert(allocation, Arc::new(Mutex::new(pool))); - - receipt - } - } - .map_err(|e| anyhow::anyhow!("failed to sign legacy receipt: {:?}", e))?; - - Ok((fee, receipt)) - } - - /// Record the receipt status and release it from the pool. - fn record_receipt(&self, allocation: &AllocationId, receipt: &[u8], status: ReceiptStatus) { - let legacy_pool = self.receipt_pools.read(); - if let Some(legacy_pool) = legacy_pool.get(allocation) { - legacy_pool.lock().release(receipt, status); - }; - } -} - -/// ReceiptSigner is responsible for creating receipts for indexing requests. -pub struct ReceiptSigner { - tap: TapSigner, - legacy: LegacySigner, -} - -impl ReceiptSigner { - /// Creates a new `ReceiptSigner`. - pub fn new( - signer: PrivateKeySigner, - chain_id: U256, - verifier: Address, - legacy_signer: &'static SecretKey, - ) -> Self { - Self { - tap: TapSigner::new(signer, chain_id, verifier), - legacy: LegacySigner::new(legacy_signer), - } - } - - /// Creates a new Scalar TAP receipt for the given allocation and fee. - pub fn create_receipt(&self, allocation: AllocationId, fee: u128) -> anyhow::Result { - self.tap.create_receipt(allocation, fee).map(Receipt::Tap) - } - - /// Creates a new Scalar legacy receipt for the given allocation and fee. - pub fn create_legacy_receipt( - &self, - allocation: AllocationId, - fee: u128, - ) -> anyhow::Result { - self.legacy - .create_receipt(allocation, fee) - .map(|(fee, receipt)| Receipt::Legacy(fee, receipt)) - } - - /// Record the receipt status and release it from the pool. - pub fn record_receipt( - &self, - allocation: &AllocationId, - receipt: &Receipt, - status: ReceiptStatus, - ) { - if let Receipt::Legacy(_, receipt) = receipt { - self.legacy.record_receipt(allocation, receipt, status); - } + Ok(Receipt(signed)) } } @@ -225,141 +81,25 @@ mod tests { use super::*; - mod legacy { - use thegraph_core::allocation_id; - - use super::*; - - #[test] - fn create_receipt() { - //* Given - let secret_key = Box::leak(Box::new( - SecretKey::from_slice(&[0xcd; 32]).expect("invalid secret key"), - )); - - let signer = LegacySigner::new(secret_key); - - // let indexer = address!("0xbdfb5ee5a2abf4fc7bb1bd1221067aef7f9de491"); - // let deployment = deployment_id!("QmaqcZxm6gcgWhWpQ88YKDm1keJDMpNxNGwtEDvjrjjNKh"); - let largest_allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); - let fee = 1000; - - //* When - let res = signer.create_receipt(largest_allocation, fee); - - //* Then - let receipt = res.expect("failed to create legacy receipt"); - - assert_eq!(receipt.0, fee); - assert!(!receipt.1.is_empty()); - } - - #[test] - fn create_receipt_with_preexisting_pool() { - //* Given - let secret_key = Box::leak(Box::new( - SecretKey::from_slice(&[0xcd; 32]).expect("invalid secret key"), - )); - - let signer = LegacySigner::new(secret_key); - - let largest_allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); - let fee = 1000; - - // Pre-condition: Create a receipt so the pool for the allocation exists - let _ = signer.create_receipt(largest_allocation, fee); - - //* When - let res = signer.create_receipt(largest_allocation, fee); - - //* Then - let receipt = res.expect("failed to create legacy receipt"); - - assert_eq!(receipt.0, fee); - assert!(!receipt.1.is_empty()); - } - } - - mod tap { - use thegraph_core::{ - allocation_id, - alloy::{primitives::address, signers::local::PrivateKeySigner}, - }; - - use super::*; - - #[test] - fn create_receipt() { - //* Given - let secret_key = PrivateKeySigner::from_slice(&[0xcd; 32]).expect("invalid secret key"); - let signer = TapSigner::new( - secret_key, - 1.try_into().expect("invalid chain id"), - address!("177b557b12f22bb17a9d73dcc994d978dd6f5f89"), - ); - - let allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); - let fee = 1000; - - //* When - let res = signer.create_receipt(allocation, fee); - - //* Then - let receipt = res.expect("failed to create tap receipt"); - - assert_eq!(receipt.message.value, fee); - } - } - - #[test] - fn create_legacy_receipt() { - //* Given - let tap_signer = PrivateKeySigner::from_slice(&[0xcd; 32]).expect("invalid secret key"); - let legacy_secret_key = Box::leak(Box::new( - SecretKey::from_slice(&[0xcd; 32]).expect("invalid secret key"), - )); - - let signer = ReceiptSigner::new( - tap_signer, - 1.try_into().expect("invalid chain id"), - allocation_id!("177b557b12f22bb17a9d73dcc994d978dd6f5f89").into_inner(), - legacy_secret_key, - ); - - let largest_allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); - let fee = 1000; - - //* When - let res = signer.create_legacy_receipt(largest_allocation, fee); - - //* Then - let receipt = res.expect("failed to create legacy receipt"); - assert!(matches!(receipt, Receipt::Legacy(_, _))); - } - #[test] - fn create_tap_receipt() { + fn create_receipt() { //* Given - let tap_signer = PrivateKeySigner::from_slice(&[0xcd; 32]).expect("invalid secret key"); - let legacy_secret_key = Box::leak(Box::new( - SecretKey::from_slice(&[0xcd; 32]).expect("invalid secret key"), - )); - + let secret_key = PrivateKeySigner::from_slice(&[0xcd; 32]).expect("invalid secret key"); let signer = ReceiptSigner::new( - tap_signer, + secret_key, 1.try_into().expect("invalid chain id"), address!("177b557b12f22bb17a9d73dcc994d978dd6f5f89"), - legacy_secret_key, ); - let largest_allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); + let allocation = allocation_id!("89b23fea4e46d40e8a4c6cca723e2a03fdd4bec2"); let fee = 1000; //* When - let res = signer.create_receipt(largest_allocation, fee); + let res = signer.create_receipt(allocation, fee); //* Then let receipt = res.expect("failed to create tap receipt"); - assert!(matches!(receipt, Receipt::Tap(_))); + + assert_eq!(receipt.value(), fee); } } diff --git a/src/reports.rs b/src/reports.rs index 21edbaf58..0a18c23b0 100644 --- a/src/reports.rs +++ b/src/reports.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Context}; use ordered_float::NotNan; use prost::Message; -use thegraph_core::{alloy::primitives::Address, AllocationId, DeploymentId, IndexerId}; +use thegraph_core::{alloy::primitives::Address, DeploymentId, IndexerId}; use tokio::sync::mpsc; use crate::{concat_bytes, errors, indexer_client::IndexerResponse, receipts::Receipt}; @@ -21,7 +21,6 @@ pub struct ClientRequest { pub struct IndexerRequest { pub indexer: IndexerId, pub deployment: DeploymentId, - pub largest_allocation: AllocationId, pub url: String, pub receipt: Receipt, pub subgraph_chain: String, @@ -88,7 +87,7 @@ impl Reporter { allocation: indexer_request.receipt.allocation().to_vec(), indexed_chain: indexer_request.subgraph_chain.clone(), url: indexer_request.url.clone(), - fee_grt: indexer_request.receipt.grt_value() as f64 * 1e-18, + fee_grt: indexer_request.receipt.value() as f64 * 1e-18, response_time_ms: indexer_request.response_time_ms as u32, seconds_behind: indexer_request.seconds_behind, result: indexer_request @@ -108,14 +107,13 @@ impl Reporter { }) .unwrap_or_default(), blocks_behind: indexer_request.blocks_behind, - legacy_scalar: matches!(&indexer_request.receipt, Receipt::Legacy(_, _)), }) .collect(); let total_fees_grt: f64 = client_request .indexer_requests .iter() - .map(|i| i.receipt.grt_value() as f64 * 1e-18) + .map(|i| i.receipt.value() as f64 * 1e-18) .sum(); let total_fees_usd: f64 = total_fees_grt / *client_request.grt_per_usd; @@ -239,8 +237,6 @@ pub struct IndexerQueryProtobuf { indexer_errors: String, #[prost(uint64, tag = "11")] blocks_behind: u64, - #[prost(bool, tag = "12")] - legacy_scalar: bool, } #[derive(prost::Message)] diff --git a/src/vouchers.rs b/src/vouchers.rs deleted file mode 100644 index c934d650a..000000000 --- a/src/vouchers.rs +++ /dev/null @@ -1,195 +0,0 @@ -use axum::{body::Bytes, extract::State, http::StatusCode}; -use lazy_static::lazy_static; -use receipts::{self, combine_partial_vouchers, receipts_to_partial_voucher, receipts_to_voucher}; -use secp256k1::{PublicKey, Secp256k1, SecretKey}; -use serde::Deserialize; -use serde_json::json; -use thegraph_core::alloy::{ - hex, - primitives::{Address, FixedBytes, U256}, -}; - -use crate::{ - json::{json_response, JsonResponse}, - metrics::METRICS, -}; - -lazy_static! { - static ref SECP256K1: Secp256k1 = Secp256k1::new(); -} - -pub async fn handle_collect_receipts( - State(signer): State<&'static SecretKey>, - payload: Bytes, -) -> Result { - let _timer = METRICS.collect_receipts.duration.start_timer(); - match process_oneshot_voucher(signer, &payload) { - Ok(response) => { - METRICS.collect_receipts.ok.inc(); - Ok(response) - } - Err(collect_receipts_err) => { - METRICS.collect_receipts.err.inc(); - tracing::info!(%collect_receipts_err); - Err((StatusCode::BAD_REQUEST, collect_receipts_err)) - } - } -} - -fn process_oneshot_voucher(signer: &SecretKey, payload: &Bytes) -> Result { - let (allocation_id, receipts) = parse_receipts(payload)?; - let allocation_signer = PublicKey::from_secret_key(&SECP256K1, signer); - let voucher = receipts_to_voucher(&allocation_id, &allocation_signer, signer, receipts) - .map_err(|err| err.to_string())?; - tracing::info!( - allocation = %Address::from(allocation_id), - receipts_size = receipts.len(), - fees = %voucher.fees.to_string(), - "collect receipts request", - ); - // Don't allow more than 10M GRT in a single collection - if voucher.fees > primitive_types::U256::from(10000000000000000000000000_u128) { - tracing::error!(excessive_voucher_fees = %voucher.fees); - return Err("Voucher value too large".into()); - } - Ok(json_response( - [], - json!({ - "allocation": format!("0x{}", hex::encode(voucher.allocation_id)), - "amount": voucher.fees.to_string(), - "signature": format!("0x{}", hex::encode(voucher.signature)), - }), - )) -} - -pub async fn handle_partial_voucher( - State(signer): State<&'static SecretKey>, - payload: Bytes, -) -> Result { - let _timer = METRICS.partial_voucher.duration.start_timer(); - match process_partial_voucher(signer, &payload) { - Ok(response) => { - METRICS.partial_voucher.ok.inc(); - Ok(response) - } - Err(partial_voucher_err) => { - METRICS.partial_voucher.err.inc(); - tracing::info!(%partial_voucher_err); - Err((StatusCode::BAD_REQUEST, partial_voucher_err)) - } - } -} - -fn process_partial_voucher(signer: &SecretKey, payload: &Bytes) -> Result { - let (allocation_id, receipts) = parse_receipts(payload)?; - let allocation_signer = PublicKey::from_secret_key(&SECP256K1, signer); - let partial_voucher = - receipts_to_partial_voucher(&allocation_id, &allocation_signer, signer, receipts) - .map_err(|err| err.to_string())?; - tracing::info!( - allocation = %Address::from(allocation_id), - receipts_size = receipts.len(), - fees = %partial_voucher.voucher.fees.to_string(), - "partial voucher request", - ); - // 10M GRT - if partial_voucher.voucher.fees > primitive_types::U256::from(10000000000000000000000000u128) { - tracing::error!(excessive_voucher_fees = %partial_voucher.voucher.fees); - return Err("Voucher value too large".into()); - } - Ok(json_response( - [], - json!({ - "allocation": format!("0x{}", hex::encode(partial_voucher.voucher.allocation_id)), - "fees": partial_voucher.voucher.fees.to_string(), - "signature": format!("0x{}", hex::encode(partial_voucher.voucher.signature)), - "receipt_id_min": format!("0x{}", hex::encode(partial_voucher.receipt_id_min)), - "receipt_id_max": format!("0x{}", hex::encode(partial_voucher.receipt_id_max)), - }), - )) -} - -pub async fn handle_voucher( - State(signer): State<&'static SecretKey>, - payload: Bytes, -) -> Result { - let _timer = METRICS.voucher.duration.start_timer(); - match process_voucher(signer, &payload) { - Ok(response) => { - METRICS.voucher.ok.inc(); - Ok(response) - } - Err(voucher_err) => { - METRICS.voucher.err.inc(); - tracing::info!(%voucher_err); - Err((StatusCode::BAD_REQUEST, voucher_err)) - } - } -} - -fn process_voucher(signer: &SecretKey, payload: &Bytes) -> Result { - let request = - serde_json::from_slice::(payload).map_err(|err| err.to_string())?; - let allocation_id = request.allocation; - let partial_vouchers = request - .partial_vouchers - .into_iter() - .map(|pv| receipts::PartialVoucher { - voucher: receipts::Voucher { - allocation_id: allocation_id.into(), - fees: primitive_types::U256::from_little_endian(&pv.fees.as_le_bytes()), - signature: pv.signature.into(), - }, - receipt_id_min: *pv.receipt_id_min, - receipt_id_max: *pv.receipt_id_max, - }) - .collect::>(); - let voucher = combine_partial_vouchers(&allocation_id.0, signer, &partial_vouchers) - .map_err(|err| err.to_string())?; - tracing::info!( - allocation = %allocation_id, - partial_vouchers = partial_vouchers.len(), - fees = %voucher.fees.to_string(), - "voucher request", - ); - // 10M GRT - if voucher.fees > primitive_types::U256::from(10000000000000000000000000u128) { - tracing::error!(excessive_voucher_fees = %voucher.fees); - return Err("Voucher value too large".into()); - } - Ok(json_response( - [], - json!({ - "allocation": allocation_id.to_string(), - "fees": voucher.fees.to_string(), - "signature": format!("0x{}", hex::encode(voucher.signature)), - }), - )) -} - -fn parse_receipts(payload: &[u8]) -> Result<([u8; 20], &[u8]), String> { - if payload.len() < 20 { - return Err("Invalid request data".into()); - } - let mut allocation_id = [0u8; 20]; - allocation_id.copy_from_slice(&payload[..20]); - Ok((allocation_id, &payload[20..])) -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct VoucherRequest { - allocation: Address, - partial_vouchers: Vec, -} - -#[derive(Deserialize)] -struct PartialVoucher { - signature: Signature, - fees: U256, - receipt_id_min: ReceiptID, - receipt_id_max: ReceiptID, -} - -type Signature = FixedBytes<65>; -type ReceiptID = FixedBytes<15>;