Skip to content

Commit

Permalink
Merge pull request #114 from darkforestry/0xOsiris/artemis-example
Browse files Browse the repository at this point in the history
  • Loading branch information
0xKitsune authored Feb 18, 2024
2 parents 243d568 + c9cd448 commit f47984b
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 1 deletion.
165 changes: 165 additions & 0 deletions examples/artemis-collector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use amms::{
amm::{
factory::Factory, uniswap_v2::factory::UniswapV2Factory,
uniswap_v3::factory::UniswapV3Factory, AutomatedMarketMaker, AMM,
},
state_space::{StateSpace, StateSpaceManager},
sync,
};
use artemis_core::engine::Engine;
use artemis_core::types::Strategy;
use async_trait::async_trait;
use ethers::{
providers::{Http, Provider, Ws},
types::{Transaction, H160},
};
use std::{collections::HashMap, ops::Deref, str::FromStr, sync::Arc};
use tokio::sync::RwLock;
#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let ws_endpoint = std::env::var("ETHEREUM_WS_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
let stream_middleware: Arc<Provider<Ws>> =
Arc::new(Provider::<Ws>::connect(ws_endpoint).await?);

let factories = vec![
//Add UniswapV2
Factory::UniswapV2Factory(UniswapV2Factory::new(
H160::from_str("0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f")?,
2638438,
300,
)),
//Add Sushiswap
Factory::UniswapV2Factory(UniswapV2Factory::new(
H160::from_str("0xC0AEe478e3658e2610c5F7A4A2E1777cE9e4f2Ac")?,
10794229,
300,
)),
//Add UniswapV3
Factory::UniswapV3Factory(UniswapV3Factory::new(
H160::from_str("0x1F98431c8aD98523631AE4a59f267346ea31F984")?,
185,
)),
];

//Sync amms
let (amms, last_synced_block) =
sync::sync_amms(factories, middleware.clone(), None, 10000).await?;

//Initialize state space manager
let state_space_manager = StateSpaceManager::new(
amms,
last_synced_block,
100,
100,
middleware.clone(),
stream_middleware,
);

// Group amm addresses by token pairs
let pairs = aggregate_pairs(state_space_manager.state.read().await.deref());

let simple_arbitrage_strategy = SimpleArbitrage {
state_space: state_space_manager.state.clone(),
pairs,
};

let mut engine: Engine<Vec<H160>, Transaction> = Engine::new();
engine.add_collector(Box::new(state_space_manager));
engine.add_strategy(Box::new(simple_arbitrage_strategy));

//Start the engine
if let Ok(mut set) = engine.run().await {
while let Some(res) = set.join_next().await {
tracing::info!("res: {:?}", res);
}
}
Ok(())
}

pub fn aggregate_pairs(state_space: &StateSpace) -> HashMap<(H160, H160), Vec<H160>> {
let mut pairs: HashMap<(H160, H160), Vec<H160>> = HashMap::new();

for (amm_address, amm) in state_space {
let tokens = amm.tokens();

// This assumes that all pairs only have two tokens for simplicity of the example
let (token_a, token_b) = if tokens[0] < tokens[1] {
(tokens[0], tokens[1])
} else {
(tokens[1], tokens[0])
};

let pair = (token_a, token_b);

if let Some(pair_addresses) = pairs.get_mut(&pair) {
pair_addresses.push(*amm_address);
} else {
pairs.insert(pair, vec![*amm_address]);
}
}

pairs
}

struct SimpleArbitrage {
state_space: Arc<RwLock<StateSpace>>,
pairs: HashMap<(H160, H160), Vec<H160>>,
}

#[async_trait]
impl Strategy<Vec<H160>, Transaction> for SimpleArbitrage {
async fn sync_state(&mut self) -> anyhow::Result<()> {
Ok(())
}

async fn process_event(&mut self, event: Vec<H160>) -> Vec<Transaction> {
for addr in event {
let state_space = self.state_space.read().await;

let amm: &AMM = state_space
.get(&addr)
// We can expect here because we know the address is from the state space collector
.expect("Could not find amm in Statespace");

let tokens = amm.tokens();
let pair_key = if tokens[0] < tokens[1] {
(tokens[0], tokens[1])
} else {
(tokens[1], tokens[0])
};

if let Some(pair_addresses) = self.pairs.get(&pair_key) {
let transactions = vec![];

for amm_address in pair_addresses {
let congruent_amm = state_space
.get(amm_address)
// We can expect here because we know the address is from the state space collector
.expect("Could not find amm in Statespace");
let amm_weight_0 = amm.calculate_price(tokens[0]).unwrap();
let amm_weight_1 = amm.calculate_price(tokens[1]).unwrap();

let congruent_amm_weight_0 = congruent_amm.calculate_price(tokens[0]).unwrap();
let congruent_amm_weight_1 = congruent_amm.calculate_price(tokens[1]).unwrap();

// Negative cycle
if amm_weight_0 * congruent_amm_weight_1 > 1_f64 {
tracing::info!("Simple Arbitrage detected path from {:?} to {:?} token path {:?} - {:?} - {:?}", addr, amm_address, tokens[0], tokens[1], tokens[0]);
}

// Negative cycle
if amm_weight_1 * congruent_amm_weight_0 > 1_f64 {
tracing::info!("Simple Arbitrage detected path from {:?} to {:?} token path {:?} - {:?} - {:?}", addr, amm_address, tokens[1], tokens[0], tokens[1]);
}
}

return transactions;
}
}

vec![]
}
}
8 changes: 8 additions & 0 deletions src/amm/uniswap_v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_usdc_weth() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1204,6 +1205,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_weth_usdc() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1287,6 +1289,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_link_weth() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1370,6 +1373,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_weth_link() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1453,6 +1457,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_mut_usdc_weth() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1535,6 +1540,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_mut_weth_usdc() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1618,6 +1624,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_mut_link_weth() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down Expand Up @@ -1701,6 +1708,7 @@ mod test {
}

#[tokio::test]
#[ignore] //Ignoring to not throttle the Provider on workflows
async fn test_simulate_swap_mut_weth_link() -> eyre::Result<()> {
let rpc_endpoint = std::env::var("ETHEREUM_RPC_ENDPOINT")?;
let middleware = Arc::new(Provider::<Http>::try_from(rpc_endpoint)?);
Expand Down
2 changes: 1 addition & 1 deletion src/state_space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ pub async fn handle_state_changes_from_logs<M: Middleware>(
let mut updated_amms = vec![];
let mut state_changes = vec![];

let mut last_log_block_number = if let Some(log) = logs.get(0) {
let mut last_log_block_number = if let Some(log) = logs.first() {
get_block_number_from_log(log)?
} else {
return Ok(updated_amms);
Expand Down

0 comments on commit f47984b

Please sign in to comment.