Skip to content

Commit

Permalink
fix: remove retry loop (#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus authored May 17, 2024
1 parent 6365890 commit 3862da7
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 153 deletions.
271 changes: 124 additions & 147 deletions graph-gateway/src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,161 +373,138 @@ async fn handle_client_query_inner(
return Err(Error::BadIndexers(indexer_errors));
}

let mut total_indexer_fees_grt: u128 = 0;
for retry in 0..ctx.indexer_selection_retry_limit {
// Make sure our observations are up-to-date if retrying.
if retry > 0 {
ctx.indexing_perf.flush().await;

// Update candidate performance.
let perf_snapshots = ctx.indexing_perf.latest();
for candidate in &mut candidates {
let indexing = Indexing {
indexer: candidate.indexer,
deployment: candidate.deployment,
};
if let Some(updated) = perf_snapshots.get(&indexing).and_then(|snapshot| {
perf(snapshot, &block_requirements, chain_head, blocks_per_minute)
}) {
candidate.perf = updated.response;
candidate.seconds_behind = updated.seconds_behind;
}
}
}

let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
indexer_selection::select(&candidates);
let selections_len = selected_candidates.len();
let mut selections: Vec<Selection> = Default::default();
for candidate in selected_candidates {
let indexing = Indexing {
indexer: candidate.indexer,
deployment: candidate.deployment,
};

// over-pay indexers to hit target
let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
let indexer_fee = candidate.fee.as_f64() * budget as f64;
let fee = indexer_fee.max(min_fee) as u128;

let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
Some(receipt) => receipt,
None => {
tracing::error!(?indexing, "failed to create receipt");
continue;
}
};
debug_assert!(fee == receipt.grt_value());
let selected_candidates: ArrayVec<&Candidate, SELECTION_LIMIT> =
indexer_selection::select(&candidates);
let selections_len = selected_candidates.len();
let mut selections: Vec<Selection> = Default::default();
for candidate in selected_candidates {
let indexing = Indexing {
indexer: candidate.indexer,
deployment: candidate.deployment,
};

let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
selections.push(Selection {
indexing,
url: candidate.url.clone(),
receipt,
blocks_behind: blocks_behind as u64,
});
}
if selections.is_empty() {
// Candidates that would never be selected should be filtered out for improved errors.
tracing::error!("no candidates selected");
continue;
}
// over-pay indexers to hit target
let min_fee = *ctx.budgeter.min_indexer_fees.borrow();
let min_fee = *(min_fee.0 * grt_per_usd * one_grt) / selections_len as f64;
let indexer_fee = candidate.fee.as_f64() * budget as f64;
let fee = indexer_fee.max(min_fee) as u128;

let receipt = match ctx.receipt_signer.create_receipt(&indexing, fee).await {
Some(receipt) => receipt,
None => {
tracing::error!(?indexing, "failed to create receipt");
continue;
}
};
debug_assert!(fee == receipt.grt_value());

let blocks_behind = (candidate.seconds_behind as f64 / 60.0) * blocks_per_minute as f64;
selections.push(Selection {
indexing,
url: candidate.url.clone(),
receipt,
blocks_behind: blocks_behind as u64,
});
}
if selections.is_empty() {
// Candidates that would never be selected should be filtered out for improved errors.
tracing::error!("no candidates selected");
return Err(Error::BadIndexers(indexer_errors));
}

let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
for selection in selections {
let deployment = deployments
.iter()
.find(|deployment| deployment.id == selection.indexing.deployment)
.unwrap()
.clone();
let indexer_query_context = IndexerQueryContext {
indexer_client: ctx.indexer_client.clone(),
kafka_client: ctx.kafka_client,
chain: chain.clone(),
attestation_domain: ctx.attestation_domain,
indexing_perf: ctx.indexing_perf.clone(),
deployment,
response_time: Duration::default(),
};
let mut total_indexer_fees_grt: u128 = 0;
let (outcome_tx, mut outcome_rx) = mpsc::channel(SELECTION_LIMIT);
for selection in selections {
let deployment = deployments
.iter()
.find(|deployment| deployment.id == selection.indexing.deployment)
.unwrap()
.clone();
let indexer_query_context = IndexerQueryContext {
indexer_client: ctx.indexer_client.clone(),
kafka_client: ctx.kafka_client,
chain: chain.clone(),
attestation_domain: ctx.attestation_domain,
indexing_perf: ctx.indexing_perf.clone(),
deployment,
response_time: Duration::default(),
};

// The Agora context must be cloned to preserve the state of the original client query.
// This to avoid the following scenario:
// 1. A client query has no block requirements set for some top-level operation
// 2. The first indexer is selected, with some indexing status at block number `n`
// 3. The query is made deterministic by setting the block requirement to the hash of
// block `n`
// 4. Some condition requires us to retry this query on another indexer with an indexing
// status at a block less than `n`
// 5. The same context is re-used, including the block requirement set to the hash of
// block `n`
// 6. The indexer is seen as being behind and is unnecessarily penalized
let indexer_request = {
let chain = chain.read().await;
rewrite_query(
&chain,
context.clone(),
&block_requirements,
selection.blocks_behind,
)?
};
// The Agora context must be cloned to preserve the state of the original client query.
// This to avoid the following scenario:
// 1. A client query has no block requirements set for some top-level operation
// 2. The first indexer is selected, with some indexing status at block number `n`
// 3. The query is made deterministic by setting the block requirement to the hash of
// block `n`
// 4. Some condition requires us to retry this query on another indexer with an indexing
// status at a block less than `n`
// 5. The same context is re-used, including the block requirement set to the hash of
// block `n`
// 6. The indexer is seen as being behind and is unnecessarily penalized
let indexer_request = {
let chain = chain.read().await;
rewrite_query(
&chain,
context.clone(),
&block_requirements,
selection.blocks_behind,
)?
};

total_indexer_fees_grt += selection.receipt.grt_value();

let indexer_query_context = indexer_query_context.clone();
let outcome_tx = outcome_tx.clone();
// We must manually construct this span before the spawned task, since otherwise
// there's a race between creating this span and another indexer responding which will
// close the outer client_query span.
let span = tracing::info_span!(
target: INDEXER_REQUEST_TARGET,
"indexer_request",
indexer = ?selection.indexing.indexer,
);
let receipt_signer = ctx.receipt_signer;
tokio::spawn(
async move {
let response =
handle_indexer_query(indexer_query_context, &selection, indexer_request)
.await;
let receipt_status = match &response {
Ok(_) => ReceiptStatus::Success,
Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
Err(_) => ReceiptStatus::Failure,
};
receipt_signer
.record_receipt(&selection.indexing, &selection.receipt, receipt_status)
.await;

let _ = outcome_tx.send((selection, response)).await;
}
.instrument(span),
);
}
// This must be dropped to ensure the `outcome_rx.recv()` loop below can eventyually stop.
drop(outcome_tx);
total_indexer_fees_grt += selection.receipt.grt_value();

let indexer_query_context = indexer_query_context.clone();
let outcome_tx = outcome_tx.clone();
// We must manually construct this span before the spawned task, since otherwise
// there's a race between creating this span and another indexer responding which will
// close the outer client_query span.
let span = tracing::info_span!(
target: INDEXER_REQUEST_TARGET,
"indexer_request",
indexer = ?selection.indexing.indexer,
);
let receipt_signer = ctx.receipt_signer;
tokio::spawn(
async move {
let response =
handle_indexer_query(indexer_query_context, &selection, indexer_request).await;
let receipt_status = match &response {
Ok(_) => ReceiptStatus::Success,
Err(IndexerError::Timeout) => ReceiptStatus::Unknown,
Err(_) => ReceiptStatus::Failure,
};
receipt_signer
.record_receipt(&selection.indexing, &selection.receipt, receipt_status)
.await;

let total_indexer_fees_usd =
USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
tracing::info!(
target: CLIENT_REQUEST_TARGET,
indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
let _ = outcome_tx.send((selection, response)).await;
}
.instrument(span),
);
}
// This must be dropped to ensure the `outcome_rx.recv()` loop below can eventyually stop.
drop(outcome_tx);

while let Some((selection, result)) = outcome_rx.recv().await {
match result {
Err(err) => {
indexer_errors.insert(selection.indexing.indexer, err);
}
Ok(outcome) => {
let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);
let total_indexer_fees_usd =
USD(NotNan::new(total_indexer_fees_grt as f64 * 1e-18).unwrap() / grt_per_usd);
tracing::info!(
target: CLIENT_REQUEST_TARGET,
indexer_fees_grt = (total_indexer_fees_grt as f64 * 1e-18) as f32,
indexer_fees_usd = *total_indexer_fees_usd.0 as f32,
);

tracing::debug!(?indexer_errors);
return Ok((selection, outcome));
}
};
}
while let Some((selection, result)) = outcome_rx.recv().await {
match result {
Err(err) => {
indexer_errors.insert(selection.indexing.indexer, err);
}
Ok(outcome) => {
let _ = ctx.budgeter.feedback.send(total_indexer_fees_usd);

tracing::debug!(?indexer_errors);
return Ok((selection, outcome));
}
};
}

Err(Error::BadIndexers(indexer_errors))
Expand Down
1 change: 0 additions & 1 deletion graph-gateway/src/client_query/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub struct Context {
pub receipt_signer: &'static ReceiptSigner,
pub kafka_client: &'static KafkaClient,
pub budgeter: &'static Budgeter,
pub indexer_selection_retry_limit: usize,
pub l2_gateway: Option<Url>,
pub grt_per_usd: watch::Receiver<NotNan<f64>>,
pub chains: &'static Chains,
Expand Down
4 changes: 0 additions & 4 deletions graph-gateway/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ pub struct Config {
pub gateway_id: Option<String>,
/// Graph network environment identifier, inserted into Kafka messages
pub graph_env_id: String,
/// Rounds of indexer selection and queries to attempt. Note that indexer queries have a 20s
/// timeout, so setting this to 5 for example would result in a 100s worst case response time
/// for a client query.
pub indexer_selection_retry_limit: usize,
/// File path of CSV containing rows of `IpNetwork,Country`
pub ip_blocker_db: Option<PathBuf>,
/// IP rate limit in requests per second
Expand Down
1 change: 0 additions & 1 deletion graph-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ async fn main() {
receipt_signer,
kafka_client,
budgeter,
indexer_selection_retry_limit: config.indexer_selection_retry_limit,
l2_gateway: config.l2_gateway,
chains: Box::leak(Box::new(Chains::new(config.chain_aliases))),
grt_per_usd,
Expand Down

0 comments on commit 3862da7

Please sign in to comment.