From 150421487025fe19dab3b3b456609ee86b8bb8f2 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Thu, 5 Dec 2024 22:39:04 -0500 Subject: [PATCH] refactor: merge host resolver and blocklist --- src/network.rs | 2 +- src/network/host_filter.rs | 72 ++++++++++++++++++++++ src/network/indexer_host_resolver.rs | 56 ----------------- src/network/internal/indexer_processing.rs | 40 +----------- src/network/internal/state.rs | 9 +-- src/network/service.rs | 5 +- 6 files changed, 81 insertions(+), 103 deletions(-) create mode 100644 src/network/host_filter.rs delete mode 100644 src/network/indexer_host_resolver.rs diff --git a/src/network.rs b/src/network.rs index f9e4afbe..e3280bb2 100644 --- a/src/network.rs +++ b/src/network.rs @@ -8,7 +8,7 @@ pub use service::{NetworkService, ResolvedSubgraphInfo}; mod config; mod errors; -pub mod indexer_host_resolver; +pub mod host_filter; pub mod indexer_indexing_cost_model_resolver; pub mod indexer_indexing_poi_blocklist; pub mod indexer_indexing_poi_resolver; diff --git a/src/network/host_filter.rs b/src/network/host_filter.rs new file mode 100644 index 00000000..88804073 --- /dev/null +++ b/src/network/host_filter.rs @@ -0,0 +1,72 @@ +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, + time::Duration, +}; + +use hickory_resolver::TokioAsyncResolver as DnsResolver; +use ipnetwork::IpNetwork; +use parking_lot::RwLock; +use url::{Host, Url}; + +use crate::errors::UnavailableReason; + +pub struct HostFilter { + blocklist: HashSet<IpNetwork>, + resolver: DnsResolver, + cache: RwLock<HashMap<String, Vec<IpAddr>>>, +} + +impl HostFilter { + pub fn new(blocklist: HashSet<IpNetwork>) -> anyhow::Result<Self> { + Ok(Self { + blocklist, + resolver: DnsResolver::tokio_from_system_conf()?, + cache: Default::default(), + }) + } + + pub async fn check(&self, url: &Url) -> Result<(), UnavailableReason> { + if self.blocklist.is_empty() { + return Ok(()); + } + + let host_str = url.host_str().ok_or_else(UnavailableReason::invalid_url)?; + let cached_lookup = { + let cache = self.cache.read(); + 
cache.get(host_str).cloned() + }; + let lookup = match cached_lookup { + Some(lookup) => lookup, + None => { + let host = url.host().ok_or_else(UnavailableReason::invalid_url)?; + let lookup = match host { + Host::Ipv4(ip) => vec![IpAddr::V4(ip)], + Host::Ipv6(ip) => vec![IpAddr::V6(ip)], + Host::Domain(host) => self.resolve_host(host).await.map_err(|_| { + UnavailableReason::NoStatus("unable to resolve host".to_string()) + })?, + }; + self.cache + .write() + .insert(host_str.to_string(), lookup.clone()); + lookup + } + }; + + if lookup + .into_iter() + .any(|ip| self.blocklist.iter().any(|net| net.contains(ip))) + { + return Err(UnavailableReason::Blocked("bad host".to_string())); + } + + Ok(()) + } + + async fn resolve_host(&self, host: &str) -> anyhow::Result<Vec<IpAddr>> { + let lookup = + tokio::time::timeout(Duration::from_secs(5), self.resolver.lookup_ip(host)).await??; + Ok(lookup.into_iter().collect()) + } +} diff --git a/src/network/indexer_host_resolver.rs b/src/network/indexer_host_resolver.rs deleted file mode 100644 index eee0eed3..00000000 --- a/src/network/indexer_host_resolver.rs +++ /dev/null @@ -1,56 +0,0 @@ -use std::{collections::HashMap, net::IpAddr, time::Duration}; - -use hickory_resolver::TokioAsyncResolver as DnsResolver; -use parking_lot::RwLock; -use url::{Host, Url}; - -use crate::errors::UnavailableReason; - -pub struct HostResolver { - inner: DnsResolver, - cache: RwLock<HashMap<String, Result<Vec<IpAddr>, UnavailableReason>>>, - timeout: Duration, -} - -impl HostResolver { - pub fn new(timeout: Duration) -> anyhow::Result<Self> { - Ok(Self { - inner: DnsResolver::tokio_from_system_conf()?, - cache: Default::default(), - timeout, - }) - } - - async fn resolve_domain(&self, domain: &str) -> anyhow::Result<Vec<IpAddr>> { - let lookup = tokio::time::timeout(self.timeout, self.inner.lookup_ip(domain)).await??; - Ok(lookup.into_iter().collect()) - } - - pub async fn resolve_url(&self, url: &Url) -> Result<Vec<IpAddr>, UnavailableReason> { - let host_str = url.host_str().ok_or_else(UnavailableReason::invalid_url)?; 
- let cached_response = { - let cache = self.cache.read(); - cache.get(host_str).cloned() - }; - match cached_response { - Some(result) => result, - None => { - let host = url.host().ok_or_else(UnavailableReason::invalid_url)?; - - let result = match host { - Host::Ipv4(ip) => Ok(vec![IpAddr::V4(ip)]), - Host::Ipv6(ip) => Ok(vec![IpAddr::V6(ip)]), - Host::Domain(domain) => self.resolve_domain(domain).await.map_err(|err| { - UnavailableReason::NoStatus(format!("DNS resolution error: {err}")) - }), - }; - - self.cache - .write() - .insert(host_str.to_string(), result.clone()); - - result - } - } - } -} diff --git a/src/network/internal/indexer_processing.rs b/src/network/internal/indexer_processing.rs index 9518ec3c..4206880d 100644 --- a/src/network/internal/indexer_processing.rs +++ b/src/network/internal/indexer_processing.rs @@ -1,7 +1,6 @@ use std::collections::{HashMap, HashSet}; use custom_debug::CustomDebug; -use ipnetwork::IpNetwork; use semver::Version; use thegraph_core::{alloy::primitives::BlockNumber, AllocationId, DeploymentId, IndexerId}; use tracing::Instrument; @@ -12,8 +11,8 @@ use crate::{ config::BlockedIndexer, errors::UnavailableReason, network::{ - config::VersionRequirements, indexer_host_resolver::HostResolver, - indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, + config::VersionRequirements, indexer_indexing_poi_blocklist::PoiBlocklist, + indexer_indexing_poi_resolver::PoiResolver, indexer_indexing_progress_resolver::IndexingProgressResolver, indexer_version_resolver::VersionResolver, }, @@ -164,17 +163,7 @@ pub(super) async fn process_info( tracing::trace!(parent: &indexer_span, "processing"); async move { - // Check if the indexer's host is in the host blocklist - // - // If the indexer host cannot be resolved or is in the blocklist, the indexer must - // be marked as unhealthy - if let Err(err) = resolve_and_check_indexer_blocked_by_host_blocklist( - &state.indexer_host_resolver, - 
- &state.indexer_host_blocklist, - &indexer.url, - ) - .await - { + if let Err(err) = state.indexer_host_filter.check(&indexer.url).await { tracing::debug!(%err); return (*indexer_id, Err(err)); } @@ -241,29 +230,6 @@ pub(super) async fn process_info( FromIterator::from_iter(processed_info) } -/// Resolve and check if the indexer's host is in the host blocklist. -/// -/// - If the indexer's host is not resolvable: the indexer is BLOCKED. -/// - If the host blocklist was not configured: the indexer is ALLOWED. -/// - If the indexer's host is in the blocklist: the indexer is BLOCKED. -async fn resolve_and_check_indexer_blocked_by_host_blocklist( - resolver: &HostResolver, - blocklist: &HashSet<IpNetwork>, - url: &Url, -) -> Result<(), UnavailableReason> { - // Resolve the indexer's URL, if it fails (or times out), the indexer must be BLOCKED - let addrs = resolver.resolve_url(url).await?; - - if addrs - .iter() - .any(|addr| blocklist.iter().any(|net| net.contains(*addr))) - { - return Err(UnavailableReason::Blocked("bad host".to_string())); - } - - Ok(()) -} - /// Resolve and check if the indexer's reported versions are supported. 
async fn resolve_and_check_indexer_blocked_by_version( resolver: &VersionResolver, diff --git a/src/network/internal/state.rs b/src/network/internal/state.rs index bd70737c..1d871833 100644 --- a/src/network/internal/state.rs +++ b/src/network/internal/state.rs @@ -1,13 +1,11 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; -use ipnetwork::IpNetwork; use thegraph_core::alloy::primitives::Address; use crate::{ config::BlockedIndexer, network::{ - config::VersionRequirements as IndexerVersionRequirements, - indexer_host_resolver::HostResolver, + config::VersionRequirements as IndexerVersionRequirements, host_filter::HostFilter, indexer_indexing_cost_model_resolver::CostModelResolver, indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, indexer_indexing_progress_resolver::IndexingProgressResolver, @@ -17,8 +15,7 @@ use crate::{ pub struct InternalState { pub indexer_blocklist: BTreeMap<Address, BlockedIndexer>, - pub indexer_host_resolver: HostResolver, - pub indexer_host_blocklist: HashSet<IpNetwork>, + pub indexer_host_filter: HostFilter, pub indexer_version_requirements: IndexerVersionRequirements, pub indexer_version_resolver: VersionResolver, pub poi_blocklist: PoiBlocklist, diff --git a/src/network/service.rs b/src/network/service.rs index dd92ba29..378e2dad 100644 --- a/src/network/service.rs +++ b/src/network/service.rs @@ -18,7 +18,7 @@ use tokio::{sync::watch, time::MissedTickBehavior}; use super::{ config::VersionRequirements, errors::{DeploymentError, SubgraphError}, - indexer_host_resolver::HostResolver, + host_filter::HostFilter, indexer_indexing_cost_model_resolver::CostModelResolver, indexer_indexing_poi_blocklist::PoiBlocklist, indexer_indexing_poi_resolver::PoiResolver, @@ -174,9 +174,8 @@ pub fn spawn( ) -> NetworkService { let internal_state = InternalState { indexer_blocklist, - indexer_host_resolver: HostResolver::new(Duration::from_secs(5)) + indexer_host_filter: HostFilter::new(indexer_host_blocklist) 
.expect("failed to create host resolver"), - indexer_host_blocklist, indexer_version_requirements: VersionRequirements { min_indexer_service_version, min_graph_node_version,