|
1 |
| -use std::sync::Arc; |
| 1 | +use std::{sync::Arc, time::Duration}; |
2 | 2 |
|
| 3 | +use anyhow::{anyhow, Error}; |
3 | 4 | use ethers::providers::Middleware;
|
| 5 | +use futures::FutureExt; |
4 | 6 | use ismp::{
|
5 | 7 | consensus::{StateMachineHeight, StateMachineId},
|
6 |
| - events::StateMachineUpdated, |
| 8 | + events::{Event, StateMachineUpdated}, |
| 9 | + host::StateMachine, |
7 | 10 | };
|
8 |
| -use tesseract_primitives::{ByzantineHandler, IsmpProvider}; |
| 11 | +use tesseract_primitives::{BoxStream, ByzantineHandler, IsmpProvider}; |
9 | 12 |
|
10 | 13 | use crate::EvmClient;
|
11 | 14 |
|
12 | 15 | #[async_trait::async_trait]
|
13 | 16 | impl ByzantineHandler for EvmClient {
|
14 | 17 | async fn check_for_byzantine_attack(
|
15 | 18 | &self,
|
| 19 | + _coprocessor: StateMachine, |
16 | 20 | counterparty: Arc<dyn IsmpProvider>,
|
17 | 21 | event: StateMachineUpdated,
|
18 | 22 | ) -> Result<(), anyhow::Error> {
|
@@ -46,4 +50,89 @@ impl ByzantineHandler for EvmClient {
|
46 | 50 |
|
47 | 51 | Ok(())
|
48 | 52 | }
|
| 53 | + |
| 54 | + async fn state_machine_updates( |
| 55 | + &self, |
| 56 | + _counterparty_state_id: StateMachineId, |
| 57 | + ) -> Result<BoxStream<Vec<StateMachineUpdated>>, Error> { |
| 58 | + use futures::StreamExt; |
| 59 | + let (tx, recv) = tokio::sync::broadcast::channel(512); |
| 60 | + |
| 61 | + let initial_height = self.client.get_block_number().await?.low_u64(); |
| 62 | + let client = self.clone(); |
| 63 | + let poll_interval = 5; |
| 64 | + tokio::spawn(async move { |
| 65 | + let mut latest_height = initial_height; |
| 66 | + let state_machine = client.state_machine; |
| 67 | + loop { |
| 68 | + tokio::time::sleep(Duration::from_secs(poll_interval)).await; |
| 69 | + // wait for an update with a greater height |
| 70 | + let block_number = match client.client.get_block_number().await { |
| 71 | + Ok(number) => number.low_u64(), |
| 72 | + Err(err) => { |
| 73 | + if let Err(err) = tx |
| 74 | + .send(Err(anyhow!( |
| 75 | + "Error fetching latest block height on {state_machine:?} {err:?}" |
| 76 | + ).into())) |
| 77 | + { |
| 78 | + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); |
| 79 | + return |
| 80 | + } |
| 81 | + continue; |
| 82 | + }, |
| 83 | + }; |
| 84 | + |
| 85 | + if block_number <= latest_height { |
| 86 | + continue; |
| 87 | + } |
| 88 | + |
| 89 | + let event = StateMachineUpdated { |
| 90 | + state_machine_id: client.state_machine_id(), |
| 91 | + latest_height: block_number, |
| 92 | + }; |
| 93 | + |
| 94 | + let events = match client.query_ismp_events(latest_height, event).await { |
| 95 | + Ok(events) => events, |
| 96 | + Err(err) => { |
| 97 | + if let Err(err) = tx |
| 98 | + .send(Err(anyhow!( |
| 99 | + "Error encountered while querying ismp events {err:?}" |
| 100 | + ).into())) |
| 101 | + { |
| 102 | + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); |
| 103 | + return |
| 104 | + } |
| 105 | + latest_height = block_number; |
| 106 | + continue; |
| 107 | + }, |
| 108 | + }; |
| 109 | + |
| 110 | + let events = events |
| 111 | + .into_iter() |
| 112 | + .filter_map(|ev| match ev { |
| 113 | + Event::StateMachineUpdated(update) => Some(update), |
| 114 | + _ => None, |
| 115 | + }).collect::<Vec<_>>(); |
| 116 | + |
| 117 | + if !events.is_empty() { |
| 118 | + if let Err(err) = tx |
| 119 | + .send(Ok(events)) |
| 120 | + { |
| 121 | + log::error!(target: "tesseract", "Failed to send message over channel on {state_machine:?} \n {err:?}"); |
| 122 | + return |
| 123 | + } |
| 124 | + } |
| 125 | + latest_height = block_number; |
| 126 | + } |
| 127 | + }.boxed()); |
| 128 | + |
| 129 | + let stream = tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|res| async { |
| 130 | + match res { |
| 131 | + Ok(res) => Some(res), |
| 132 | + Err(err) => Some(Err(anyhow!("{err:?}").into())), |
| 133 | + } |
| 134 | + }); |
| 135 | + |
| 136 | + Ok(Box::pin(stream)) |
| 137 | + } |
49 | 138 | }
|
0 commit comments