From 99edfbc33fe8c0ac9a2ed57a188fd98c9e60f9b1 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 5 Dec 2024 23:59:14 -0500 Subject: [PATCH] refactor: consolidate cost model handling --- src/indexers.rs | 1 - src/indexers/cost_models.rs | 112 ---------------- src/indexers/urls.rs | 19 +-- src/network.rs | 2 +- src/network/cost_model.rs | 121 ++++++++++++++++++ .../indexer_indexing_cost_model_resolver.rs | 93 -------------- src/network/internal/state.rs | 2 +- src/network/service.rs | 4 +- 8 files changed, 126 insertions(+), 228 deletions(-) delete mode 100644 src/indexers/cost_models.rs create mode 100644 src/network/cost_model.rs delete mode 100644 src/network/indexer_indexing_cost_model_resolver.rs diff --git a/src/indexers.rs b/src/indexers.rs index bb3e6397..65c8448b 100644 --- a/src/indexers.rs +++ b/src/indexers.rs @@ -1,6 +1,5 @@ pub use urls::*; -pub mod cost_models; pub mod indexing_progress; pub mod public_poi; mod urls; diff --git a/src/indexers/cost_models.rs b/src/indexers/cost_models.rs deleted file mode 100644 index 353953ab..00000000 --- a/src/indexers/cost_models.rs +++ /dev/null @@ -1,112 +0,0 @@ -use serde::Deserialize; -use thegraph_core::DeploymentId; -use thegraph_graphql_http::{ - graphql::{Document, IntoDocument, IntoDocumentWithVariables}, - http_client::{RequestError, ReqwestExt, ResponseError}, -}; - -use super::urls::CostUrl; - -const COST_MODEL_QUERY_DOCUMENT: &str = r#" - query costModels($deployments: [String!]!) { - costModels(deployments: $deployments) { - deployment - model - } - } -"#; - -/// Errors that can occur while fetching the cost models. -#[derive(Clone, Debug, thiserror::Error)] -pub enum Error { - /// The request failed. - #[error("request error: {0}")] - Request(String), - - /// Invalid response. - /// - /// The response could not be deserialized or is missing required fields. - #[error("invalid response: {0}")] - InvalidResponse(String), - - /// The response did not contain any cost model information. - #[error("empty response")] - EmptyResponse, -} - -/// Send a request to the indexer to get the cost models of the given deployments. -pub async fn send_request( - client: &reqwest::Client, - url: CostUrl, - deployments: impl IntoIterator, -) -> Result, Error> { - let resp = client - .post(url.into_inner()) - .send_graphql::(Request::new(deployments)) - .await - .map_err(|err| match err { - RequestError::RequestSerializationError(..) => { - unreachable!("request serialization should not fail") - } - RequestError::RequestSendError(..) | RequestError::ResponseRecvError(..) => { - Error::Request(err.to_string()) - } - RequestError::ResponseDeserializationError { .. } => { - Error::InvalidResponse(err.to_string()) - } - })? - .map_err(|err| match err { - ResponseError::Failure { .. } => Error::Request(err.to_string()), - ResponseError::Empty => Error::EmptyResponse, - })?; - - Ok(resp.cost_models) -} - -/// The request type for the cost model query. -/// -/// This is a GraphQL query that fetches the cost models for a set of deployments. -/// -/// See [`COST_MODEL_QUERY_DOCUMENT`] for the query document. -#[derive(Debug, Clone)] -struct Request { - document: Document, - vars_deployments: Vec, -} - -impl Request { - /// Create a new cost model query request. - pub fn new<'a>(deployments: impl IntoIterator) -> Self { - let deployments = deployments - .into_iter() - .map(|item| item.to_string()) - .collect(); - Self { - document: COST_MODEL_QUERY_DOCUMENT.into_document(), - vars_deployments: deployments, - } - } -} - -impl IntoDocumentWithVariables for Request { - type Variables = serde_json::Value; - - fn into_document_with_variables(self) -> (Document, Self::Variables) { - ( - self.document, - serde_json::json!({ "deployments": self.vars_deployments }), - ) - } -} - -#[derive(Deserialize)] -#[serde(rename_all = "camelCase")] -struct Response { - cost_models: Vec, -} - -#[derive(Debug, Deserialize)] -pub struct CostModelSource { - pub deployment: DeploymentId, - pub model: String, -} diff --git a/src/indexers/urls.rs b/src/indexers/urls.rs index 442d3163..5931ff1a 100644 --- a/src/indexers/urls.rs +++ b/src/indexers/urls.rs @@ -14,18 +14,6 @@ pub fn status_url>(url: U) -> StatusUrl { StatusUrl(url) } -/// Builds the URL to the cost model endpoint of the indexer. -/// -/// # Panics -/// The function panics if the URL cannot be built. -pub fn cost_url>(url: U) -> CostUrl { - let url = url - .borrow() - .join("cost/") - .expect("failed to build indexer cost URL"); - CostUrl(url) -} - /// Newtype wrapper around `Url` to provide type safety. macro_rules! url_new_type { ($name:ident) => { @@ -69,13 +57,12 @@ macro_rules! url_new_type { } url_new_type!(StatusUrl); -url_new_type!(CostUrl); #[cfg(test)] mod tests { use url::Url; - use super::{cost_url, status_url}; + use super::status_url; /// Ensure the different URL builder functions accept owned and borrowed URL parameters. #[test] @@ -85,9 +72,5 @@ mod tests { // Status URL let _ = status_url(&url); let _ = status_url(url.clone()); - - // Cost URL - let _ = cost_url(&url); - let _ = cost_url(url.clone()); } } diff --git a/src/network.rs b/src/network.rs index 2f4f93af..d9b9f807 100644 --- a/src/network.rs +++ b/src/network.rs @@ -2,9 +2,9 @@ pub use errors::{DeploymentError, SubgraphError}; pub use internal::{Indexing, IndexingId}; pub use service::{NetworkService, ResolvedSubgraphInfo}; +pub mod cost_model; mod errors; pub mod host_filter; -pub mod indexer_indexing_cost_model_resolver; pub mod indexer_indexing_poi_blocklist; pub mod indexer_indexing_poi_resolver; pub mod indexer_indexing_progress_resolver; diff --git a/src/network/cost_model.rs b/src/network/cost_model.rs new file mode 100644 index 00000000..24296d37 --- /dev/null +++ b/src/network/cost_model.rs @@ -0,0 +1,121 @@ +use std::collections::HashMap; + +use anyhow::anyhow; +use thegraph_core::DeploymentId; +use thegraph_graphql_http::{ + graphql::{Document, IntoDocument, IntoDocumentWithVariables}, + http_client::ReqwestExt, +}; +use url::Url; + +pub struct CostModelResolver { + http: reqwest::Client, + cache: parking_lot::Mutex>, +} + +impl CostModelResolver { + pub fn new(http: reqwest::Client) -> Self { + Self { + http, + cache: Default::default(), + } + } + + pub async fn resolve( + &self, + url: &Url, + indexings: &[DeploymentId], + ) -> HashMap { + let sources = match self.fetch_cost_model_sources(url, indexings).await { + Ok(sources) => sources, + Err(cost_model_err) => { + tracing::debug!(%url, %cost_model_err); + return self.cache.lock().clone(); + } + }; + + // Only support cost models of the form `default => x;`. + let cost_models: HashMap = sources + .into_iter() + .filter_map(|(deployment, src)| Some((deployment, parse_simple_cost_model(&src)?))) + .collect(); + + *self.cache.lock() = cost_models.clone(); + cost_models + } + + async fn fetch_cost_model_sources( + &self, + url: &Url, + deployments: &[DeploymentId], + ) -> anyhow::Result> { + let url = url.join("cost").map_err(|_| anyhow!("invalid URL"))?; + + const QUERY: &str = r#" + query costModels($deployments: [String!]!) { + costModels(deployments: $deployments) { + deployment + model + } + } + "#; + let deployments = deployments.iter().map(|item| item.to_string()).collect(); + #[derive(serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct Response { + cost_models: Vec, + } + #[derive(serde::Deserialize)] + pub struct CostModelSource { + pub deployment: DeploymentId, + pub model: String, + } + struct Request { + deployments: Vec, + } + impl IntoDocumentWithVariables for Request { + type Variables = serde_json::Value; + fn into_document_with_variables(self) -> (Document, Self::Variables) { + ( + QUERY.into_document(), + serde_json::json!({ "deployments": self.deployments }), + ) + } + } + let resp = self + .http + .post(url) + .send_graphql::(Request { deployments }) + .await??; + Ok(resp + .cost_models + .into_iter() + .map(|CostModelSource { deployment, model }| (deployment, model)) + .collect()) + } +} + +fn parse_simple_cost_model(src: &str) -> Option { + let (_, rest) = src.split_once("default")?; + let (_, rest) = rest.split_once("=>")?; + let (consumed, _) = rest.split_once(";")?; + let token = consumed.trim(); + let fee: f64 = token.parse().ok()?; + Some((fee * 1e18) as u128) +} + +#[cfg(test)] +mod test { + #[test] + fn parse_simple_cost_model() { + let tests = [ + ("default => 0;", 0), + ("default => 1;", 1000000000000000000), + ("default => 0.00001;", 10000000000000), + (" default => 0.004100 ; ", 4100000000000000), + ]; + for (src, expected) in tests { + assert_eq!(super::parse_simple_cost_model(src), Some(expected)); + } + } +} diff --git a/src/network/indexer_indexing_cost_model_resolver.rs b/src/network/indexer_indexing_cost_model_resolver.rs deleted file mode 100644 index 95c9919c..00000000 --- a/src/network/indexer_indexing_cost_model_resolver.rs +++ /dev/null @@ -1,93 +0,0 @@ -//! Resolves the cost models for the indexers' deployments. -//! -//! The cost models are fetched from the indexer's cost URL. - -use std::{collections::HashMap, time::Duration}; - -use thegraph_core::DeploymentId; -use url::Url; - -use crate::{indexers, indexers::cost_models::CostModelSource}; - -/// Resolve the indexers' cost models sources and compile them into cost models. -pub struct CostModelResolver { - client: reqwest::Client, - timeout: Duration, - cache: parking_lot::Mutex>, -} - -impl CostModelResolver { - pub fn new(client: reqwest::Client, timeout: Duration) -> Self { - Self { - client, - timeout, - cache: Default::default(), - } - } - - async fn fetch_cost_model_sources( - &self, - url: &Url, - indexings: &[DeploymentId], - ) -> anyhow::Result> { - let indexer_cost_url = indexers::cost_url(url); - tokio::time::timeout( - self.timeout, - indexers::cost_models::send_request(&self.client, indexer_cost_url, indexings), - ) - .await? - .map_err(Into::into) - } - - /// Fetches the cost model sources for the given deployments from the indexer. - /// - /// Returns a map of deployment IDs to the retrieved cost model sources. If certain deployment - /// ID's cost model fetch fails, the corresponding value in the map is `None`. - pub async fn resolve( - &self, - url: &Url, - indexings: &[DeploymentId], - ) -> HashMap { - let sources = match self.fetch_cost_model_sources(url, indexings).await { - Ok(sources) => sources, - Err(cost_model_err) => { - tracing::debug!(%url, %cost_model_err); - return self.cache.lock().clone(); - } - }; - - // Only support cost models of the form `default => x;`. - let cost_models: HashMap = sources - .into_iter() - .filter_map(|src| Some((src.deployment, parse_simple_cost_model(&src.model)?))) - .collect(); - - *self.cache.lock() = cost_models.clone(); - cost_models - } -} - -fn parse_simple_cost_model(src: &str) -> Option { - let (_, rest) = src.split_once("default")?; - let (_, rest) = rest.split_once("=>")?; - let (consumed, _) = rest.split_once(";")?; - let token = consumed.trim(); - let fee: f64 = token.parse().ok()?; - Some((fee * 1e18) as u128) -} - -#[cfg(test)] -mod test { - #[test] - fn parse_simple_cost_model() { - let tests = [ - ("default => 0;", 0), - ("default => 1;", 1000000000000000000), - ("default => 0.00001;", 10000000000000), - (" default => 0.004100 ; ", 4100000000000000), - ]; - for (src, expected) in tests { - assert_eq!(super::parse_simple_cost_model(src), Some(expected)); - } - } -} diff --git a/src/network/internal/state.rs b/src/network/internal/state.rs index f1ff8c0d..9867f0c9 100644 --- a/src/network/internal/state.rs +++ b/src/network/internal/state.rs @@ -5,7 +5,7 @@ use thegraph_core::alloy::primitives::Address; use crate::{ config::BlockedIndexer, network::{ - host_filter::HostFilter, indexer_indexing_cost_model_resolver::CostModelResolver, + cost_model::CostModelResolver, host_filter::HostFilter, indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, indexer_indexing_progress_resolver::IndexingProgressResolver, version_filter::VersionFilter, diff --git a/src/network/service.rs b/src/network/service.rs index d1634f91..41bb765c 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -16,9 +16,9 @@ use thegraph_core::{ use tokio::{sync::watch, time::MissedTickBehavior}; use super::{ + cost_model::CostModelResolver, errors::{DeploymentError, SubgraphError}, host_filter::HostFilter, - indexer_indexing_cost_model_resolver::CostModelResolver, indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, indexer_indexing_progress_resolver::IndexingProgressResolver, @@ -192,7 +192,7 @@ pub fn spawn( http.clone(), Duration::from_secs(25), ), - cost_model_resolver: CostModelResolver::new(http.clone(), Duration::from_secs(5)), + cost_model_resolver: CostModelResolver::new(http.clone()), }; let update_interval = Duration::from_secs(60); let network = spawn_updater_task(subgraph_client, internal_state, update_interval);