From 3862da78bba1952599e8539db9fbc676ced3815f Mon Sep 17 00:00:00 2001
From: Theo Butler
Date: Fri, 17 May 2024 12:17:43 -0400
Subject: [PATCH] fix: remove retry loop (#744)

---
 graph-gateway/src/client_query.rs         | 271 ++++++++++------------
 graph-gateway/src/client_query/context.rs |   1 -
 graph-gateway/src/config.rs               |   4 -
 graph-gateway/src/main.rs                 |   1 -
 4 files changed, 124 insertions(+), 153 deletions(-)

diff --git a/graph-gateway/src/client_query.rs b/graph-gateway/src/client_query.rs
index 9a9dc163..eb2662b1 100644
--- a/graph-gateway/src/client_query.rs
+++ b/graph-gateway/src/client_query.rs
@@ -373,161 +373,138 @@ async fn handle_client_query_inner(
         return Err(Error::BadIndexers(indexer_errors));
     }
 
-    let mut total_indexer_fees_grt: u128 = 0;
-    for retry in 0..ctx.indexer_selection_retry_limit {
-        // Make sure our observations are up-to-date if retrying.
-        if retry > 0 {
-            ctx.indexing_perf.flush().await;
-
-            // Update candidate performance.
-            let perf_snapshots = ctx.indexing_perf.latest();
-            for candidate in &mut candidates {
-                let indexing = Indexing {
-                    indexer: candidate.indexer,
-                    deployment: candidate.deployment,
-                };
-                if let Some(updated) = perf_snapshots.get(&indexing).and_then(|snapshot| {
-                    perf(snapshot, &block_requirements, chain_head, blocks_per_minute)
-                }) {
-                    candidate.perf = updated.response;
-                    candidate.seconds_behind = updated.seconds_behind;
-                }
-            }
-        }
-
-        let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
-            indexer_selection::select(&candidates);
-        let selections_len = selected_candidates.len();
-        let mut selections: Vec<Selection> = Default::default();
-        for candidate in selected_candidates {
-            let indexing = Indexing {
-                indexer: candidate.indexer,
-                deployment: candidate.deployment,
-            };
-
-            // over-pay indexers to hit target
-            let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
-            let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
-            let indexer_fee = candidate.fee.as_f64() * budget as f64;
-            let fee = indexer_fee.max(min_fee) as u128;
-
-            let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
-                Some(receipt) => receipt,
-                None => {
-                    tracing::error!(?indexing, "failed to create receipt");
-                    continue;
-                }
-            };
-            debug_assert!(fee == receipt.grt_value());
+    let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
+        indexer_selection::select(&candidates);
+    let selections_len = selected_candidates.len();
+    let mut selections: Vec<Selection> = Default::default();
+    for candidate in selected_candidates {
+        let indexing = Indexing {
+            indexer: candidate.indexer,
+            deployment: candidate.deployment,
+        };
 
-            let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
-            selections.push(Selection {
-                indexing,
-                url: candidate.url.clone(),
-                receipt,
-                blocks_behind: blocks_behind as u64,
-            });
-        }
-        if selections.is_empty() {
-            // Candidates that would never be selected should be filtered out for improved errors.
-            tracing::error!("no candidates selected");
-            continue;
-        }
+        // over-pay indexers to hit target
+        let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
+        let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
+        let indexer_fee = candidate.fee.as_f64() * budget as f64;
+        let fee = indexer_fee.max(min_fee) as u128;
+
+        let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
+            Some(receipt) => receipt,
+            None => {
+                tracing::error!(?indexing, "failed to create receipt");
+                continue;
+            }
+        };
+        debug_assert!(fee == receipt.grt_value());
+
+        let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
+        selections.push(Selection {
+            indexing,
+            url: candidate.url.clone(),
+            receipt,
+            blocks_behind: blocks_behind as u64,
+        });
+    }
+    if selections.is_empty() {
+        // Candidates that would never be selected should be filtered out for improved errors.
+        tracing::error!("no candidates selected");
+        return Err(Error::BadIndexers(indexer_errors));
+    }
 
