diff --git a/src/discovery/factory.rs b/src/discovery/factory.rs
index f95b42e2..f0129e6c 100644
--- a/src/discovery/factory.rs
+++ b/src/discovery/factory.rs
@@ -1,5 +1,7 @@
 use std::collections::HashMap;
 
+use futures::stream::{FuturesUnordered, StreamExt};
+
 use alloy::{
     network::Network,
     primitives::{Address, B256},
@@ -38,12 +40,12 @@ pub async fn discover_factories<T, N, P>(
     factories: Vec<DiscoverableFactory>,
     number_of_amms_threshold: u64,
     provider: P,
-    step: u64,
+    block_step: u64,
 ) -> Result<Vec<Factory>, AMMError>
 where
-    T: Transport + Clone,
-    N: Network,
-    P: Provider<T, N> + Clone,
+    T: Transport + Clone + 'static,
+    N: Network + 'static,
+    P: Provider<T, N> + Clone + Send + Sync + 'static,
 {
     let mut event_signatures = vec![];
 
@@ -52,60 +54,60 @@ where
     }
 
     tracing::trace!(?event_signatures);
-    let block_filter = Filter::new().event_signature(event_signatures);
-
-    let mut from_block = 0;
-    let current_block = provider.get_block_number().await?;
-    // For each block within the range, get all pairs asynchronously
-    // let step = 100000;
+    let mut from_block = 0;
+    let block_number = provider.get_block_number().await?;
 
-    // Set up filter and events to filter each block you are searching by
-    let mut identified_factories: HashMap<Address, (Factory, u64)> = HashMap::new();
+    // set up a vector with the block range for each batch
+    let mut block_num_vec: Vec<(u64, u64)> = Vec::new();
 
-    // TODO: make this async
-    while from_block < current_block {
+    // populate the vector
+    while from_block < block_number {
         // Get pair created event logs within the block range
-        let mut target_block = from_block + step - 1;
-        if target_block > current_block {
-            target_block = current_block;
+        let mut target_block = from_block + block_step - 1;
+        if target_block > block_number {
+            target_block = block_number;
         }
+        block_num_vec.push((from_block, target_block));
+
+        from_block += block_step;
+    }
+
+    // Create futures unordered
+    let factories_tasks = FuturesUnordered::new();
+    // Set up filter and events to filter each block you are searching by
+    let block_filter = Filter::new().event_signature(event_signatures);
+
+    // Push task to futures unordered
+    for (from_block, target_block) in block_num_vec {
         let block_filter = block_filter.clone();
-        let logs = provider
-            .get_logs(&block_filter.from_block(from_block).to_block(target_block))
-            .await?;
-
-        for log in logs {
-            tracing::trace!("found matching event at factory {}", log.address());
-            if let Some((_, amms_length)) = identified_factories.get_mut(&log.address()) {
-                *amms_length += 1;
-            } else {
-                let mut factory = Factory::try_from(log.topics()[0])?;
-
-                match &mut factory {
-                    Factory::UniswapV2Factory(uniswap_v2_factory) => {
-                        uniswap_v2_factory.address = log.address();
-                        uniswap_v2_factory.creation_block =
-                            log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
-                    }
-                    Factory::UniswapV3Factory(uniswap_v3_factory) => {
-                        uniswap_v3_factory.address = log.address();
-                        uniswap_v3_factory.creation_block =
-                            log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
-                    }
-                    Factory::BalancerV2Factory(balancer_v2_factory) => {
-                        balancer_v2_factory.address = log.address();
-                        balancer_v2_factory.creation_block =
-                            log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
-                    }
-                }
+        let provider = provider.clone();
+        factories_tasks.push(async move {
+            process_block_logs_batch(&from_block, &target_block, provider, &block_filter).await
+        });
+    }
+
+    // collect the results when they are finished
+    let factory_results = factories_tasks.collect::<Vec<_>>().await;
 
-                identified_factories.insert(log.address(), (factory, 0));
+    // process results
+    let mut identified_factories: HashMap<Address, (Factory, u64)> = HashMap::new();
+    for result in factory_results {
+        match result {
+            Ok(local_identified_factories) => {
+                for (addrs, (factory, count)) in local_identified_factories {
+                    identified_factories
+                        .entry(addrs)
+                        .and_modify(|entry| entry.1 += count) // Increment the count if the address exists
+                        .or_insert((factory, count)); // Insert new entry if the address doesn't exist
+                }
+            }
+            Err(e) => {
+                // The batch task failed with an error.
+                tracing::error!("Task error: {:?}", e)
             }
         }
-
-        from_block += step;
     }
 
     let mut filtered_factories = vec![];
@@ -121,3 +123,52 @@ where
 
     Ok(filtered_factories)
 }
+
+async fn process_block_logs_batch<T, N, P>(
+    from_block: &u64,
+    target_block: &u64,
+    provider: P,
+    block_filter: &Filter,
+) -> Result<HashMap<Address, (Factory, u64)>, AMMError>
+where
+    T: Transport + Clone,
+    N: Network,
+    P: Provider<T, N> + Clone,
+{
+    let block_filter = block_filter.clone();
+    let mut local_identified_factories: HashMap<Address, (Factory, u64)> = HashMap::new();
+
+    let logs = provider
+        .get_logs(&block_filter.from_block(*from_block).to_block(*target_block))
+        .await?;
+
+    for log in logs {
+        if let Some((_, amms_length)) = local_identified_factories.get_mut(&log.address()) {
+            *amms_length += 1;
+        } else {
+            let mut factory = Factory::try_from(log.topics()[0])?;
+
+            match &mut factory {
+                Factory::UniswapV2Factory(uniswap_v2_factory) => {
+                    uniswap_v2_factory.address = log.address();
+                    uniswap_v2_factory.creation_block =
+                        log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
+                }
+                Factory::UniswapV3Factory(uniswap_v3_factory) => {
+                    uniswap_v3_factory.address = log.address();
+                    uniswap_v3_factory.creation_block =
+                        log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
+                }
+                Factory::BalancerV2Factory(balancer_v2_factory) => {
+                    balancer_v2_factory.address = log.address();
+                    balancer_v2_factory.creation_block =
+                        log.block_number.ok_or(AMMError::BlockNumberNotFound)?;
+                }
+            }
+
+            local_identified_factories.insert(log.address(), (factory, 0));
+        }
+    }
+
+    Ok(local_identified_factories)
+}