Skip to content

Commit

Permalink
refactor: remove error indirections
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 6, 2024
1 parent f3943eb commit 83d9d43
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 332 deletions.
35 changes: 4 additions & 31 deletions src/client_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::{
indexing_performance,
metrics::{with_metric, METRICS},
middleware::RequestId,
network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
network::{DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
reports,
};

Expand Down Expand Up @@ -464,7 +464,7 @@ fn build_candidates_list(
blocks_per_minute: u64,
block_requirements: &BlockRequirements,
subgraph_versions: &[DeploymentId],
indexings: HashMap<IndexingId, Result<Indexing, network::ResolutionError>>,
indexings: HashMap<IndexingId, Result<Indexing, UnavailableReason>>,
) -> (
Vec<Candidate<IndexerId, CandidateMetadata>>,
BTreeMap<IndexerId, IndexerError>,
Expand Down Expand Up @@ -492,7 +492,7 @@ fn build_candidates_list(
let indexing = match indexing {
Ok(indexing) => indexing,
Err(err) => {
candidates_errors.insert(indexing_id.indexer, err.into());
candidates_errors.insert(indexing_id.indexer, IndexerError::Unavailable(err));
continue;
}
};
Expand Down Expand Up @@ -630,33 +630,6 @@ fn blocks_behind(seconds_behind: u32, blocks_per_minute: u64) -> u64 {
((seconds_behind as f64 / 60.0) * blocks_per_minute as f64) as u64
}

impl From<network::ResolutionError> for IndexerError {
fn from(err: network::ResolutionError) -> Self {
match err {
network::ResolutionError::Unavailable(reason) => {
let reason = match reason {
network::UnavailableReason::Blocked(reason) => {
UnavailableReason::Blocked(reason)
}
reason @ network::UnavailableReason::IndexerServiceVersionBelowMin
| reason @ network::UnavailableReason::GraphNodeVersionBelowMin => {
UnavailableReason::NotSupported(reason.to_string())
}
reason @ network::UnavailableReason::IndexerResolutionError { .. }
| reason @ network::UnavailableReason::IndexingProgressNotFound => {
UnavailableReason::NoStatus(reason.to_string())
}
};
IndexerError::Unavailable(reason)
}
network::ResolutionError::Internal(err) => {
tracing::error!(error = ?err, "internal error");
IndexerError::Unavailable(UnavailableReason::Internal(err))
}
}
}
}

pub async fn handle_indexer_query(
State(ctx): State<Context>,
Extension(auth): Extension<AuthSettings>,
Expand All @@ -680,7 +653,7 @@ pub async fn handle_indexer_query(
.get(&indexing_id)
.ok_or_else(|| Error::NoIndexers)?
.as_ref()
.map_err(|err| bad_indexers(err.clone().into()))?;
.map_err(|err| bad_indexers(IndexerError::Unavailable(err.clone())))?;

let (latest_block, blocks_per_minute) = {
let chain = ctx.chains.chain(&subgraph.chain);
Expand Down
6 changes: 6 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ pub enum UnavailableReason {
Internal(&'static str),
}

impl UnavailableReason {
pub fn invalid_url() -> Self {
UnavailableReason::NoStatus("invalid URL".to_string())
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct MissingBlockError {
pub missing: Option<BlockNumber>,
Expand Down
2 changes: 1 addition & 1 deletion src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! provides information about the subgraphs (and subgraph deployments) registered in the network
//! smart contract, as well as the indexers that are indexing them.
pub use errors::{DeploymentError, ResolutionError, SubgraphError, UnavailableReason};
pub use errors::{DeploymentError, SubgraphError};
pub use internal::{Indexing, IndexingId};
pub use service::{NetworkService, ResolvedSubgraphInfo};

Expand Down
145 changes: 0 additions & 145 deletions src/network/errors.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
use semver::Version;

use crate::network::{
indexer_host_resolver::ResolutionError as HostResolutionError,
indexer_version_resolver::ResolutionError as VersionResolutionError,
};

/// Subgraph validation error.
#[derive(Clone, Debug, thiserror::Error)]
pub enum SubgraphError {
/// No allocations were found for the subgraph.
Expand All @@ -17,146 +9,9 @@ pub enum SubgraphError {
NoValidVersions,
}

/// Deployment validation error
#[derive(Clone, Debug, thiserror::Error)]
pub enum DeploymentError {
/// No allocations were found for the subgraph.
#[error("no allocations")]
NoAllocations,
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum ResolutionError {
/// The indexing is unavailable.
#[error(transparent)]
Unavailable(UnavailableReason),

/// Errors that should only occur in exceptional conditions.
#[error("internal error: {0}")]
Internal(&'static str),
}

#[derive(Debug, Clone, thiserror::Error)]
pub enum UnavailableReason {
#[error("blocked ({0})")]
Blocked(String),
#[error("{0}")]
IndexerResolutionError(&'static str),
#[error("indexer service version below the minimum required")]
IndexerServiceVersionBelowMin,
#[error("graph node version below the minimum required")]
GraphNodeVersionBelowMin,
#[error("indexing progress not found")]
IndexingProgressNotFound,
}

impl From<IndexingError> for ResolutionError {
fn from(error: IndexingError) -> Self {
match error {
IndexingError::Indexer(err) => {
let reason = match err {
IndexerInfoResolutionError::BlockedHost => {
UnavailableReason::Blocked("host".to_string())
}
IndexerInfoResolutionError::HostResolutionFailed(err) => {
tracing::debug!(error=?err, "host resolution failed");

let reason = match err {
HostResolutionError::InvalidUrl(_) => "invalid indexer URL",
HostResolutionError::Dns(_) => "indexer URL DNS resolution failed",
HostResolutionError::Timeout => {
"indexer URL DNS resolution failed (timeout)"
}
};
UnavailableReason::IndexerResolutionError(reason)
}
IndexerInfoResolutionError::IndexerServiceVersionResolutionFailed(err) => {
let reason = match err {
VersionResolutionError::FetchError(_) => {
"indexer service version resolution failed"
}
VersionResolutionError::Timeout => {
"indexer service version resolution failed (timeout)"
}
};
UnavailableReason::IndexerResolutionError(reason)
}
IndexerInfoResolutionError::IndexerServiceVersionBelowMin(..) => {
UnavailableReason::IndexerServiceVersionBelowMin
}
IndexerInfoResolutionError::GraphNodeVersionResolutionFailed(err) => {
tracing::debug!(error=?err, "graph node version resolution failed");

let reason = match err {
VersionResolutionError::FetchError(_) => {
"graph node version resolution failed"
}
VersionResolutionError::Timeout => {
"graph node version resolution failed (timeout)"
}
};
UnavailableReason::IndexerResolutionError(reason)
}
IndexerInfoResolutionError::GraphNodeVersionBelowMin(..) => {
UnavailableReason::GraphNodeVersionBelowMin
}
};
ResolutionError::Unavailable(reason)
}
IndexingError::Indexing(err) => {
let reason = match err {
IndexingInfoResolutionError::Blocked(reason) => {
UnavailableReason::Blocked(reason)
}
IndexingInfoResolutionError::IndexingProgressNotFound => {
UnavailableReason::IndexingProgressNotFound
}
};
ResolutionError::Unavailable(reason)
}
IndexingError::Internal(reason) => ResolutionError::Internal(reason),
}
}
}

/// Indexing error.
#[derive(Clone, Debug, thiserror::Error)]
pub enum IndexingError {
#[error(transparent)]
Indexer(#[from] IndexerInfoResolutionError),

#[error(transparent)]
Indexing(#[from] IndexingInfoResolutionError),

#[error("internal error: {0}")]
Internal(&'static str),
}

/// Errors when processing the indexer information.
#[derive(Clone, Debug, thiserror::Error)]
pub enum IndexerInfoResolutionError {
#[error("indexer host blocked")]
BlockedHost,
#[error("indexer host resolution failed: {0}")]
HostResolutionFailed(#[from] HostResolutionError),
#[error("indexer service version resolution failed: {0}")]
IndexerServiceVersionResolutionFailed(VersionResolutionError),
#[error("service version {0} below the minimum required {1}")]
IndexerServiceVersionBelowMin(Version, Version),
#[error("graph node version resolution failed: {0}")]
#[allow(dead_code)] // TODO: Remove once the graph node version requirement is enforced
GraphNodeVersionResolutionFailed(VersionResolutionError),
#[error("graph node version {0} below the minimum required {1}")]
GraphNodeVersionBelowMin(Version, Version),
}

/// Error when processing the indexer's indexing information.
#[derive(Clone, Debug, thiserror::Error)]
pub enum IndexingInfoResolutionError {
/// The indexing has been blocked by the public POIs blocklist.
#[error("indexing blocked: {0}")]
Blocked(String),
/// The indexing progress information was not found.
#[error("indexing progress information not found")]
IndexingProgressNotFound,
}
111 changes: 24 additions & 87 deletions src/network/indexer_host_resolver.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,14 @@
//! Resolves the IP address of a URL host.
//!
//! This module provides a resolver for URL hosts. The resolver caches the results of host
//! resolution to avoid repeated DNS lookups.
use std::{borrow::Borrow, collections::HashMap, net::IpAddr, time::Duration};
use std::{collections::HashMap, net::IpAddr, time::Duration};

use hickory_resolver::{error::ResolveError, TokioAsyncResolver as DnsResolver};
use hickory_resolver::TokioAsyncResolver as DnsResolver;
use parking_lot::RwLock;
use url::{Host, Url};

/// Error that can occur during URL host resolution.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ResolutionError {
/// The URL is invalid.
///
/// For example, the URL does not contain a host.
#[error("invalid URL: {0}")]
InvalidUrl(String),
use crate::errors::UnavailableReason;

/// Failed to resolve the host.
///
/// This error occurs when the host could not be resolved to an IP address.
///
/// This is a wrapper around [`ResolveError`].
#[error("dns resolution error: {0}")]
Dns(#[from] ResolveError),

/// Resolution timed out.
#[error("timeout")]
Timeout,
}

impl ResolutionError {
/// Create a new [`ResolutionError::InvalidUrl`] error from an invalid URL error.
pub fn invalid_url<E: ToString>(reason: E) -> Self {
Self::InvalidUrl(reason.to_string())
}
}

/// A resolver for URL hosts.
///
/// This resolver caches the results of host resolution to avoid repeated DNS lookups.
pub struct HostResolver {
inner: DnsResolver,
cache: RwLock<HashMap<String, Result<Vec<IpAddr>, ResolutionError>>>,
cache: RwLock<HashMap<String, Result<Vec<IpAddr>, UnavailableReason>>>,
timeout: Duration,
}

Expand All @@ -55,64 +21,35 @@ impl HostResolver {
})
}

/// Resolve the IP address of the given domain with a timeout.
async fn resolve_domain(&self, domain: &str) -> Result<Vec<IpAddr>, ResolutionError> {
tokio::time::timeout(self.timeout, self.inner.lookup_ip(domain))
.await
.map_err(|_| ResolutionError::Timeout)?
.map_err(Into::into)
.map(FromIterator::from_iter)
async fn resolve_domain(&self, domain: &str) -> anyhow::Result<Vec<IpAddr>> {
let lookup = tokio::time::timeout(self.timeout, self.inner.lookup_ip(domain)).await??;
Ok(lookup.into_iter().collect())
}

/// Gets the cached DNS resolution result for the given host.
///
/// This method locks the cache in read mode and returns the cached information.
fn get_from_cache(&self, host: &str) -> Option<Result<Vec<IpAddr>, ResolutionError>> {
let cache_read = self.cache.read();
cache_read.get(host).cloned()
}

/// Updates the cache with the given DNS resolution result.
///
/// This method locks the cache in write mode and updates the cache with the given progress
/// information.
fn update_cache(&self, host: &str, res: Result<Vec<IpAddr>, ResolutionError>) {
let mut cache_write = self.cache.write();
cache_write.insert(host.to_owned(), res);
}

/// Resolve the IP address of the given URL.
///
/// The URL is resolved to an IP address. The result is cached so that subsequent calls with the
/// same URL will return the same result.
pub async fn resolve_url<U: Borrow<Url>>(
&self,
url: U,
) -> Result<Vec<IpAddr>, ResolutionError> {
let url = url.borrow();

// Check if the result is already cached, otherwise resolve the URLs' associated IP
// addresses
let host_str = url
.host_str()
.ok_or(ResolutionError::invalid_url("no host"))?;

match self.get_from_cache(host_str) {
Some(state) => state,
pub async fn resolve_url(&self, url: &Url) -> Result<Vec<IpAddr>, UnavailableReason> {
let host_str = url.host_str().ok_or_else(UnavailableReason::invalid_url)?;
let cached_response = {
let cache = self.cache.read();
cache.get(host_str).cloned()
};
match cached_response {
Some(result) => result,
None => {
// Resolve the URL IP addresses
let host = url.host().ok_or(ResolutionError::invalid_url("no host"))?;
let host = url.host().ok_or_else(UnavailableReason::invalid_url)?;

let resolution = match host {
let result = match host {
Host::Ipv4(ip) => Ok(vec![IpAddr::V4(ip)]),
Host::Ipv6(ip) => Ok(vec![IpAddr::V6(ip)]),
Host::Domain(domain) => self.resolve_domain(domain).await,
Host::Domain(domain) => self.resolve_domain(domain).await.map_err(|err| {
UnavailableReason::NoStatus(format!("DNS resolution error: {err}"))
}),
};

// Cache the result
self.update_cache(host_str, resolution.clone());
self.cache
.write()
.insert(host_str.to_string(), result.clone());

resolution
result
}
}
}
Expand Down
Loading

0 comments on commit 83d9d43

Please sign in to comment.