diff --git a/.env-template b/.env-template index 508cf09..20ed6e8 100644 --- a/.env-template +++ b/.env-template @@ -1,2 +1,4 @@ +ACCOUNT_SK="" PRIVITE_NET_KEY="" PRIVITE_NET_ADDRESS="/ip4/0.0.0.0/tcp/8002" +USE_TESTNET_QUERY=true diff --git a/Cargo.toml b/Cargo.toml index 3aed800..ed298b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,11 @@ edition = "2021" [dependencies] alloy = "0.9" base64 = "0.22" -dotenv = "0.15.0" -either = "1.13.0" -futures-util = "0.3.31" -hex = "0.4.3" +cached = "0.54" +dotenv = "0.15" +either = "1.13" +futures-util = "0.3" +hex = "0.4" libp2p = { version = "0.55", features = [ "dns", "tokio", @@ -25,6 +26,7 @@ libp2p = { version = "0.55", features = [ "yamux", "noise", ] } +once_cell = "1.20" reqwest = { version = "0.12", features = ["json"] } serde_json = "1" tokio = { version = "1", features = ["full"] } diff --git a/src/mod_libp2p/network.rs b/src/mod_libp2p/network.rs index 2981e8e..c6b7a9f 100644 --- a/src/mod_libp2p/network.rs +++ b/src/mod_libp2p/network.rs @@ -1,6 +1,7 @@ use crate::mod_libp2p::behavior::{AgentBehavior, AgentEvent}; use alloy::primitives::{keccak256, Address}; use base64::{engine::general_purpose::STANDARD, Engine}; +use cached::{stores::SizedCache, Cached}; use either::Either; use futures_util::StreamExt; use libp2p::{ @@ -21,14 +22,26 @@ use libp2p::{ swarm::SwarmEvent, tcp, yamux, PeerId, StreamProtocol, Swarm, Transport, }; +use once_cell::sync::Lazy; use serde_json::{json, Value}; use std::{error::Error, time::Duration}; -use tracing::info; +use tokio::sync::Mutex; +use tracing::{error, info}; pub(crate) struct EventLoop { swarm: Swarm, } +const METRICS_ADDRESS: &str = "0x38285cb9e23ac02b9c6f4a2fff9c6df45c59f1dc"; + +const QUERY_INDEXER_URL: &str = match option_env!("USE_TESTNET_QUERY") { + Some(_value) => "https://api.subquery.network/sq/subquery/base-testnet", + None => "https://api.subquery.network/sq/subquery/subquery-mainnet", +}; + +static GLOBAL_INDEXER_CACHE: Lazy>> = + Lazy::new(|| Mutex::new(SizedCache::with_size(2000))); + impl EventLoop { pub async fn new() -> Result> { match Self::start_swarm().await { @@ -65,32 +78,48 @@ impl EventLoop { } async fn handle_identify_event(&mut self, event: IdentifyEvent) { - match event { - IdentifyEvent::Received { - connection_id, - peer_id, - info: - IdentifyInfo { - public_key, - listen_addrs, - .. - }, - } => { + if let IdentifyEvent::Received { + connection_id, + peer_id, + info: + IdentifyInfo { + public_key, + listen_addrs, + .. + }, + } = event + { + let mut indexer_cache = GLOBAL_INDEXER_CACHE.lock().await; + if indexer_cache.cache_get(&peer_id).is_none() { if let Ok(controller_address) = Self::libp2p_publickey_to_eth_address(&public_key).await { - if let Ok(()) = Self::is_controller_valid(&controller_address).await { + if controller_address == METRICS_ADDRESS { + indexer_cache.cache_set(peer_id, ()); + drop(indexer_cache); for addr in listen_addrs { self.swarm.behaviour_mut().kad.add_address(&peer_id, addr); } } else { - self.swarm.close_connection(connection_id); + if let Ok(()) = Self::is_controller_valid(&controller_address).await { + indexer_cache.cache_set(peer_id, ()); + drop(indexer_cache); + for addr in listen_addrs { + self.swarm.behaviour_mut().kad.add_address(&peer_id, addr); + } + } else { + error!( + "peer_id {:?} not found, controller_address is {:?}", + peer_id, controller_address + ); + self.swarm.close_connection(connection_id); + } } } else { + error!("peer_id {:?} not found, controller_address cannot be converted into ethereum address", peer_id); self.swarm.close_connection(connection_id); } } - _ => {} } } @@ -203,11 +232,7 @@ impl EventLoop { "query": format!("{{\n indexers(filter: {{controller: {{equalToInsensitive: \"{}\"}}}}) {{\n nodes {{\n id\n }}\n }}\n}}", controller) }); - let response = client - .post("https://api.subquery.network/sq/subquery/subquery-mainnet") - .json(&query) - .send() - .await?; + let response = client.post(QUERY_INDEXER_URL).json(&query).send().await?; let body = response.text().await?;