-        let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
-        for selection in selections {
-            let deployment = deployments
-                .iter()
-                .find(|deployment| deployment.id == selection.indexing.deployment)
-                .unwrap()
-                .clone();
-            let indexer_query_context = IndexerQueryContext {
-                indexer_client: ctx.indexer_client.clone(),
-                kafka_client: ctx.kafka_client,
-                chain: chain.clone(),
-                attestation_domain: ctx.attestation_domain,
-                indexing_perf: ctx.indexing_perf.clone(),
-                deployment,
-                response_time: Duration::default(),
-            };
+    let mut total_indexer_fees_grt: u128 = 0;
+    let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
+    for selection in selections {
+        let deployment = deployments
+            .iter()
+            .find(|deployment| deployment.id == selection.indexing.deployment)
+            .unwrap()
+            .clone();
+        let indexer_query_context = IndexerQueryContext {
+            indexer_client: ctx.indexer_client.clone(),
+            kafka_client: ctx.kafka_client,
+            chain: chain.clone(),
+            attestation_domain: ctx.attestation_domain,
+            indexing_perf: ctx.indexing_perf.clone(),
+            deployment,
+            response_time: Duration::default(),
+        };
 
-            // The Agora context must be cloned to preserve the state of the original client query.
-            // This to avoid the following scenario:
-            // 1. A client query has no block requirements set for some top-level operation
-            // 2. The first indexer is selected, with some indexing status at block number `n`
-            // 3. The query is made deterministic by setting the block requirement to the hash of
-            //    block `n`
-            // 4. Some condition requires us to retry this query on another indexer with an indexing
-            //    status at a block less than `n`
-            // 5. The same context is re-used, including the block requirement set to the hash of
-            //    block `n`
-            // 6. The indexer is seen as being behind and is unnecessarily penalized
-            let indexer_request = {
-                let chain = chain.read().await;
-                rewrite_query(
-                    &chain,
-                    context.clone(),
-                    &block_requirements,
-                    selection.blocks_behind,
-                )?
-            };
+        // The Agora context must be cloned to preserve the state of the original client query.
+        // This to avoid the following scenario:
+        // 1. A client query has no block requirements set for some top-level operation
+        // 2. The first indexer is selected, with some indexing status at block number `n`
+        // 3. The query is made deterministic by setting the block requirement to the hash of
+        //    block `n`
+        // 4. Some condition requires us to retry this query on another indexer with an indexing
+        //    status at a block less than `n`
+        // 5. The same context is re-used, including the block requirement set to the hash of
+        //    block `n`
+        // 6. The indexer is seen as being behind and is unnecessarily penalized
+        let indexer_request = {
+            let chain = chain.read().await;
+            rewrite_query(
+                &chain,
+                context.clone(),
+                &block_requirements,
+                selection.blocks_behind,
+            )?
+        };
 
-            total_indexer_fees_grt += selection.receipt.grt_value();
+        total_indexer_fees_grt += selection.receipt.grt_value();
 
-            let indexer_query_context = indexer_query_context.clone();
-            let outcome_tx = outcome_tx.clone();
-            // We must manually construct this span before the spawned task, since otherwise
-            // there's a race between creating this span and another indexer responding which will
-            // close the outer client_query span.
-            let span = tracing::info_span!(
-                target: INDEXER_REQUEST_TARGET,
-                "indexer_request",
-                indexer = ?selection.indexing.indexer,
-            );
-            let receipt_signer = ctx.receipt_signer;
-            tokio::spawn(
-                async move {
-                    let response =
-                        handle_indexer_query(indexer_query_context, &selection, indexer_request)
-                            .await;
-                    let receipt_status = match &response {
-                        Ok(_) => ReceiptStatus::Success,
-                        Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
-                        Err(_) => ReceiptStatus::Failure,
-                    };
-                    receipt_signer
-                        .record_receipt(&selection.indexing, &selection.receipt, receipt_status)
-                        .await;
+        let indexer_query_context = indexer_query_context.clone();
+        let outcome_tx = outcome_tx.clone();
+        // We must manually construct this span before the spawned task, since otherwise
+        // there's a race between creating this span and another indexer responding which will
+        // close the outer client_query span.
+        let span = tracing::info_span!(
+            target: INDEXER_REQUEST_TARGET,
+            "indexer_request",
+            indexer = ?selection.indexing.indexer,
+        );
+        let receipt_signer = ctx.receipt_signer;
+        tokio::spawn(
+            async move {
+                let response =
+                    handle_indexer_query(indexer_query_context, &selection, indexer_request).await;
+                let receipt_status = match &response {
+                    Ok(_) => ReceiptStatus::Success,
+                    Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
+                    Err(_) => ReceiptStatus::Failure,
+                };
+                receipt_signer
+                    .record_receipt(&selection.indexing, &selection.receipt, receipt_status)
+                    .await;
 
-                    let _ = outcome_tx.send((selection, response)).await;
-                }
-                .instrument(span),
-            );
-        }
-        // This must be dropped to ensure the `outcome_rx.recv()` loop below can eventyually stop.
-        drop(outcome_tx);
+                let _ = outcome_tx.send((selection, response)).await;
+            }
+            .instrument(span),
+        );
+    }
+    // This must be dropped to ensure the `outcome_rx.recv()` loop below can eventyually stop.
+    drop(outcome_tx);
 
