Skip to content

Commit

Permalink
add cache for indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
getong committed Jan 21, 2025
1 parent 4e99127 commit 3334239
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .env-template
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
ACCOUNT_SK=""
PRIVITE_NET_KEY=""
PRIVITE_NET_ADDRESS="/ip4/0.0.0.0/tcp/8002"
USE_TESTNET_QUERY=true
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ edition = "2021"
[dependencies]
alloy = "0.9"
base64 = "0.22"
dotenv = "0.15.0"
either = "1.13.0"
futures-util = "0.3.31"
hex = "0.4.3"
cached = "0.54"
dotenv = "0.15"
either = "1.13"
futures-util = "0.3"
hex = "0.4"
libp2p = { version = "0.55", features = [
"dns",
"tokio",
Expand All @@ -25,6 +26,7 @@ libp2p = { version = "0.55", features = [
"yamux",
"noise",
] }
once_cell = "1.20"
reqwest = { version = "0.12", features = ["json"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
Expand Down
65 changes: 45 additions & 20 deletions src/mod_libp2p/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::mod_libp2p::behavior::{AgentBehavior, AgentEvent};
use alloy::primitives::{keccak256, Address};
use base64::{engine::general_purpose::STANDARD, Engine};
use cached::{stores::SizedCache, Cached};
use either::Either;
use futures_util::StreamExt;
use libp2p::{
Expand All @@ -21,14 +22,26 @@ use libp2p::{
swarm::SwarmEvent,
tcp, yamux, PeerId, StreamProtocol, Swarm, Transport,
};
use once_cell::sync::Lazy;
use serde_json::{json, Value};
use std::{error::Error, time::Duration};
use tracing::info;
use tokio::sync::Mutex;
use tracing::{error, info};

pub(crate) struct EventLoop {
swarm: Swarm<AgentBehavior>,
}

const METRICS_ADDRESS: &str = "0x38285cb9e23ac02b9c6f4a2fff9c6df45c59f1dc";

const QUERY_INDEXER_URL: &str = match option_env!("USE_TESTNET_QUERY") {
Some(_value) => "https://api.subquery.network/sq/subquery/base-testnet",
None => "https://api.subquery.network/sq/subquery/subquery-mainnet",
};

static GLOBAL_INDEXER_CACHE: Lazy<Mutex<SizedCache<PeerId, ()>>> =
Lazy::new(|| Mutex::new(SizedCache::with_size(2000)));

impl EventLoop {
pub async fn new() -> Result<Self, Box<dyn Error>> {
match Self::start_swarm().await {
Expand Down Expand Up @@ -65,32 +78,48 @@ impl EventLoop {
}

async fn handle_identify_event(&mut self, event: IdentifyEvent) {
match event {
IdentifyEvent::Received {
connection_id,
peer_id,
info:
IdentifyInfo {
public_key,
listen_addrs,
..
},
} => {
if let IdentifyEvent::Received {
connection_id,
peer_id,
info:
IdentifyInfo {
public_key,
listen_addrs,
..
},
} = event
{
let mut indexer_cache = GLOBAL_INDEXER_CACHE.lock().await;
if indexer_cache.cache_get(&peer_id).is_none() {
if let Ok(controller_address) =
Self::libp2p_publickey_to_eth_address(&public_key).await
{
if let Ok(()) = Self::is_controller_valid(&controller_address).await {
if controller_address == METRICS_ADDRESS {
indexer_cache.cache_set(peer_id, ());
drop(indexer_cache);
for addr in listen_addrs {
self.swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
} else {
self.swarm.close_connection(connection_id);
if let Ok(()) = Self::is_controller_valid(&controller_address).await {
indexer_cache.cache_set(peer_id, ());
drop(indexer_cache);
for addr in listen_addrs {
self.swarm.behaviour_mut().kad.add_address(&peer_id, addr);
}
} else {
error!(
"peer_id {:?} not found, controller_address is {:?}",
peer_id, controller_address
);
self.swarm.close_connection(connection_id);
}
}
} else {
error!("peer_id {:?} not found, controller_address cannot be converted into ethereum address", peer_id);
self.swarm.close_connection(connection_id);
}
}
_ => {}
}
}

Expand Down Expand Up @@ -203,11 +232,7 @@ impl EventLoop {
"query": format!("{{\n indexers(filter: {{controller: {{equalToInsensitive: \"{}\"}}}}) {{\n nodes {{\n id\n }}\n }}\n}}", controller)
});

let response = client
.post("https://api.subquery.network/sq/subquery/subquery-mainnet")
.json(&query)
.send()
.await?;
let response = client.post(QUERY_INDEXER_URL).json(&query).send().await?;

let body = response.text().await?;

Expand Down

0 comments on commit 3334239

Please sign in to comment.