Skip to content

Commit

Permalink
refactor: merge host resolver and blocklist
Browse files Browse the repository at this point in the history
  • Loading branch information
Theodus committed Dec 6, 2024
1 parent 83d9d43 commit 1504214
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 103 deletions.
2 changes: 1 addition & 1 deletion src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use service::{NetworkService, ResolvedSubgraphInfo};

mod config;
mod errors;
pub mod indexer_host_resolver;
pub mod host_filter;
pub mod indexer_indexing_cost_model_resolver;
pub mod indexer_indexing_poi_blocklist;
pub mod indexer_indexing_poi_resolver;
Expand Down
72 changes: 72 additions & 0 deletions src/network/host_filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::{
collections::{HashMap, HashSet},
net::IpAddr,
time::Duration,
};

use hickory_resolver::TokioAsyncResolver as DnsResolver;
use ipnetwork::IpNetwork;
use parking_lot::RwLock;
use url::{Host, Url};

use crate::errors::UnavailableReason;

pub struct HostFilter {
blocklist: HashSet<IpNetwork>,
resolver: DnsResolver,
cache: RwLock<HashMap<String, Vec<IpAddr>>>,
}

impl HostFilter {
pub fn new(blocklist: HashSet<IpNetwork>) -> anyhow::Result<Self> {
Ok(Self {
blocklist,
resolver: DnsResolver::tokio_from_system_conf()?,
cache: Default::default(),
})
}

pub async fn check(&self, url: &Url) -> Result<(), UnavailableReason> {
if self.blocklist.is_empty() {
return Ok(());
}

let host_str = url.host_str().ok_or_else(UnavailableReason::invalid_url)?;
let cached_lookup = {
let cache = self.cache.read();
cache.get(host_str).cloned()
};
let lookup = match cached_lookup {
Some(lookup) => lookup,
None => {
let host = url.host().ok_or_else(UnavailableReason::invalid_url)?;
let lookup = match host {
Host::Ipv4(ip) => vec![IpAddr::V4(ip)],
Host::Ipv6(ip) => vec![IpAddr::V6(ip)],
Host::Domain(host) => self.resolve_host(host).await.map_err(|_| {
UnavailableReason::NoStatus("unable to resolve host".to_string())
})?,
};
self.cache
.write()
.insert(host_str.to_string(), lookup.clone());
lookup
}
};

if lookup
.into_iter()
.any(|ip| self.blocklist.iter().any(|net| net.contains(ip)))
{
return Err(UnavailableReason::Blocked("bad host".to_string()));
}

Ok(())
}

async fn resolve_host(&self, host: &str) -> anyhow::Result<Vec<IpAddr>> {
let lookup =
tokio::time::timeout(Duration::from_secs(5), self.resolver.lookup_ip(host)).await??;
Ok(lookup.into_iter().collect())
}
}
56 changes: 0 additions & 56 deletions src/network/indexer_host_resolver.rs

This file was deleted.

40 changes: 3 additions & 37 deletions src/network/internal/indexer_processing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::{HashMap, HashSet};

use custom_debug::CustomDebug;
use ipnetwork::IpNetwork;
use semver::Version;
use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId};
use tracing::Instrument;
Expand All @@ -12,8 +11,8 @@ use crate::{
config::BlockedIndexer,
errors::UnavailableReason,
network::{
config::VersionRequirements, indexer_host_resolver::HostResolver,
indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver,
config::VersionRequirements, indexer_indexing_poi_blocklist::PoiBlocklist,
indexer_indexing_poi_resolver::PoiResolver,
indexer_indexing_progress_resolver::IndexingProgressResolver,
indexer_version_resolver::VersionResolver,
},
Expand Down Expand Up @@ -164,17 +163,7 @@ pub(super) async fn process_info(
tracing::trace!(parent: &indexer_span, "processing");

async move {
// Check if the indexer's host is in the host blocklist
//
// If the indexer host cannot be resolved or is in the blocklist, the indexer must
// be marked as unhealthy
if let Err(err) = resolve_and_check_indexer_blocked_by_host_blocklist(
&state.indexer_host_resolver,
&state.indexer_host_blocklist,
&indexer.url,
)
.await
{
if let Err(err) = state.indexer_host_filter.check(&indexer.url).await {
tracing::debug!(%err);
return (*indexer_id, Err(err));
}
Expand Down Expand Up @@ -241,29 +230,6 @@ pub(super) async fn process_info(
FromIterator::from_iter(processed_info)
}

/// Resolve and check if the indexer's host is in the host blocklist.
///
/// - If the indexer's host is not resolvable: the indexer is BLOCKED.
/// - If the host blocklist was not configured: the indexer is ALLOWED.
/// - If the indexer's host is in the blocklist: the indexer is BLOCKED.
async fn resolve_and_check_indexer_blocked_by_host_blocklist(
resolver: &HostResolver,
blocklist: &HashSet<IpNetwork>,
url: &Url,
) -> Result<(), UnavailableReason> {
// Resolve the indexer's URL, if it fails (or times out), the indexer must be BLOCKED
let addrs = resolver.resolve_url(url).await?;

if addrs
.iter()
.any(|addr| blocklist.iter().any(|net| net.contains(*addr)))
{
return Err(UnavailableReason::Blocked("bad host".to_string()));
}

Ok(())
}

/// Resolve and check if the indexer's reported versions are supported.
async fn resolve_and_check_indexer_blocked_by_version(
resolver: &VersionResolver,
Expand Down
9 changes: 3 additions & 6 deletions src/network/internal/state.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::collections::{BTreeMap, HashSet};
use std::collections::BTreeMap;

use ipnetwork::IpNetwork;
use thegraph_core::alloy::primitives::Address;

use crate::{
config::BlockedIndexer,
network::{
config::VersionRequirements as IndexerVersionRequirements,
indexer_host_resolver::HostResolver,
config::VersionRequirements as IndexerVersionRequirements, host_filter::HostFilter,
indexer_indexing_cost_model_resolver::CostModelResolver,
indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver,
indexer_indexing_progress_resolver::IndexingProgressResolver,
Expand All @@ -17,8 +15,7 @@ use crate::{

pub struct InternalState {
pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>,
pub indexer_host_resolver: HostResolver,
pub indexer_host_blocklist: HashSet<IpNetwork>,
pub indexer_host_filter: HostFilter,
pub indexer_version_requirements: IndexerVersionRequirements,
pub indexer_version_resolver: VersionResolver,
pub poi_blocklist: PoiBlocklist,
Expand Down
5 changes: 2 additions & 3 deletions src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{sync::watch, time::MissedTickBehavior};
use super::{
config::VersionRequirements,
errors::{DeploymentError, SubgraphError},
indexer_host_resolver::HostResolver,
host_filter::HostFilter,
indexer_indexing_cost_model_resolver::CostModelResolver,
indexer_indexing_poi_blocklist::PoiBlocklist,
indexer_indexing_poi_resolver::PoiResolver,
Expand Down Expand Up @@ -174,9 +174,8 @@ pub fn spawn(
) -> NetworkService {
let internal_state = InternalState {
indexer_blocklist,
indexer_host_resolver: HostResolver::new(Duration::from_secs(5))
indexer_host_filter: HostFilter::new(indexer_host_blocklist)
.expect("failed to create host resolver"),
indexer_host_blocklist,
indexer_version_requirements: VersionRequirements {
min_indexer_service_version,
min_graph_node_version,
Expand Down

0 comments on commit 1504214

Please sign in to comment.