-        let total_indexer_fees_usd =
-            USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
-        tracing::info!(
-            target: CLIENT_REQUEST_TARGET,
-            indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
-            indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
-        );
+    let total_indexer_fees_usd =
+        USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
+    tracing::info!(
+        target: CLIENT_REQUEST_TARGET,
+        indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
+        indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
+    );
 
-        while let Some((selection, result)) = outcome_rx.recv().await {
-            match result {
-                Err(err) => {
-                    indexer_errors.insert(selection.indexing.indexer, err);
-                }
-                Ok(outcome) => {
-                    let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);
-
-                    tracing::debug!(?indexer_errors);
-                    return Ok((selection, outcome));
-                }
-            };
-        }
+    while let Some((selection, result)) = outcome_rx.recv().await {
+        match result {
+            Err(err) => {
+                indexer_errors.insert(selection.indexing.indexer, err);
+            }
+            Ok(outcome) => {
+                let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);
+
+                tracing::debug!(?indexer_errors);
+                return Ok((selection, outcome));
+            }
+        };
     }
 
     Err(Error::BadIndexers(indexer_errors))
diff --git a/graph-gateway/src/client_query/context.rs b/graph-gateway/src/client_query/context.rs
index 30ee7746..ea3565dc 100644
--- a/graph-gateway/src/client_query/context.rs
+++ b/graph-gateway/src/client_query/context.rs
@@ -24,7 +24,6 @@ pub struct Context {
     pub receipt_signer: &'static ReceiptSigner,
     pub kafka_client: &'static KafkaClient,
     pub budgeter: &'static Budgeter,
-    pub indexer_selection_retry_limit: usize,
     pub l2_gateway: Option<Url>,
     pub grt_per_usd: watch::Receiver<NotNan<f64>>,
     pub chains: &'static Chains,
diff --git a/graph-gateway/src/config.rs b/graph-gateway/src/config.rs
index 6adc051e..c4873038 100644
--- a/graph-gateway/src/config.rs
+++ b/graph-gateway/src/config.rs
@@ -43,10 +43,6 @@ pub struct Config {
     pub gateway_id: Option<String>,
     /// Graph network environment identifier, inserted into Kafka messages
     pub graph_env_id: String,
-    /// Rounds of indexer selection and queries to attempt. Note that indexer queries have a 20s
-    /// timeout, so setting this to 5 for example would result in a 100s worst case response time
-    /// for a client query.
-    pub indexer_selection_retry_limit: usize,
     /// File path of CSV containing rows of `IpNetwork,Country`
     pub ip_blocker_db: Option<PathBuf>,
     /// IP rate limit in requests per second
diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs
index a03cb4c4..a916fc3f 100644
--- a/graph-gateway/src/main.rs
+++ b/graph-gateway/src/main.rs
@@ -232,7 +232,6 @@ async fn main() {
         receipt_signer,
         kafka_client,
         budgeter,
-        indexer_selection_retry_limit: config.indexer_selection_retry_limit,
         l2_gateway: config.l2_gateway,
         chains: Box::leak(Box::new(Chains::new(config.chain_aliases))),
         grt_per_usd,