Skip to content

Commit e4db05c

Browse files
authored
Add retry middleware to sync committee prover (#117)
1 parent b9fbaef commit e4db05c

File tree

6 files changed

+224
-47
lines changed

6 files changed

+224
-47
lines changed

Cargo.lock

+51
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

evm/abi/src/generated/host_manager.rs

+2-2
Large diffs are not rendered by default.

modules/consensus/sync-committee/prover/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ sync-committee-primitives = { path= "../primitives" }
1010
sync-committee-verifier = { path= "../verifier" }
1111
ssz-rs = { git = "https://github.com/polytope-labs/ssz-rs", branch = "main" }
1212
reqwest = {version="0.11.14", features=["json"]}
13+
reqwest-middleware = "0.2.4"
14+
reqwest-chain = "0.1.0"
1315
serde = { version = "1.0.185", features = ["derive"] }
1416
serde_json = { version = "1.0.81"}
1517
anyhow = "1.0.68"
@@ -24,6 +26,7 @@ bls_on_arkworks = { version = "0.2.2" }
2426
primitive-types = { version = "0.12.1", features = ["serde_no_std", "impl-codec"] }
2527
log = "0.4.20"
2628
hex = "0.4.3"
29+
async-trait = "0.1.77"
2730

2831

2932
[dev-dependencies]

modules/consensus/sync-committee/prover/src/lib.rs

+80-40
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod middleware;
12
#[warn(unused_imports)]
23
#[warn(unused_variables)]
34
mod responses;
@@ -6,26 +7,25 @@ mod routes;
67
#[cfg(test)]
78
mod test;
89

9-
use anyhow::anyhow;
10-
use bls_on_arkworks::{point_to_pubkey, types::G1ProjectivePoint};
11-
use log::debug;
12-
use reqwest::Client;
13-
use std::marker::PhantomData;
14-
use sync_committee_primitives::{
15-
consensus_types::{BeaconBlock, BeaconBlockHeader, BeaconState, Checkpoint, Validator},
16-
types::VerifierState,
17-
};
18-
1910
use crate::{
11+
middleware::SwitchProviderMiddleware,
2012
responses::{
2113
finality_checkpoint_response::FinalityCheckpoint,
2214
sync_committee_response::NodeSyncCommittee,
2315
},
2416
routes::*,
2517
};
18+
use anyhow::anyhow;
19+
use bls_on_arkworks::{point_to_pubkey, types::G1ProjectivePoint};
20+
use log::trace;
2621
use primitive_types::H256;
22+
use reqwest::{Client, Url};
23+
use reqwest_chain::ChainMiddleware;
24+
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
2725
use ssz_rs::{Merkleized, Node};
26+
use std::marker::PhantomData;
2827
use sync_committee_primitives::{
28+
consensus_types::{BeaconBlock, BeaconBlockHeader, BeaconState, Checkpoint, Validator},
2929
constants::{
3030
BlsPublicKey, Config, Root, BLOCK_ROOTS_INDEX, BYTES_PER_LOGS_BLOOM,
3131
EPOCHS_PER_HISTORICAL_VECTOR, EPOCHS_PER_SLASHINGS_VECTOR, ETH1_DATA_VOTES_BOUND,
@@ -39,7 +39,7 @@ use sync_committee_primitives::{
3939
deneb::MAX_BLOB_COMMITMENTS_PER_BLOCK,
4040
types::{
4141
AncestryProof, BlockRootsProof, ExecutionPayloadProof, FinalityProof, SyncCommitteeUpdate,
42-
VerifierStateUpdate,
42+
VerifierState, VerifierStateUpdate,
4343
},
4444
util::{
4545
compute_epoch_at_slot, compute_sync_committee_period_at_slot,
@@ -63,43 +63,68 @@ pub type BeaconStateType = BeaconState<
6363
>;
6464

6565
pub struct SyncCommitteeProver<C: Config> {
66-
pub node_url: String,
67-
pub client: Client,
66+
pub primary_url: String,
67+
pub providers: Vec<String>,
68+
pub client: ClientWithMiddleware,
6869
pub phantom: PhantomData<C>,
6970
}
7071

7172
impl<C: Config> Clone for SyncCommitteeProver<C> {
7273
fn clone(&self) -> Self {
73-
Self { node_url: self.node_url.clone(), client: self.client.clone(), phantom: PhantomData }
74+
Self {
75+
primary_url: self.primary_url.clone(),
76+
client: self.client.clone(),
77+
providers: self.providers.clone(),
78+
phantom: PhantomData,
79+
}
7480
}
7581
}
7682

7783
impl<C: Config> SyncCommitteeProver<C> {
78-
pub fn new(node_url: String) -> Self {
79-
let client = Client::new();
80-
81-
SyncCommitteeProver::<C> { node_url, client, phantom: PhantomData }
84+
pub fn new(providers: Vec<String>) -> Self {
85+
let client = ClientBuilder::new(Client::new())
86+
.with(ChainMiddleware::new(SwitchProviderMiddleware::_new(providers.clone())))
87+
.build();
88+
89+
SyncCommitteeProver::<C> {
90+
primary_url: providers.get(0).expect("There must be atleast one provider").clone(),
91+
providers,
92+
client,
93+
phantom: PhantomData,
94+
}
8295
}
8396

8497
pub async fn fetch_finalized_checkpoint(
8598
&self,
8699
state_id: Option<&str>,
87100
) -> Result<FinalityCheckpoint, anyhow::Error> {
88-
let full_url = self.generate_route(&finality_checkpoints(state_id.unwrap_or("head")));
89-
let response = self.client.get(full_url).send().await?;
90-
91-
let response_data =
92-
response.json::<responses::finality_checkpoint_response::Response>().await?;
101+
let full_url = self.generate_route(&finality_checkpoints(state_id.unwrap_or("head")))?;
102+
let response = self
103+
.client
104+
.get(full_url)
105+
.send()
106+
.await
107+
.map_err(|e| anyhow!("Failed to fetch finalized checkpoint due to error {e:?}"))?;
108+
109+
let response_data = response
110+
.json::<responses::finality_checkpoint_response::Response>()
111+
.await
112+
.map_err(|e| anyhow!("Failed to fetch finalized checkpoint due to error {e:?}"))?;
93113
Ok(response_data.data)
94114
}
95115

96116
pub async fn fetch_header(&self, block_id: &str) -> Result<BeaconBlockHeader, anyhow::Error> {
97117
let path = header_route(block_id);
98-
let full_url = self.generate_route(&path);
99-
let response = self.client.get(full_url).send().await?;
118+
let full_url = self.generate_route(&path)?;
119+
let response =
120+
self.client.get(full_url).send().await.map_err(|e| {
121+
anyhow!("Failed to fetch header with id {block_id} due to error {e:?}")
122+
})?;
100123

101-
let response_data =
102-
response.json::<responses::beacon_block_header_response::Response>().await?;
124+
let response_data = response
125+
.json::<responses::beacon_block_header_response::Response>()
126+
.await
127+
.map_err(|e| anyhow!("Failed to fetch header with id {block_id} due to error {e:?}"))?;
103128

104129
let beacon_block_header = response_data.data.header.message;
105130

@@ -129,11 +154,17 @@ impl<C: Config> SyncCommitteeProver<C> {
129154
anyhow::Error,
130155
> {
131156
let path = block_route(block_id);
132-
let full_url = self.generate_route(&path);
157+
let full_url = self.generate_route(&path)?;
133158

134-
let response = self.client.get(full_url).send().await?;
159+
let response =
160+
self.client.get(full_url).send().await.map_err(|e| {
161+
anyhow!("Failed to fetch block with id {block_id} due to error {e:?}")
162+
})?;
135163

136-
let response_data = response.json::<responses::beacon_block_response::Response>().await?;
164+
let response_data = response
165+
.json::<responses::beacon_block_response::Response>()
166+
.await
167+
.map_err(|e| anyhow!("Failed to fetch block with id {block_id} due to error {e:?}"))?;
137168

138169
let beacon_block = response_data.data.message;
139170

@@ -145,7 +176,7 @@ impl<C: Config> SyncCommitteeProver<C> {
145176
state_id: &str,
146177
) -> Result<NodeSyncCommittee, anyhow::Error> {
147178
let path = sync_committee_route(state_id);
148-
let full_url = self.generate_route(&path);
179+
let full_url = self.generate_route(&path)?;
149180

150181
let response = self.client.get(full_url).send().await?;
151182

@@ -162,7 +193,7 @@ impl<C: Config> SyncCommitteeProver<C> {
162193
validator_index: &str,
163194
) -> Result<Validator, anyhow::Error> {
164195
let path = validator_route(state_id, validator_index);
165-
let full_url = self.generate_route(&path);
196+
let full_url = self.generate_route(&path)?;
166197

167198
let response = self.client.get(full_url).send().await?;
168199

@@ -178,19 +209,27 @@ impl<C: Config> SyncCommitteeProver<C> {
178209
state_id: &str,
179210
) -> Result<BeaconStateType, anyhow::Error> {
180211
let path = beacon_state_route(state_id);
181-
let full_url = self.generate_route(&path);
212+
let full_url = self.generate_route(&path)?;
182213

183-
let response = self.client.get(full_url).send().await?;
214+
let response = self.client.get(full_url).send().await.map_err(|e| {
215+
anyhow!("Failed to fetch beacon state with id {state_id} due to error {e:?}")
216+
})?;
184217

185-
let response_data = response.json::<responses::beacon_state_response::Response>().await?;
218+
let response_data = response
219+
.json::<responses::beacon_state_response::Response>()
220+
.await
221+
.map_err(|e| {
222+
anyhow!("Failed to fetch beacon state with id {state_id} due to error {e:?}")
223+
})?;
186224

187225
let beacon_state = response_data.data;
188226

189227
Ok(beacon_state)
190228
}
191229

192-
fn generate_route(&self, path: &str) -> String {
193-
format!("{}{}", self.node_url.clone(), path)
230+
fn generate_route(&self, path: &str) -> Result<Url, anyhow::Error> {
231+
let url = Url::parse(&format!("{}{}", self.primary_url.clone(), path))?;
232+
Ok(url)
194233
}
195234

196235
/// Fetches the latest finality update that can be verified by (state_period..=state_period+1)
@@ -201,15 +240,15 @@ impl<C: Config> SyncCommitteeProver<C> {
201240
mut client_state: VerifierState,
202241
finality_checkpoint: Checkpoint,
203242
latest_block_id: Option<&str>,
204-
debug_target: &str,
205243
) -> Result<Option<VerifierStateUpdate>, anyhow::Error> {
206244
if finality_checkpoint.root == Node::default() ||
207245
client_state.latest_finalized_epoch >= finality_checkpoint.epoch
208246
{
247+
trace!(target: "sync-committee-prover", "No new epoch finalized yet {}", finality_checkpoint.epoch);
209248
return Ok(None);
210249
}
211250

212-
debug!(target: debug_target, "A new epoch has been finalized {}", finality_checkpoint.epoch);
251+
trace!(target: "sync-committee-prover", "A new epoch has been finalized {}", finality_checkpoint.epoch);
213252
// Find the highest block with the a threshhold number of sync committee signatures
214253
let latest_header = self.fetch_header(latest_block_id.unwrap_or("head")).await?;
215254
let latest_root = latest_header.clone().hash_tree_root()?;
@@ -230,7 +269,7 @@ impl<C: Config> SyncCommitteeProver<C> {
230269
let parent_block_finality_checkpoint =
231270
self.fetch_finalized_checkpoint(Some(&parent_state_id)).await?.finalized;
232271
if parent_block_finality_checkpoint.epoch <= client_state.latest_finalized_epoch {
233-
debug!(target: "prover", "Signature block search has reached an invalid epoch {} latest finalized_block_epoch {}", parent_block_finality_checkpoint.epoch, client_state.latest_finalized_epoch);
272+
trace!(target: "sync-committee-prover", "Search for a block with a valid sync committee signature has reached an invalid epoch {} latest_finalized_block_epoch: {}", parent_block_finality_checkpoint.epoch, client_state.latest_finalized_epoch);
234273
return Ok(None);
235274
}
236275

@@ -309,6 +348,7 @@ impl<C: Config> SyncCommitteeProver<C> {
309348
let mut block = loop {
310349
// Prevent an infinite loop
311350
if count == 100 {
351+
log::trace!("Prover could not find a suitable block for the sync committee: {period}, syncing will fail");
312352
return Err(anyhow!("Error fetching blocks from selected epoch"));
313353
}
314354

0 commit comments

Comments
 (0)