Skip to content

Commit df6b28a

Browse files
MujkicAhal3esegfault-magnet
authored
feat: logging blobs (#109)
Co-authored-by: hal3e <git@hal3e.io> Co-authored-by: segfault-magnet <ahmed.sagdati.ets@gmail.com>
1 parent b6656cc commit df6b28a

30 files changed

+535
-176
lines changed

Cargo.lock

+222-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,13 @@ async-trait = { version = "0.1", default-features = false }
4141
c-kzg = { version = "1.0", default-features = false }
4242
clap = { version = "4.5", default-features = false }
4343
config = { version = "0.14", default-features = false }
44+
fs_extra = { version = "1.3", default-features = false }
45+
fuel-core-chain-config = { version = "0.31", default-features = false }
4446
fuel-core-client = { version = "0.31", default-features = false }
47+
fuel-core-types = { version = "0.31", default-features = false }
4548
fuel-crypto = { version = "0.55", default-features = false }
4649
futures = { version = "0.3", default-features = false }
50+
futures-util = { version = "0.3", default-features = false }
4751
hex = { version = "0.4", default-features = false }
4852
humantime = { version = "2.1", default-features = false }
4953
impl-tools = { version = "0.10.0", default-features = false }

committer/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ rust-version = { workspace = true }
1111

1212
[dependencies]
1313
actix-web = { workspace = true, features = ["macros"] }
14-
alloy-chains = { workspace = true, features = [ "serde" ] }
1514
clap = { workspace = true, features = ["default", "derive"] }
1615
config = { workspace = true, features = ["toml", "async"] }
1716
eth = { workspace = true }

committer/src/config.rs

+4-19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use std::{net::Ipv4Addr, path::PathBuf, str::FromStr, time::Duration};
22

3-
use alloy_chains::NamedChain;
43
use clap::{command, Parser};
54
use eth::Address;
65
use serde::Deserialize;
@@ -16,8 +15,8 @@ pub struct Config {
1615

1716
impl Config {
1817
pub fn validate(&self) -> crate::errors::Result<()> {
19-
if let Some(blob_pool_wallet_key) = &self.eth.blob_pool_key_id {
20-
if blob_pool_wallet_key == &self.eth.main_key_id {
18+
if let Some(blob_pool_wallet_key) = &self.eth.blob_pool_key_arn {
19+
if blob_pool_wallet_key == &self.eth.main_key_arn {
2120
return Err(crate::errors::Error::Other(
2221
"Wallet key and blob pool wallet key must be different".to_string(),
2322
));
@@ -40,30 +39,16 @@ pub struct Fuel {
4039
#[derive(Debug, Clone, Deserialize)]
4140
pub struct Eth {
4241
/// The AWS KMS key ID authorized by the L1 bridging contracts to post block commitments.
43-
pub main_key_id: String,
42+
pub main_key_arn: String,
4443
/// The AWS KMS key ID for posting L2 state to L1.
45-
pub blob_pool_key_id: Option<String>,
44+
pub blob_pool_key_arn: Option<String>,
4645
/// URL to a Ethereum RPC endpoint.
4746
#[serde(deserialize_with = "parse_url")]
4847
pub rpc: Url,
49-
/// Chain id of the ethereum network.
50-
#[serde(deserialize_with = "deserialize_named_chain")]
51-
pub chain_id: NamedChain,
5248
/// Ethereum address of the fuel chain state contract.
5349
pub state_contract_address: Address,
5450
}
5551

56-
fn deserialize_named_chain<'de, D>(deserializer: D) -> Result<NamedChain, D::Error>
57-
where
58-
D: serde::Deserializer<'de>,
59-
{
60-
let chain_str: String = Deserialize::deserialize(deserializer).unwrap();
61-
NamedChain::from_str(&chain_str).map_err(|_| {
62-
let msg = format!("Failed to parse chain from '{chain_str}'");
63-
serde::de::Error::custom(msg)
64-
})
65-
}
66-
6752
fn parse_url<'de, D>(deserializer: D) -> Result<Url, D::Error>
6853
where
6954
D: serde::Deserializer<'de>,

committer/src/errors.rs

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ impl From<ports::fuel::Error> for Error {
5757
fn from(error: ports::fuel::Error) -> Self {
5858
match error {
5959
ports::fuel::Error::Network(e) => Self::Network(e),
60+
ports::fuel::Error::Other(e) => Self::Other(e),
6061
}
6162
}
6263
}

committer/src/main.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -79,26 +79,25 @@ async fn main() -> Result<()> {
7979

8080
// If the blob pool wallet key is set, we need to start
8181
// the state committer and state importer
82-
if config.eth.blob_pool_key_id.is_some() {
82+
if config.eth.blob_pool_key_arn.is_some() {
8383
let state_committer_handle = setup::state_committer(
8484
ethereum_rpc.clone(),
8585
storage.clone(),
86-
&metrics_registry,
8786
cancel_token.clone(),
8887
&config,
8988
);
9089

91-
let state_importer_handle = setup::state_importer(
92-
fuel_adapter,
90+
let state_importer_handle =
91+
setup::state_importer(fuel_adapter, storage.clone(), cancel_token.clone(), &config);
92+
93+
let state_listener_handle = setup::state_listener(
94+
ethereum_rpc,
9395
storage.clone(),
94-
&metrics_registry,
9596
cancel_token.clone(),
97+
&metrics_registry,
9698
&config,
9799
);
98100

99-
let state_listener_handle =
100-
setup::state_listener(ethereum_rpc, storage.clone(), cancel_token.clone(), &config);
101-
102101
handles.push(state_committer_handle);
103102
handles.push(state_importer_handle);
104103
handles.push(state_listener_handle);

committer/src/setup.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ pub fn block_committer(
7373
pub fn state_committer(
7474
l1: L1,
7575
storage: impl Storage + 'static,
76-
_registry: &Registry,
7776
cancel_token: CancellationToken,
7877
config: &config::Config,
7978
) -> tokio::task::JoinHandle<()> {
@@ -90,7 +89,6 @@ pub fn state_committer(
9089
pub fn state_importer(
9190
fuel: FuelApi,
9291
storage: impl Storage + 'static,
93-
_registry: &Registry,
9492
cancel_token: CancellationToken,
9593
config: &config::Config,
9694
) -> tokio::task::JoinHandle<()> {
@@ -109,11 +107,14 @@ pub fn state_listener(
109107
l1: L1,
110108
storage: impl Storage + 'static,
111109
cancel_token: CancellationToken,
110+
registry: &Registry,
112111
config: &config::Config,
113112
) -> tokio::task::JoinHandle<()> {
114113
let state_listener =
115114
services::StateListener::new(l1, storage, config.app.num_blocks_to_finalize_tx);
116115

116+
state_listener.register_metrics(registry);
117+
117118
schedule_polling(
118119
config.app.block_check_interval,
119120
state_listener,
@@ -133,10 +134,9 @@ pub async fn l1_adapter(
133134

134135
let l1 = L1::connect(
135136
config.eth.rpc.clone(),
136-
config.eth.chain_id.into(),
137137
config.eth.state_contract_address,
138-
config.eth.main_key_id.clone(),
139-
config.eth.blob_pool_key_id.clone(),
138+
config.eth.main_key_arn.clone(),
139+
config.eth.blob_pool_key_arn.clone(),
140140
internal_config.eth_errors_before_unhealthy,
141141
aws_client,
142142
)

configurations/development/config.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
[eth]
2-
chain_id = "anvil"
32
state_contract_address = "0xDc64a140Aa3E981100a9becA4E685f962f0cF6C9"
43
rpc = "ws://localhost:8545"
54

@@ -11,7 +10,7 @@ block_producer_public_key = "0x73dc6cc8cc0041e4924954b35a71a22ccb520664c522198a6
1110
port = 8080
1211
host = "0.0.0.0"
1312
block_check_interval = "1s"
14-
num_blocks_to_finalize_tx = "12"
13+
num_blocks_to_finalize_tx = "3"
1514

1615
[app.db]
1716
host = "localhost"

e2e/Cargo.toml

+12-2
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,23 @@ walkdir = { workspace = true }
2525
zip = { workspace = true, features = ["deflate"] }
2626

2727
[dev-dependencies]
28-
alloy = { workspace = true, features = [ "signer-aws", "signer-mnemonic", "serde" ] }
29-
alloy-chains = { workspace = true }
28+
fs_extra = { workspace = true }
29+
alloy = { workspace = true, features = [
30+
"signer-aws",
31+
"signer-mnemonic",
32+
"serde",
33+
] }
3034
anyhow = { workspace = true, features = ["std"] }
3135
aws-sdk-kms = { workspace = true, features = ["rustls"] }
3236
aws-config = { workspace = true, features = ["rustls"] }
3337
eth = { workspace = true, features = ["test-helpers"] }
3438
fuel = { workspace = true, features = ["test-helpers"] }
39+
fuel-core-chain-config = { workspace = true, features = [
40+
"std",
41+
"test-helpers",
42+
] }
43+
fuel-core-types = { workspace = true }
44+
futures-util = { workspace = true }
3545
hex = { workspace = true }
3646
portpicker = { workspace = true }
3747
ports = { workspace = true, features = ["fuel", "l1"] }

e2e/src/committer.rs

+33-13
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ use url::Url;
77
#[derive(Default)]
88
pub struct Committer {
99
show_logs: bool,
10-
main_key_id: Option<String>,
11-
blob_key_id: Option<String>,
10+
main_key_arn: Option<String>,
11+
blob_key_arn: Option<String>,
1212
state_contract_address: Option<String>,
1313
eth_rpc: Option<Url>,
1414
fuel_rpc: Option<Url>,
@@ -36,10 +36,10 @@ impl Committer {
3636
let mut cmd = tokio::process::Command::new("fuel-block-committer");
3737
cmd.arg(config)
3838
.env("E2E_TEST_AWS_ENDPOINT", kms_url)
39-
.env("AWS_ACCESS_KEY_ID", "test")
4039
.env("AWS_REGION", "us-east-1")
40+
.env("AWS_ACCESS_KEY_ID", "test")
4141
.env("AWS_SECRET_ACCESS_KEY", "test")
42-
.env("COMMITTER__ETH__MAIN_KEY_ID", get_field!(main_key_id))
42+
.env("COMMITTER__ETH__MAIN_KEY_ARN", get_field!(main_key_arn))
4343
.env("COMMITTER__ETH__RPC", get_field!(eth_rpc).as_str())
4444
.env(
4545
"COMMITTER__ETH__STATE_CONTRACT_ADDRESS",
@@ -56,12 +56,11 @@ impl Committer {
5656
.env("COMMITTER__APP__DB__PORT", get_field!(db_port).to_string())
5757
.env("COMMITTER__APP__DB__DATABASE", get_field!(db_name))
5858
.env("COMMITTER__APP__PORT", unused_port.to_string())
59-
.env("COMMITTER__AWS__ALLOW_HTTP", "true")
6059
.current_dir(Path::new(env!("CARGO_MANIFEST_DIR")).parent().unwrap())
6160
.kill_on_drop(true);
6261

63-
if let Some(blob_wallet_key_id) = self.blob_key_id {
64-
cmd.env("COMMITTER__ETH__BLOB_POOL_KEY_ID", blob_wallet_key_id);
62+
if let Some(blob_wallet_key_arn) = self.blob_key_arn {
63+
cmd.env("COMMITTER__ETH__BLOB_POOL_KEY_ARN", blob_wallet_key_arn);
6564
}
6665

6766
let sink = if self.show_logs {
@@ -79,8 +78,8 @@ impl Committer {
7978
})
8079
}
8180

82-
pub fn with_main_key_id(mut self, wallet_id: String) -> Self {
83-
self.main_key_id = Some(wallet_id);
81+
pub fn with_main_key_arn(mut self, wallet_arn: String) -> Self {
82+
self.main_key_arn = Some(wallet_arn);
8483
self
8584
}
8685

@@ -89,8 +88,8 @@ impl Committer {
8988
self
9089
}
9190

92-
pub fn with_blob_key_id(mut self, blob_wallet_id: String) -> Self {
93-
self.blob_key_id = Some(blob_wallet_id);
91+
pub fn with_blob_key_arn(mut self, blob_wallet_arn: String) -> Self {
92+
self.blob_key_arn = Some(blob_wallet_arn);
9493
self
9594
}
9695

@@ -149,7 +148,28 @@ impl CommitterProcess {
149148
Ok(())
150149
}
151150

151+
pub async fn wait_for_committed_blob(&self) -> anyhow::Result<()> {
152+
loop {
153+
match self.fetch_latest_blob_block().await {
154+
Ok(_) => break,
155+
_ => {
156+
tokio::time::sleep(Duration::from_secs(1)).await;
157+
continue;
158+
}
159+
}
160+
}
161+
Ok(())
162+
}
163+
152164
async fn fetch_latest_committed_block(&self) -> anyhow::Result<u64> {
165+
self.fetch_metric_value("latest_committed_block").await
166+
}
167+
168+
async fn fetch_latest_blob_block(&self) -> anyhow::Result<u64> {
169+
self.fetch_metric_value("last_eth_block_w_blob").await
170+
}
171+
172+
async fn fetch_metric_value(&self, metric_name: &str) -> anyhow::Result<u64> {
153173
let response = reqwest::get(format!("http://localhost:{}/metrics", self.port))
154174
.await?
155175
.error_for_status()?
@@ -158,8 +178,8 @@ impl CommitterProcess {
158178

159179
let height_line = response
160180
.lines()
161-
.find(|line| line.starts_with("latest_committed_block"))
162-
.ok_or_else(|| anyhow::anyhow!("couldn't find latest_committed_block metric"))?;
181+
.find(|line| line.starts_with(metric_name))
182+
.ok_or_else(|| anyhow::anyhow!("couldn't find {} metric", metric_name))?;
163183

164184
Ok(height_line
165185
.split_whitespace()

e2e/src/eth_node.rs

+3-19
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@ use alloy::{
55
network::{EthereumWallet, TransactionBuilder},
66
providers::{Provider, ProviderBuilder, WsConnect},
77
rpc::types::TransactionRequest,
8-
signers::{
9-
local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner},
10-
Signer,
11-
},
8+
signers::local::{coins_bip39::English, MnemonicBuilder, PrivateKeySigner},
129
};
13-
use alloy_chains::NamedChain;
1410
use eth::Address;
1511
use ports::types::U256;
1612
use state_contract::CreateTransactions;
@@ -53,12 +49,7 @@ impl EthNode {
5349

5450
let child = cmd.spawn()?;
5551

56-
Ok(EthNodeProcess::new(
57-
child,
58-
unused_port,
59-
NamedChain::AnvilHardhat.into(),
60-
mnemonic,
61-
))
52+
Ok(EthNodeProcess::new(child, unused_port, mnemonic))
6253
}
6354

6455
pub fn with_show_logs(mut self, show_logs: bool) -> Self {
@@ -69,18 +60,16 @@ impl EthNode {
6960

7061
pub struct EthNodeProcess {
7162
_child: tokio::process::Child,
72-
chain_id: u64,
7363
port: u16,
7464
mnemonic: String,
7565
}
7666

7767
impl EthNodeProcess {
78-
fn new(child: tokio::process::Child, port: u16, chain_id: u64, mnemonic: String) -> Self {
68+
fn new(child: tokio::process::Child, port: u16, mnemonic: String) -> Self {
7969
Self {
8070
_child: child,
8171
mnemonic,
8272
port,
83-
chain_id,
8473
}
8574
}
8675

@@ -108,7 +97,6 @@ impl EthNodeProcess {
10897
.expect("Should generate a valid derivation path")
10998
.build()
11099
.expect("phrase to be correct")
111-
.with_chain_id(Some(self.chain_id))
112100
}
113101

114102
pub fn ws_url(&self) -> Url {
@@ -117,10 +105,6 @@ impl EthNodeProcess {
117105
.expect("URL to be well formed")
118106
}
119107

120-
pub fn chain_id(&self) -> u64 {
121-
self.chain_id
122-
}
123-
124108
pub async fn fund(&self, address: Address, amount: U256) -> anyhow::Result<()> {
125109
let wallet = EthereumWallet::from(self.wallet(0));
126110
let ws = WsConnect::new(self.ws_url());

0 commit comments

Comments
 (0)