Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added async to discover factories #230

Merged
merged 4 commits into from
Dec 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 99 additions & 48 deletions src/discovery/factory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::HashMap;

use futures::stream::{FuturesUnordered, StreamExt};

use alloy::{
network::Network,
primitives::{Address, B256},
Expand Down Expand Up @@ -38,12 +40,12 @@ pub async fn discover_factories<T, N, P>(
factories: Vec<DiscoverableFactory>,
number_of_amms_threshold: u64,
provider: P,
step: u64,
block_step: u64,
) -> Result<Vec<Factory>, AMMError>
where
T: Transport + Clone,
N: Network,
P: Provider<T, N> + Clone,
T: Transport + Clone + 'static,
N: Network + 'static,
P: Provider<T, N> + Clone + Send + Sync + 'static,
{
let mut event_signatures = vec![];

Expand All @@ -52,60 +54,60 @@ where
}
tracing::trace!(?event_signatures);

let block_filter = Filter::new().event_signature(event_signatures);

let mut from_block = 0;
let current_block = provider.get_block_number().await?;

// For each block within the range, get all pairs asynchronously
// let step = 100000;
let mut from_block = 0;
let block_number = provider.get_block_number().await?;

// Set up filter and events to filter each block you are searching by
let mut identified_factories: HashMap<Address, (Factory, u64)> = HashMap::new();
// set up a vector with the block range for each batch
let mut block_num_vec: Vec<(u64, u64)> = Vec::new();

// TODO: make this async
while from_block < current_block {
// populate the vector
while from_block < block_number {
// Get pair created event logs within the block range
let mut target_block = from_block + step - 1;
if target_block > current_block {
target_block = current_block;
let mut target_block = from_block + block_step - 1;
if target_block > block_number {
target_block = block_number;
}

block_num_vec.push((from_block, target_block));

from_block += block_step;
}

// Create futures unordered
let factories_tasks = FuturesUnordered::new();
// Set up filter and events to filter each block you are searching by
let block_filter = Filter::new().event_signature(event_signatures);

// Push task to futures unordered
for (from_block, target_block) in block_num_vec {
let block_filter = block_filter.clone();
let logs = provider
.get_logs(&block_filter.from_block(from_block).to_block(target_block))
.await?;

for log in logs {
tracing::trace!("found matching event at factory {}", log.address());
if let Some((_, amms_length)) = identified_factories.get_mut(&log.address()) {
*amms_length += 1;
} else {
let mut factory = Factory::try_from(log.topics()[0])?;

match &mut factory {
Factory::UniswapV2Factory(uniswap_v2_factory) => {
uniswap_v2_factory.address = log.address();
uniswap_v2_factory.creation_block =
log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
}
Factory::UniswapV3Factory(uniswap_v3_factory) => {
uniswap_v3_factory.address = log.address();
uniswap_v3_factory.creation_block =
log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
}
Factory::BalancerV2Factory(balancer_v2_factory) => {
balancer_v2_factory.address = log.address();
balancer_v2_factory.creation_block =
log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
}
}
let provider = provider.clone();
factories_tasks.push(async move {
process_block_logs_batch(&from_block, &target_block, provider, &block_filter).await
});
}

// collect the results when they are finished
let factory_results = factories_tasks.collect::<Vec<_>>().await;

identified_factories.insert(log.address(), (factory, 0));
    // process results
let mut identified_factories: HashMap<Address, (Factory, u64)> = HashMap::new();
for result in factory_results {
match result {
Ok(local_identified_factories) => {
for (addrs, (factory, count)) in local_identified_factories {
identified_factories
.entry(addrs)
.and_modify(|entry| entry.1 += count) // Increment the count if the address exists
.or_insert((factory, count)); // Insert new entry if the address doesn't exist
}
}
Err(e) => {
// The task itself failed (possibly panicked).
tracing::error!("Task error: {:?}", e)
}
}

from_block += step;
}

let mut filtered_factories = vec![];
Expand All @@ -121,3 +123,52 @@ where

Ok(filtered_factories)
}

/// Fetches all matching factory-creation logs in the block range
/// [`from_block`, `target_block`] and tallies them per factory address.
///
/// Returns a map from factory address to the discovered [`Factory`] and the
/// number of *additional* AMM-creation events seen for it (the first log that
/// identifies a factory is recorded with a count of 0; each subsequent log
/// increments the count — this matches the counting convention used by the
/// caller's threshold filtering).
async fn process_block_logs_batch<T, N, P>(
    from_block: &u64,
    target_block: &u64,
    provider: P,
    block_filter: &Filter,
) -> Result<HashMap<Address, (Factory, u64)>, AMMError>
where
    T: Transport + Clone,
    N: Network,
    P: Provider<T, N> + Clone,
{
    // Narrow the shared filter to this batch's block window. The filter is
    // cloned because the builder methods consume `self`.
    let range_filter = block_filter
        .clone()
        .from_block(*from_block)
        .to_block(*target_block);
    let logs = provider.get_logs(&range_filter).await?;

    let mut discovered: HashMap<Address, (Factory, u64)> = HashMap::new();

    for log in logs {
        match discovered.get_mut(&log.address()) {
            // Already-known factory: bump its AMM event count.
            Some((_, amm_count)) => *amm_count += 1,
            // First sighting: resolve the factory type from the event
            // signature topic and record its address and creation block.
            None => {
                let mut factory = Factory::try_from(log.topics()[0])?;
                let creation_block = log.block_number.ok_or(AMMError::BlockNumberNotFound)?;

                match &mut factory {
                    Factory::UniswapV2Factory(uniswap_v2_factory) => {
                        uniswap_v2_factory.address = log.address();
                        uniswap_v2_factory.creation_block = creation_block;
                    }
                    Factory::UniswapV3Factory(uniswap_v3_factory) => {
                        uniswap_v3_factory.address = log.address();
                        uniswap_v3_factory.creation_block = creation_block;
                    }
                    Factory::BalancerV2Factory(balancer_v2_factory) => {
                        balancer_v2_factory.address = log.address();
                        balancer_v2_factory.creation_block = creation_block;
                    }
                }

                discovered.insert(log.address(), (factory, 0));
            }
        }
    }

    Ok(discovered)
}
Loading