Skip to content

Commit

Permalink
fix: limit indexing status requests by active allocations (#460)
Browse files Browse the repository at this point in the history
Unfiltered indexing status requests can quickly become oversized when
the indexer has indexed many deployments without active allocations
associated with them. In the future we will also want to batch these
requests if the set of deployments is too large. See #459
  • Loading branch information
Theodus authored Dec 14, 2023
1 parent b9912cc commit f5a5d64
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 51 deletions.
50 changes: 31 additions & 19 deletions graph-gateway/src/indexers/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,24 +76,32 @@ async fn update_statuses(
deployments: &HashMap<DeploymentId, Arc<Deployment>>,
) {
// There can only be one URL per indexer entity in the network subgraph
let indexers: HashMap<Address, Url> = deployments
.values()
.flat_map(|deployment| &deployment.indexers)
.map(|indexer| (indexer.id, indexer.url.clone()))
.collect();
let mut indexers: HashMap<Address, (Url, Vec<DeploymentId>)> = Default::default();
for deployment in deployments.values() {
for indexer in &deployment.indexers {
let (_, deployments) = indexers
.entry(indexer.id)
.or_insert_with(|| (indexer.url.clone(), vec![]));
deployments.push(deployment.id);
}
}

let statuses = join_all(indexers.into_iter().map(move |(indexer, url)| {
let client = client.clone();
async move {
match update_indexer(actor, client, indexer, url).await {
Ok(indexings) => indexings,
Err(indexer_status_err) => {
tracing::warn!(indexer_status_err);
vec![]
let statuses = join_all(
indexers
.into_iter()
.map(move |(indexer, (url, deployments))| {
let client = client.clone();
async move {
match update_indexer(actor, client, indexer, url, deployments).await {
Ok(indexings) => indexings,
Err(indexer_status_err) => {
tracing::warn!(indexer_status_err);
vec![]
}
}
}
}
}
}))
}),
)
.await
.into_iter()
.flatten();
Expand All @@ -114,6 +122,7 @@ async fn update_indexer(
client: reqwest::Client,
indexer: Address,
url: Url,
deployments: Vec<DeploymentId>,
) -> Result<Vec<(Indexing, Status)>, String> {
let version_url = url
.join("version")
Expand All @@ -127,7 +136,7 @@ async fn update_indexer(
apply_geoblocking(&mut locked_actor, &url).await?;
drop(locked_actor);

query_status(actor, &client, indexer, url, version)
query_status(actor, &client, indexer, url, deployments, version)
.await
.map_err(|err| format!("IndexerStatusError({err})"))
}
Expand Down Expand Up @@ -176,8 +185,9 @@ async fn apply_geoblocking(actor: &mut Actor, url: &Url) -> Result<(), String> {
async fn query_indexer_for_indexing_statuses(
client: reqwest::Client,
status_url: Url,
deployments: Vec<DeploymentId>,
) -> Result<Vec<IndexingStatusResponse>, String> {
indexing_statuses::query(client, status_url)
indexing_statuses::query(client, status_url, &deployments)
.await
.map_err(|err| err.to_string())
.map(|res| res.indexing_statuses)
Expand Down Expand Up @@ -213,10 +223,12 @@ async fn query_status(
client: &reqwest::Client,
indexer: Address,
url: Url,
deployments: Vec<DeploymentId>,
version: Version,
) -> Result<Vec<(Indexing, Status)>, String> {
let status_url = url.join("status").map_err(|err| err.to_string())?;
let statuses = query_indexer_for_indexing_statuses(client.clone(), status_url.into()).await?;
let statuses =
query_indexer_for_indexing_statuses(client.clone(), status_url.into(), deployments).await?;

let cost_url = url.join("cost").map_err(|err| err.to_string())?;
let cost_models = query_indexer_for_cost_models(
Expand Down
55 changes: 27 additions & 28 deletions graph-gateway/src/indexers/indexing_statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,44 @@ use std::borrow::Cow;

use alloy_primitives::BlockHash;
use graphql_http::http_client::ReqwestExt;
use indoc::indoc;
use indoc::formatdoc;
use itertools::Itertools;
use serde::{Deserialize, Deserializer};
use thegraph::types::DeploymentId;
use toolshed::url::Url;

pub async fn query(
client: reqwest::Client,
status_url: Url,
deployments: &[DeploymentId],
) -> anyhow::Result<IndexingStatusesResponse> {
let res = client
.post(status_url.0)
.send_graphql(INDEXING_STATUSES_QUERY_DOCUMENT)
.await;
match res {
let deployments = deployments.iter().map(|d| format!("\"{d}\"")).join(",");
let query = formatdoc! {
r#"{{
indexingStatuses(subgraphs: [{deployments}]) {{
subgraph
chains {{
network
latestBlock {{
number
hash
}}
earliestBlock {{
number
hash
}}
}}
}}
}}"#
};
match client.post(status_url.0).send_graphql(query).await {
Ok(res) => Ok(res?),
Err(e) => Err(anyhow::anyhow!(
"Error sending indexing statuses query: {}",
e
Err(err) => Err(anyhow::anyhow!(
"Error sending indexing statuses query: {err}"
)),
}
}

pub const INDEXING_STATUSES_QUERY_DOCUMENT: &str = indoc! {
r#"{
indexingStatuses(subgraphs: []) {
subgraph
chains {
network
latestBlock {
number
hash
}
earliestBlock {
number
hash
}
}
}
}"#
};

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexingStatusesResponse {
Expand Down Expand Up @@ -88,6 +85,8 @@ mod tests {
mod response {
use super::*;

use indoc::indoc;

#[test]
fn deserialize_indexing_statuses_response() {
//// Given
Expand Down
11 changes: 7 additions & 4 deletions graph-gateway/tests/it_indexers_status_indexing_statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ async fn query_indexer_indexing_statuses() {
.parse()
.expect("Invalid status url");

let test_deployment = test_deployment_id("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH");
let test_deployments = [
test_deployment_id("QmeYTH2fK2wv96XvnCGH2eyKFE8kmRfo53zYVy5dKysZtH"),
test_deployment_id("QmSqxfDGyGenGFPkqw9sqnYar4XgzaioVWNvhw5QQ3RB1U"),
];

//// When
let request = indexing_statuses::query(client, status_url);
let request = indexing_statuses::query(client, status_url, &test_deployments);
let response = timeout(Duration::from_secs(60), request)
.await
.expect("timeout");

//// Then
assert_matches!(response, Ok(resp) => {
assert!(!resp.indexing_statuses.is_empty());
assert!(resp.indexing_statuses.iter().any(|status| status.subgraph == test_deployment));
assert!(resp.indexing_statuses.len() == 2);
assert!(test_deployments.iter().all(|deployment| resp.indexing_statuses.iter().any(|status| &status.subgraph == deployment)));
});
}

0 comments on commit f5a5d64

Please sign in to comment.