Skip to content

Commit 2fb5baa

Browse files
authored
Push evm transactions into a queue. (#311)
1 parent 3170047 commit 2fb5baa

File tree

18 files changed

+384
-83
lines changed

18 files changed

+384
-83
lines changed

Cargo.lock

+18-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ members = [
7676
"tesseract/fees/prisma-cli",
7777
"tesseract/telemetry",
7878
"tesseract/config",
79+
"tesseract/fisherman",
7980

8081
# integration tests
8182
"tesseract/integration-test",
@@ -304,6 +305,7 @@ nexus-runtime = { path = "./parachain/runtimes/nexus", default-features = false
304305
tesseract-primitives = { path = "tesseract/primitives" }
305306
tesseract-consensus = { path = "tesseract/consensus" }
306307
tesseract-messaging = { path = "tesseract/messaging" }
308+
tesseract-fisherman = { path = "tesseract/fisherman" }
307309
tesseract-substrate = { path = "tesseract/substrate" }
308310
tesseract-evm = { path = "tesseract/evm" }
309311
tesseract = { path = "tesseract/relayer" }

tesseract/evm/src/byzantine.rs

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
use std::sync::Arc;
2+
3+
use ethers::providers::Middleware;
4+
use ismp::{
5+
consensus::{StateMachineHeight, StateMachineId},
6+
events::StateMachineUpdated,
7+
};
8+
use tesseract_primitives::{ByzantineHandler, IsmpProvider};
9+
10+
use crate::EvmClient;
11+
12+
#[async_trait::async_trait]
13+
impl ByzantineHandler for EvmClient {
14+
async fn check_for_byzantine_attack(
15+
&self,
16+
counterparty: Arc<dyn IsmpProvider>,
17+
event: StateMachineUpdated,
18+
) -> Result<(), anyhow::Error> {
19+
let height = StateMachineHeight {
20+
id: StateMachineId {
21+
state_id: self.state_machine,
22+
consensus_state_id: self.consensus_state_id,
23+
},
24+
height: event.latest_height,
25+
};
26+
let Some(header) = self.client.get_block(event.latest_height).await? else {
27+
// If block header is not found veto the state commitment
28+
log::info!(
29+
"Vetoing State Machine Update for {} on {}",
30+
self.state_machine,
31+
counterparty.state_machine_id().state_id
32+
);
33+
counterparty.veto_state_commitment(height).await?;
34+
return Ok(())
35+
};
36+
37+
let state_machine_commitment = counterparty.query_state_machine_commitment(height).await?;
38+
if header.state_root != state_machine_commitment.state_root {
39+
log::info!(
40+
"Vetoing State Machine Update for {} on {}",
41+
self.state_machine,
42+
counterparty.state_machine_id().state_id
43+
);
44+
counterparty.veto_state_commitment(height).await?;
45+
}
46+
47+
Ok(())
48+
}
49+
}

tesseract/evm/src/lib.rs

+21-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use ethers::{
88
signers::Signer,
99
};
1010
use frame_support::crypto::ecdsa::ECDSAExt;
11-
use ismp::{consensus::ConsensusStateId, events::Event, host::StateMachine};
11+
use ismp::{consensus::ConsensusStateId, events::Event, host::StateMachine, messaging::Message};
1212

1313
use evm_common::presets::{
1414
REQUEST_COMMITMENTS_SLOT, REQUEST_RECEIPTS_SLOT, RESPONSE_COMMITMENTS_SLOT,
@@ -19,9 +19,14 @@ use ismp_solidity_abi::shared_types::{StateCommitment, StateMachineHeight};
1919
use serde::{Deserialize, Serialize};
2020
use sp_core::{bytes::from_hex, keccak_256, Pair, H160};
2121
use std::{sync::Arc, time::Duration};
22-
use tesseract_primitives::{IsmpProvider, StateMachineUpdated, StreamError};
22+
use tesseract_primitives::{
23+
queue::{start_pipeline, PipelineQueue},
24+
IsmpProvider, StateMachineUpdated, StreamError, TxReceipt,
25+
};
26+
use tx::handle_message_submission;
2327

2428
pub mod abi;
29+
mod byzantine;
2530
mod gas_oracle;
2631
pub mod provider;
2732

@@ -135,6 +140,8 @@ pub struct EvmClient {
135140
Option<tokio::sync::broadcast::Sender<Result<StateMachineUpdated, StreamError>>>,
136141
>,
137142
>,
143+
/// Tx submission pipeline
144+
queue: Option<Arc<PipelineQueue<Vec<Message>, anyhow::Result<Vec<TxReceipt>>>>>,
138145
}
139146

140147
impl EvmClient {
@@ -168,7 +175,7 @@ impl EvmClient {
168175
};
169176

170177
let latest_height = client.get_block_number().await?.as_u64();
171-
Ok(Self {
178+
let mut partial_client = Self {
172179
client,
173180
signer,
174181
address,
@@ -179,7 +186,16 @@ impl EvmClient {
179186
chain_id,
180187
client_type: config.client_type.unwrap_or_default(),
181188
state_machine_update_sender: Arc::new(tokio::sync::Mutex::new(None)),
182-
})
189+
queue: None,
190+
};
191+
192+
let partial_client_clone = partial_client.clone();
193+
let queue = start_pipeline(move |messages| {
194+
let client = partial_client_clone.clone();
195+
async move { handle_message_submission(&client, messages).await }
196+
});
197+
partial_client.queue = Some(Arc::new(queue));
198+
Ok(partial_client)
183199
}
184200

185201
pub async fn events(&self, from: u64, to: u64) -> Result<Vec<Event>, anyhow::Error> {
@@ -347,6 +363,7 @@ impl Clone for EvmClient {
347363
chain_id: self.chain_id.clone(),
348364
client_type: self.client_type.clone(),
349365
state_machine_update_sender: self.state_machine_update_sender.clone(),
366+
queue: self.queue.clone(),
350367
}
351368
}
352369
}

tesseract/evm/src/provider.rs

+11-58
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use crate::{
22
abi::{beefy::BeefyConsensusState, EvmHost},
33
gas_oracle::is_orbit_chain,
4-
state_comitment_key,
5-
tx::submit_messages,
6-
EvmClient,
4+
state_comitment_key, EvmClient,
75
};
86
use anyhow::{anyhow, Error};
97
use beefy_verifier_primitives::ConsensusState;
@@ -17,7 +15,7 @@ use evm_common::types::EvmStateProof;
1715
use ismp::{
1816
consensus::{ConsensusStateId, StateMachineId},
1917
events::{Event, StateCommitmentVetoed},
20-
messaging::{hash_request, hash_response, Message, StateCommitmentHeight},
18+
messaging::{Message, StateCommitmentHeight},
2119
};
2220
use ismp_solidity_abi::evm_host::{PostRequestHandledFilter, PostResponseHandledFilter};
2321
use pallet_ismp_host_executive::{EvmHostParam, HostParam};
@@ -38,15 +36,14 @@ use futures::{stream::FuturesOrdered, FutureExt};
3836
use ismp::{
3937
consensus::{StateCommitment, StateMachineHeight},
4038
host::StateMachine,
41-
messaging::{CreateConsensusState, ResponseMessage},
42-
router::{Request, RequestResponse},
39+
messaging::CreateConsensusState,
4340
};
4441
use primitive_types::U256;
4542
use sp_core::{H160, H256};
4643
use std::{collections::BTreeMap, sync::Arc, time::Duration};
4744
use tesseract_primitives::{
48-
wait_for_challenge_period, BoxStream, EstimateGasReturnParams, Hasher, IsmpProvider, Query,
49-
Signature, StateMachineUpdated, StateProofQueryType, TxReceipt,
45+
wait_for_challenge_period, BoxStream, EstimateGasReturnParams, IsmpProvider, Query, Signature,
46+
StateMachineUpdated, StateProofQueryType, TxReceipt,
5047
};
5148

5249
#[async_trait::async_trait]
@@ -722,56 +719,12 @@ impl IsmpProvider for EvmClient {
722719
}
723720

724721
async fn submit(&self, messages: Vec<Message>) -> Result<Vec<TxReceipt>, Error> {
725-
let receipts = submit_messages(&self, messages.clone()).await?;
726-
let height = self.client.get_block_number().await?.low_u64();
727-
let mut results = vec![];
728-
for msg in messages {
729-
match msg {
730-
Message::Request(req_msg) =>
731-
for post in req_msg.requests {
732-
let req = Request::Post(post);
733-
let commitment = hash_request::<Hasher>(&req);
734-
if receipts.contains(&commitment) {
735-
let tx_receipt = TxReceipt::Request {
736-
query: Query {
737-
source_chain: req.source_chain(),
738-
dest_chain: req.dest_chain(),
739-
nonce: req.nonce(),
740-
commitment,
741-
},
742-
height,
743-
};
744-
745-
results.push(tx_receipt);
746-
}
747-
},
748-
Message::Response(ResponseMessage {
749-
datagram: RequestResponse::Response(resp),
750-
..
751-
}) =>
752-
for res in resp {
753-
let commitment = hash_response::<Hasher>(&res);
754-
let request_commitment = hash_request::<Hasher>(&res.request());
755-
if receipts.contains(&commitment) {
756-
let tx_receipt = TxReceipt::Response {
757-
query: Query {
758-
source_chain: res.source_chain(),
759-
dest_chain: res.dest_chain(),
760-
nonce: res.nonce(),
761-
commitment,
762-
},
763-
request_commitment,
764-
height,
765-
};
766-
767-
results.push(tx_receipt);
768-
}
769-
},
770-
_ => {},
771-
}
772-
}
773-
774-
Ok(results)
722+
let queue = self
723+
.queue
724+
.as_ref()
725+
.ok_or_else(|| anyhow!("Trasnsaction submission pipeline was not initialized"))?
726+
.clone();
727+
queue.send(messages).await?
775728
}
776729

777730
fn request_commitment_full_key(&self, commitment: H256) -> Vec<Vec<u8>> {

tesseract/evm/src/tx.rs

+59-2
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use ethers::{
1717
};
1818
use ismp::{
1919
host::StateMachine,
20-
messaging::{Message, ResponseMessage},
21-
router::{RequestResponse, Response},
20+
messaging::{hash_request, hash_response, Message, ResponseMessage},
21+
router::{Request, RequestResponse, Response},
2222
};
2323
use ismp_solidity_abi::{
2424
beefy::StateMachineHeight,
@@ -33,6 +33,7 @@ use pallet_ismp::mmr::{LeafIndexAndPos, Proof as MmrProof};
3333
use primitive_types::{H160, H256, U256};
3434
use sp_mmr_primitives::utils::NodesUtils;
3535
use std::{collections::BTreeSet, sync::Arc, time::Duration};
36+
use tesseract_primitives::{Hasher, Query, TxReceipt};
3637

3738
use crate::gas_oracle::get_current_gas_cost_in_usd;
3839

@@ -392,3 +393,59 @@ pub fn get_chain_gas_limit(state_machine: StateMachine) -> u64 {
392393
_ => Default::default(),
393394
}
394395
}
396+
397+
pub async fn handle_message_submission(
398+
client: &EvmClient,
399+
messages: Vec<Message>,
400+
) -> Result<Vec<TxReceipt>, anyhow::Error> {
401+
let receipts = submit_messages(client, messages.clone()).await?;
402+
let height = client.client.get_block_number().await?.low_u64();
403+
let mut results = vec![];
404+
for msg in messages {
405+
match msg {
406+
Message::Request(req_msg) =>
407+
for post in req_msg.requests {
408+
let req = Request::Post(post);
409+
let commitment = hash_request::<Hasher>(&req);
410+
if receipts.contains(&commitment) {
411+
let tx_receipt = TxReceipt::Request {
412+
query: Query {
413+
source_chain: req.source_chain(),
414+
dest_chain: req.dest_chain(),
415+
nonce: req.nonce(),
416+
commitment,
417+
},
418+
height,
419+
};
420+
421+
results.push(tx_receipt);
422+
}
423+
},
424+
Message::Response(ResponseMessage {
425+
datagram: RequestResponse::Response(resp),
426+
..
427+
}) =>
428+
for res in resp {
429+
let commitment = hash_response::<Hasher>(&res);
430+
let request_commitment = hash_request::<Hasher>(&res.request());
431+
if receipts.contains(&commitment) {
432+
let tx_receipt = TxReceipt::Response {
433+
query: Query {
434+
source_chain: res.source_chain(),
435+
dest_chain: res.dest_chain(),
436+
nonce: res.nonce(),
437+
commitment,
438+
},
439+
request_commitment,
440+
height,
441+
};
442+
443+
results.push(tx_receipt);
444+
}
445+
},
446+
_ => {},
447+
}
448+
}
449+
450+
Ok(results)
451+
}

tesseract/fisherman/Cargo.toml

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[package]
2+
name = "tesseract-fisherman"
3+
version = "0.1.0"
4+
edition = "2021"
5+
description = "A process that checks for malicious consensus updates"
6+
authors = ["Polytope Labs <hello@polytope.technology>"]
7+
8+
[dependencies]
9+
anyhow = "1.0.75"
10+
log = "0.4.17"
11+
futures = "0.3.28"
12+
tracing = "0.1.40"
13+
tokio = { workspace = true, features = ["full"] }
14+
sp-core = { workspace = true, features = ["full_crypto"] }
15+
16+
ismp = { workspace = true }
17+
pallet-ismp = { workspace = true }
18+
19+
tesseract-primitives = { workspace = true }
20+
sc-service = { workspace = true }

0 commit comments

Comments
 (0)