diff --git a/src/cli/subcommands/sync_cmd.rs b/src/cli/subcommands/sync_cmd.rs index 3eef7126b616..ba6067d80778 100644 --- a/src/cli/subcommands/sync_cmd.rs +++ b/src/cli/subcommands/sync_cmd.rs @@ -25,6 +25,8 @@ pub enum SyncCommands { }, /// Check sync status Status, + /// Check snapshot progress + Progress, /// Check if a given block is marked bad, and for what reason CheckBad { #[arg(short)] @@ -42,6 +44,19 @@ pub enum SyncCommands { impl SyncCommands { pub async fn run(self, client: rpc::Client) -> anyhow::Result<()> { match self { + Self::Progress => { + let ticker = Ticker::new(0.., Duration::from_secs(5)); + for _ in ticker { + let progress = SyncStatus::call(&client, ()).await?; + if progress.message.is_empty() { + println!("Snapshot download is completed."); + return Ok(()); + } + + println!("Snapshot download is in progress: {}", progress.message); + } + Ok(()) + } Self::Wait { watch } => { let ticker = Ticker::new(0.., Duration::from_secs(1)); let mut stdout = stdout(); diff --git a/src/cli_shared/snapshot.rs b/src/cli_shared/snapshot.rs index 929a1d04a006..35e2bd2253e8 100644 --- a/src/cli_shared/snapshot.rs +++ b/src/cli_shared/snapshot.rs @@ -67,7 +67,7 @@ pub async fn fetch( .date_and_height_and_forest(); let filename = filename(vendor, chain, date, height, forest_format); - download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable).await + download_file_with_retry(&url, directory, &filename, DownloadFileOption::Resumable, None).await } /// Returns diff --git a/src/daemon/context.rs b/src/daemon/context.rs new file mode 100644 index 000000000000..7225c3ddce16 --- /dev/null +++ b/src/daemon/context.rs @@ -0,0 +1,356 @@ +use std::cell::RefCell; +use std::future::Future; +use std::path::PathBuf; +use std::sync::Arc; +use anyhow::Context; +use dialoguer::console::Term; +use futures::FutureExt; +use fvm_shared4::address::Network; +use tokio::sync::RwLock; +use tracing::{info, warn}; +use crate::cli_shared::cli::CliOpts; +use crate::{Config, KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, JWT_IDENTIFIER}; +use crate::auth::{create_token, generate_priv_key, ADMIN}; +use crate::chain::ChainStore; +use crate::cli_shared::chain_path; +use crate::daemon::bundle::load_actor_bundles; +use crate::daemon::db_util::{load_all_forest_cars}; +use crate::db::CAR_DB_DIR_NAME; +use crate::db::car::ManyCar; +use crate::db::db_engine::{db_root, open_db}; +use crate::db::parity_db::ParityDb; +use crate::genesis::read_genesis_header; +use crate::libp2p::{Keypair, PeerId}; +use crate::networks::ChainConfig; +use crate::rpc::sync::SnapshotTracker; +use crate::shim::address::CurrentNetwork; +use crate::state_manager::StateManager; + +pub struct AppContext { + pub net_keypair: Keypair, + pub p2p_peer_id: PeerId, + pub db: Arc, + pub db_meta_data: DbMetadata, + pub state_manager: Arc>, + pub keystore: Arc>, + pub admin_jwt: String, + pub network_name: String, + pub snapshot_tracker: Arc>>, +} + +impl AppContext { + pub async fn init(opts: &CliOpts, cfg: &Config) -> anyhow::Result { + let chain_cfg = get_chain_config_and_set_network(cfg); + let (net_keypair, p2p_peer_id) = get_or_create_p2p_keypair_and_peer_id(&cfg)?; + let (db, db_meta_data) = setup_db(&opts, &cfg).await?; + let state_manager = create_state_manager(&cfg, &db, &chain_cfg).await?; + let (keystore, admin_jwt) = load_or_create_keystore_and_configure_jwt(&opts, &cfg).await?; + let network_name = state_manager.get_network_name_from_genesis()?; + let snapshot_progress = Arc::new(parking_lot::RwLock::new(Some(SnapshotTracker::default()))); + Ok(Self { + net_keypair, + p2p_peer_id, + db, + db_meta_data, + state_manager, + keystore, + admin_jwt, + network_name, + snapshot_tracker: snapshot_progress, + }) + } + + pub fn create_snapshot_callback(&self) -> Option> { + let snapshot_progress = self.snapshot_tracker.clone(); + + Some(Arc::new(move |msg: String| { + let mut progress = snapshot_progress.write(); + if let Some(p) = progress.as_mut() { + p.set_message(msg.clone()); + } + })) + } +} + +fn get_chain_config_and_set_network(config: &Config) -> Arc { + let chain_config = Arc::new(ChainConfig::from_chain(config.chain())); + if chain_config.is_testnet() { + CurrentNetwork::set_global(Network::Testnet); + } + chain_config +} + +fn get_or_create_p2p_keypair_and_peer_id(config: &Config) -> anyhow::Result<(Keypair, PeerId)> { + let path = config.client.data_dir.join("libp2p"); + let keypair = crate::libp2p::keypair::get_or_create_keypair(&path)?; + let peer_id = keypair.public().to_peer_id(); + Ok((keypair, peer_id)) +} + +/// This may: +/// - create a [`KeyStore`] +/// - load a [`KeyStore`] +/// - ask a user for password input +async fn load_or_create_keystore(config: &Config) -> anyhow::Result { + use std::env::VarError; + + let passphrase_from_env = std::env::var(FOREST_KEYSTORE_PHRASE_ENV); + let require_encryption = config.client.encrypt_keystore; + let keystore_already_exists = config + .client + .data_dir + .join(ENCRYPTED_KEYSTORE_NAME) + .is_dir(); + + match (require_encryption, passphrase_from_env) { + // don't need encryption, we can implicitly create a keystore + (false, maybe_passphrase) => { + warn!("Forest has encryption disabled"); + if let Ok(_) | Err(VarError::NotUnicode(_)) = maybe_passphrase { + warn!( + "Ignoring passphrase provided in {} - encryption is disabled", + FOREST_KEYSTORE_PHRASE_ENV + ) + } + KeyStore::new(KeyStoreConfig::Persistent(config.client.data_dir.clone())) + .map_err(anyhow::Error::new) + } + + // need encryption, the user has provided the password through env + (true, Ok(passphrase)) => KeyStore::new(KeyStoreConfig::Encrypted( + config.client.data_dir.clone(), + passphrase, + )) + .map_err(anyhow::Error::new), + + // need encryption, we've not been given a password + (true, Err(error)) => { + // prompt for passphrase and try and load the keystore + + if let VarError::NotUnicode(_) = error { + // If we're ignoring the user's password, tell them why + warn!( + "Ignoring passphrase provided in {} - it's not utf-8", + FOREST_KEYSTORE_PHRASE_ENV + ) + } + + let data_dir = config.client.data_dir.clone(); + + match keystore_already_exists { + true => asyncify(move || input_password_to_load_encrypted_keystore(data_dir)) + .await + .context("Couldn't load keystore"), + false => { + let password = + asyncify(|| create_password("Create a password for Forest's keystore")) + .await?; + KeyStore::new(KeyStoreConfig::Encrypted(data_dir, password)) + .context("Couldn't create keystore") + } + } + } + } +} + +async fn load_or_create_keystore_and_configure_jwt( + opts: &CliOpts, + config: &Config, +) -> anyhow::Result<(Arc>, String)> { + let mut keystore = load_or_create_keystore(config).await?; + if keystore.get(JWT_IDENTIFIER).is_err() { + keystore.put(JWT_IDENTIFIER, generate_priv_key())?; + } + let admin_jwt = handle_admin_token(opts, &keystore)?; + let keystore = Arc::new(RwLock::new(keystore)); + Ok((keystore, admin_jwt)) +} + +fn maybe_migrate_db(config: &Config) { + // Try to migrate the database if needed. In case the migration fails, we fallback to creating a new database + // to avoid breaking the node. + let db_migration = crate::db::migration::DbMigration::new(config); + if let Err(e) = db_migration.migrate() { + warn!("Failed to migrate database: {e}"); + } +} + +pub type DbType = ManyCar>; + +pub(crate) struct DbMetadata { + db_root_dir: PathBuf, + forest_car_db_dir: PathBuf, +} + +impl DbMetadata { + pub(crate) fn get_root_dir(&self) -> PathBuf { + self.db_root_dir.clone() + } + + pub(crate) fn get_forest_car_db_dir(&self) -> PathBuf { + self.forest_car_db_dir.clone() + } +} + +/// This function configures database with below steps +/// - migrate database auto-magically on Forest version bump +/// - load parity-db +/// - load CAR database +/// - load actor bundles +async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc, DbMetadata)> { + maybe_migrate_db(config); + let chain_data_path = chain_path(config); + let db_root_dir = db_root(&chain_data_path)?; + let db_writer = Arc::new(open_db(db_root_dir.clone(), config.db_config().clone())?); + let db = Arc::new(ManyCar::new(db_writer.clone())); + let forest_car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME); + load_all_forest_cars(&db, &forest_car_db_dir)?; + if config.client.load_actors && !opts.stateless { + load_actor_bundles(&db, config.chain()).await?; + } + Ok(( + db, + DbMetadata { + db_root_dir, + forest_car_db_dir, + }, + )) +} + +async fn create_state_manager( + config: &Config, + db: &Arc, + chain_config: &Arc, +) -> anyhow::Result>> { + // Read Genesis file + // * When snapshot command implemented, this genesis does not need to be + // initialized + let genesis_header = read_genesis_header( + config.client.genesis_file.as_deref(), + chain_config.genesis_bytes(db).await?.as_deref(), + db, + ) + .await?; + + let chain_store = Arc::new(ChainStore::new( + Arc::clone(db), + Arc::new(db.clone()), + db.writer().clone(), + chain_config.clone(), + genesis_header.clone(), + )?); + + // Initialize StateManager + let state_manager = Arc::new(StateManager::new( + Arc::clone(&chain_store), + Arc::clone(chain_config), + Arc::new(config.sync.clone()), + )?); + + Ok(state_manager) +} + + +/// Run the closure on a thread where blocking is allowed +/// +/// # Panics +/// If the closure panics +fn asyncify(f: impl FnOnce() -> T + Send + 'static) -> impl Future +where + T: Send + 'static, +{ + tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") }) +} + +/// Prompts for password, looping until the [`KeyStore`] is successfully loaded. +/// +/// This code makes blocking syscalls. +fn input_password_to_load_encrypted_keystore(data_dir: PathBuf) -> dialoguer::Result { + let keystore = RefCell::new(None); + let term = Term::stderr(); + + // Unlike `dialoguer::Confirm`, `dialoguer::Password` doesn't fail if the terminal is not a tty + // so do that check ourselves. + // This means users can't pipe their password from stdin. + if !term.is_term() { + return Err(std::io::Error::new( + std::io::ErrorKind::NotConnected, + "cannot read password from non-terminal", + ) + .into()); + } + + dialoguer::Password::new() + .with_prompt("Enter the password for Forest's keystore") + .allow_empty_password(true) // let validator do validation + .validate_with(|input: &String| { + KeyStore::new(KeyStoreConfig::Encrypted(data_dir.clone(), input.clone())) + .map(|created| *keystore.borrow_mut() = Some(created)) + .context( + "Error: couldn't load keystore with this password. Try again or press Ctrl+C to abort.", + ) + }) + .interact_on(&term)?; + + Ok(keystore + .into_inner() + .expect("validation succeeded, so keystore must be emplaced")) +} + +/// Loops until the user provides two matching passwords. +/// +/// This code makes blocking syscalls +fn create_password(prompt: &str) -> dialoguer::Result { + let term = Term::stderr(); + + // Unlike `dialoguer::Confirm`, `dialoguer::Password` doesn't fail if the terminal is not a tty + // so do that check ourselves. + // This means users can't pipe their password from stdin. + if !term.is_term() { + return Err(std::io::Error::new( + std::io::ErrorKind::NotConnected, + "cannot read password from non-terminal", + ) + .into()); + } + dialoguer::Password::new() + .with_prompt(prompt) + .allow_empty_password(false) + .with_confirmation( + "Confirm password", + "Error: the passwords do not match. Try again or press Ctrl+C to abort.", + ) + .interact_on(&term) +} + +/// Generates, prints and optionally writes to a file the administrator JWT +/// token. +fn handle_admin_token(opts: &CliOpts, keystore: &KeyStore) -> anyhow::Result { + let ki = keystore.get(JWT_IDENTIFIER)?; + // Lotus admin tokens do not expire but Forest requires all JWT tokens to + // have an expiration date. So we set the expiration date to 100 years in + // the future to match user-visible behavior of Lotus. + let token_exp = chrono::Duration::days(365 * 100); + let token = create_token( + ADMIN.iter().map(ToString::to_string).collect(), + ki.private_key(), + token_exp, + )?; + info!("Admin token: {token}"); + if let Some(path) = opts.save_token.as_ref() { + if let Some(dir) = path.parent() { + if !dir.is_dir() { + std::fs::create_dir_all(dir).with_context(|| { + format!( + "Failed to create `--save-token` directory {}", + dir.display() + ) + })?; + } + } + std::fs::write(path, &token) + .with_context(|| format!("Failed to save admin token to {}", path.display()))?; + } + + Ok(token) +} \ No newline at end of file diff --git a/src/daemon/db_util.rs b/src/daemon/db_util.rs index 3f383fbaf531..b3a087ba0d6a 100644 --- a/src/daemon/db_util.rs +++ b/src/daemon/db_util.rs @@ -17,6 +17,7 @@ use std::{ path::{Path, PathBuf}, time, }; +use std::sync::Arc; use tokio::io::AsyncWriteExt; use tracing::{debug, info}; use url::Url; @@ -92,6 +93,7 @@ pub async fn import_chain_as_forest_car( from_path: &Path, forest_car_db_dir: &Path, import_mode: ImportMode, + callback: Option>, ) -> anyhow::Result<(PathBuf, Tipset)> { info!("Importing chain from snapshot at: {}", from_path.display()); @@ -112,6 +114,7 @@ pub async fn import_chain_as_forest_car( &url, &downloaded_car_temp_path, DownloadFileOption::Resumable, + callback, ) .await?; } else { @@ -390,7 +393,7 @@ mod test { let temp_db_dir = tempfile::Builder::new().tempdir()?; let (path, ts) = - import_chain_as_forest_car(file_path, temp_db_dir.path(), import_mode).await?; + import_chain_as_forest_car(file_path, temp_db_dir.path(), import_mode, None).await?; match import_mode { ImportMode::Symlink => { assert_eq!( diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs index 95ce0a062fd1..fd44ab744cd0 100644 --- a/src/daemon/mod.rs +++ b/src/daemon/mod.rs @@ -4,10 +4,9 @@ pub mod bundle; pub mod db_util; pub mod main; +mod context; -use crate::auth::{create_token, generate_priv_key, ADMIN, JWT_IDENTIFIER}; use crate::blocks::Tipset; -use crate::chain::ChainStore; use crate::chain_sync::ChainMuxer; use crate::cli_shared::{car_db_path, snapshot}; use crate::cli_shared::{ @@ -18,21 +17,14 @@ use crate::daemon::db_util::{ import_chain_as_forest_car, load_all_forest_cars, populate_eth_mappings, }; use crate::db::car::ManyCar; -use crate::db::db_engine::{db_root, open_db}; -use crate::db::parity_db::ParityDb; use crate::db::SettingsStore; -use crate::db::{ttl::EthMappingCollector, MarkAndSweep, MemoryDB, SettingsExt, CAR_DB_DIR_NAME}; -use crate::genesis::read_genesis_header; -use crate::key_management::{ - KeyStore, KeyStoreConfig, ENCRYPTED_KEYSTORE_NAME, FOREST_KEYSTORE_PHRASE_ENV, -}; +use crate::db::{ttl::EthMappingCollector, MarkAndSweep, MemoryDB, SettingsExt}; use crate::libp2p::{Libp2pService, PeerManager}; use crate::message_pool::{MessagePool, MpoolConfig, MpoolRpcProvider}; use crate::networks::{self, ChainConfig}; use crate::rpc::eth::filter::EthEventHandler; use crate::rpc::start_rpc; use crate::rpc::RPCState; -use crate::shim::address::{CurrentNetwork, Network}; use crate::shim::clock::ChainEpoch; use crate::shim::version::NetworkVersion; use crate::state_manager::StateManager; @@ -42,19 +34,15 @@ use crate::utils::{ version::FOREST_VERSION_STRING, }; use anyhow::{bail, Context as _}; -use bundle::load_actor_bundles; -use dialoguer::console::Term; use dialoguer::theme::ColorfulTheme; use futures::{select, Future, FutureExt}; use fvm_ipld_blockstore::Blockstore; -use libp2p::identity::Keypair; -use libp2p::PeerId; use once_cell::sync::Lazy; use raw_sync_2::events::{Event, EventInit as _, EventState}; use shared_memory::ShmemConf; use std::path::Path; use std::time::Duration; -use std::{cell::RefCell, cmp, path::PathBuf, sync::Arc}; +use std::{cmp, sync::Arc}; use tempfile::{Builder, TempPath}; use tokio::{ net::TcpListener, @@ -62,10 +50,11 @@ use tokio::{ ctrl_c, unix::{signal, SignalKind}, }, - sync::{mpsc, RwLock}, + sync::mpsc, task::JoinSet, }; use tracing::{debug, info, warn}; +use crate::daemon::context::{AppContext, DbType}; static IPC_PATH: Lazy = Lazy::new(|| { Builder::new() @@ -145,13 +134,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul // Garbage collection interval, currently set at 10 hours. const GC_INTERVAL: Duration = Duration::from_secs(60 * 60 * 10); -type DbType = ManyCar>; - -struct DbMetadata { - db_root_dir: PathBuf, - forest_car_db_dir: PathBuf, -} - /// This function initialize Forest with below steps /// - increase file descriptor limit (for parity-db) /// - setup proofs parameter cache directory @@ -172,84 +154,20 @@ fn startup_init(opts: &CliOpts, config: &Config) -> anyhow::Result<()> { Ok(()) } -fn get_chain_config_and_set_network(config: &Config) -> Arc { - let chain_config = Arc::new(ChainConfig::from_chain(config.chain())); - if chain_config.is_testnet() { - CurrentNetwork::set_global(Network::Testnet); - } - chain_config -} - -fn get_or_create_p2p_keypair_and_peer_id(config: &Config) -> anyhow::Result<(Keypair, PeerId)> { - let path = config.client.data_dir.join("libp2p"); - let keypair = crate::libp2p::keypair::get_or_create_keypair(&path)?; - let peer_id = keypair.public().to_peer_id(); - Ok((keypair, peer_id)) -} - -async fn load_or_create_keystore_and_configure_jwt( - opts: &CliOpts, - config: &Config, -) -> anyhow::Result<(Arc>, String)> { - let mut keystore = load_or_create_keystore(config).await?; - if keystore.get(JWT_IDENTIFIER).is_err() { - keystore.put(JWT_IDENTIFIER, generate_priv_key())?; - } - let admin_jwt = handle_admin_token(opts, &keystore)?; - let keystore = Arc::new(RwLock::new(keystore)); - Ok((keystore, admin_jwt)) -} - -fn maybe_migrate_db(config: &Config) { - // Try to migrate the database if needed. In case the migration fails, we fallback to creating a new database - // to avoid breaking the node. - let db_migration = crate::db::migration::DbMigration::new(config); - if let Err(e) = db_migration.migrate() { - warn!("Failed to migrate database: {e}"); - } -} - -/// This function configures database with below steps -/// - migrate database auto-magically on Forest version bump -/// - load parity-db -/// - load CAR database -/// - load actor bundles -async fn setup_db(opts: &CliOpts, config: &Config) -> anyhow::Result<(Arc, DbMetadata)> { - maybe_migrate_db(config); - let chain_data_path = chain_path(config); - let db_root_dir = db_root(&chain_data_path)?; - let db_writer = Arc::new(open_db(db_root_dir.clone(), config.db_config().clone())?); - let db = Arc::new(ManyCar::new(db_writer.clone())); - let forest_car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME); - load_all_forest_cars(&db, &forest_car_db_dir)?; - if config.client.load_actors && !opts.stateless { - load_actor_bundles(&db, config.chain()).await?; - } - Ok(( - db, - DbMetadata { - db_root_dir, - forest_car_db_dir, - }, - )) -} - async fn maybe_import_snapshot( opts: &CliOpts, config: &mut Config, - db: &DbType, - db_meta: &DbMetadata, - state_manager: &Arc>, + ctx: &AppContext, ) -> anyhow::Result<()> { - let chain_config = state_manager.chain_config(); + let chain_config = ctx.state_manager.chain_config(); // Sets the latest snapshot if needed for downloading later if config.client.snapshot_path.is_none() && !opts.stateless { maybe_set_snapshot_path( config, chain_config, - state_manager.chain_store().heaviest_tipset().epoch(), + ctx.state_manager.chain_store().heaviest_tipset().epoch(), opts.auto_download_snapshot, - &db_meta.db_root_dir, + &ctx.db_meta_data.get_root_dir(), ) .await?; } @@ -259,15 +177,16 @@ async fn maybe_import_snapshot( if let Some(path) = &config.client.snapshot_path { let (car_db_path, ts) = import_chain_as_forest_car( path, - &db_meta.forest_car_db_dir, + &ctx.db_meta_data.get_forest_car_db_dir(), config.client.import_mode, + ctx.create_snapshot_callback(), ) .await?; - db.read_only_files(std::iter::once(car_db_path.clone()))?; + ctx.db.read_only_files(std::iter::once(car_db_path.clone()))?; let ts_epoch = ts.epoch(); // Explicitly set heaviest tipset here in case HEAD_KEY has already been set // in the current setting store - state_manager.chain_store().set_heaviest_tipset(ts.into())?; + ctx.state_manager.chain_store().set_heaviest_tipset(ts.into())?; debug!( "Loaded car DB at {} and set current head to epoch {ts_epoch}", car_db_path.display(), @@ -282,53 +201,20 @@ async fn maybe_import_snapshot( let current_height = config .client .snapshot_head - .unwrap_or_else(|| state_manager.chain_store().heaviest_tipset().epoch()); + .unwrap_or_else(|| ctx.state_manager.chain_store().heaviest_tipset().epoch()); assert!(current_height.is_positive()); match validate_from.is_negative() { // allow --height=-1000 to scroll back from the current head true => { - state_manager.validate_range((current_height + validate_from)..=current_height)? + ctx.state_manager.validate_range((current_height + validate_from)..=current_height)? } - false => state_manager.validate_range(validate_from..=current_height)?, + false => ctx.state_manager.validate_range(validate_from..=current_height)?, } } Ok(()) } -async fn create_state_manager( - config: &Config, - db: &Arc, - chain_config: &Arc, -) -> anyhow::Result>> { - // Read Genesis file - // * When snapshot command implemented, this genesis does not need to be - // initialized - let genesis_header = read_genesis_header( - config.client.genesis_file.as_deref(), - chain_config.genesis_bytes(db).await?.as_deref(), - db, - ) - .await?; - - let chain_store = Arc::new(ChainStore::new( - Arc::clone(db), - Arc::new(db.clone()), - db.writer().clone(), - chain_config.clone(), - genesis_header.clone(), - )?); - - // Initialize StateManager - let state_manager = Arc::new(StateManager::new( - Arc::clone(&chain_store), - Arc::clone(chain_config), - Arc::new(config.sync.clone()), - )?); - - Ok(state_manager) -} - fn maybe_start_track_peak_rss_service(services: &mut JoinSet>, opts: &CliOpts) { if opts.track_peak_rss { let mem_stats_tracker = MemStatsTracker::default(); @@ -342,8 +228,7 @@ fn maybe_start_track_peak_rss_service(services: &mut JoinSet> async fn maybe_start_metrics_service( services: &mut JoinSet>, config: &Config, - db: &DbType, - state_manager: &StateManager, + ctx: &AppContext, ) -> anyhow::Result<()> { if config.client.enable_metrics_endpoint { // Start Prometheus server port @@ -355,7 +240,7 @@ async fn maybe_start_metrics_service( config.client.metrics_address ); let db_directory = crate::db::db_engine::db_root(&chain_path(config))?; - let db = db.writer().clone(); + let db = ctx.db.writer().clone(); services.spawn(async { crate::metrics::init_prometheus(prometheus_listener, db_directory, db) .await @@ -364,8 +249,8 @@ async fn maybe_start_metrics_service( crate::metrics::default_registry().register_collector(Box::new( networks::metrics::NetworkHeightCollector::new( - state_manager.chain_config().block_delay_secs, - state_manager.chain_store().genesis_block_header().timestamp, + ctx.state_manager.chain_config().block_delay_secs, + ctx.state_manager.chain_store().genesis_block_header().timestamp, ), )); } @@ -376,24 +261,23 @@ fn maybe_start_gc_service( services: &mut JoinSet>, opts: &CliOpts, config: &Config, - db: &DbType, - state_manager: &StateManager, + ctx: &AppContext, ) { if !opts.no_gc { let mut db_garbage_collector = { - let chain_store = state_manager.chain_store().clone(); + let chain_store = ctx.state_manager.chain_store().clone(); let depth = cmp::max( - state_manager.chain_config().policy.chain_finality * 2, + ctx.state_manager.chain_config().policy.chain_finality * 2, config.sync.recent_state_roots, ); let get_heaviest_tipset = Box::new(move || chain_store.heaviest_tipset()); MarkAndSweep::new( - db.writer().clone(), + ctx.db.writer().clone(), get_heaviest_tipset, depth, - Duration::from_secs(state_manager.chain_config().block_delay_secs as u64), + Duration::from_secs(ctx.state_manager.chain_config().block_delay_secs as u64), ) }; @@ -404,13 +288,13 @@ fn maybe_start_gc_service( fn maybe_start_eth_mapping_collection_service( services: &mut JoinSet>, config: &Config, - state_manager: &StateManager, + ctx: &AppContext, ) { if let Some(ttl) = config.client.eth_mapping_ttl { - let chain_store = state_manager.chain_store().clone(); - let chain_config = state_manager.chain_config().clone(); + let chain_store = ctx.state_manager.chain_store().clone(); + let chain_config = ctx.state_manager.chain_config().clone(); services.spawn(async move { - tracing::info!("Starting collector for eth_mappings"); + info!("Starting collector for eth_mappings"); let mut collector = EthMappingCollector::new( chain_store.db.clone(), chain_config.eth_chain_id, @@ -424,13 +308,11 @@ fn maybe_start_eth_mapping_collection_service( async fn create_p2p_service( services: &mut JoinSet>, config: &mut Config, - state_manager: &StateManager, - net_keypair: Keypair, - network_name: &str, + ctx: &AppContext, ) -> anyhow::Result> { // if bootstrap peers are not set, set them if config.network.bootstrap_peers.is_empty() { - config.network.bootstrap_peers = state_manager.chain_config().bootstrap_peers.clone(); + config.network.bootstrap_peers = ctx.state_manager.chain_config().bootstrap_peers.clone(); } let peer_manager = Arc::new(PeerManager::default()); @@ -438,11 +320,11 @@ async fn create_p2p_service( // Libp2p service setup let p2p_service = Libp2pService::new( config.network.clone(), - Arc::clone(state_manager.chain_store()), + Arc::clone(ctx.state_manager.chain_store()), peer_manager.clone(), - net_keypair, - network_name, - *state_manager.chain_store().genesis_block_header().cid(), + ctx.net_keypair.clone(), + ctx.network_name.as_str(), + *ctx.state_manager.chain_store().genesis_block_header().cid(), ) .await?; Ok(p2p_service) @@ -451,29 +333,27 @@ async fn create_p2p_service( fn create_chain_muxer( services: &mut JoinSet>, opts: &CliOpts, - db: &DbType, - state_manager: &Arc>, p2p_service: &Libp2pService, - network_name: String, + ctx: &AppContext, ) -> anyhow::Result>> { - let publisher = state_manager.chain_store().publisher(); - let provider = MpoolRpcProvider::new(publisher.clone(), state_manager.clone()); + let publisher = ctx.state_manager.chain_store().publisher(); + let provider = MpoolRpcProvider::new(publisher.clone(), ctx.state_manager.clone()); let mpool = Arc::new(MessagePool::new( provider, - network_name, + ctx.network_name.clone(), p2p_service.network_sender().clone(), - MpoolConfig::load_config(db.writer().as_ref())?, - state_manager.chain_config().clone(), + MpoolConfig::load_config(ctx.db.writer().as_ref())?, + ctx.state_manager.chain_config().clone(), services, )?); let chain_muxer = ChainMuxer::new( - state_manager.clone(), + ctx.state_manager.clone(), p2p_service.peer_manager().clone(), mpool.clone(), p2p_service.network_sender().clone(), p2p_service.network_receiver(), Arc::new(Tipset::from( - state_manager.chain_store().genesis_block_header(), + ctx.state_manager.chain_store().genesis_block_header(), )), opts.stateless, )?; @@ -490,19 +370,18 @@ fn start_chain_muxer_service( async fn maybe_start_health_check_service( services: &mut JoinSet>, config: &Config, - db: &DbType, - state_manager: &StateManager, p2p_service: &Libp2pService, chain_muxer: &ChainMuxer>, + ctx: &AppContext, ) -> anyhow::Result<()> { if config.client.enable_health_check { let forest_state = crate::health::ForestState { config: config.clone(), - chain_config: state_manager.chain_config().clone(), - genesis_timestamp: state_manager.chain_store().genesis_block_header().timestamp, + chain_config: ctx.state_manager.chain_config().clone(), + genesis_timestamp: ctx.state_manager.chain_store().genesis_block_header().timestamp, sync_state: chain_muxer.sync_state().clone(), peer_manager: p2p_service.peer_manager().clone(), - settings_store: db.writer().clone(), + settings_store: ctx.db.writer().clone(), }; let listener = tokio::net::TcpListener::bind(forest_state.config.client.healthcheck_address).await?; @@ -519,12 +398,10 @@ async fn maybe_start_health_check_service( fn maybe_start_rpc_service( services: &mut JoinSet>, config: &Config, - keystore: Arc>, - state_manager: &Arc>, chain_muxer: &ChainMuxer>, start_time: chrono::DateTime, - network_name: String, shutdown: mpsc::Sender<()>, + ctx: &AppContext, ) -> anyhow::Result<()> { if config.client.enable_rpc { let rpc_address = config.client.rpc_address; @@ -537,12 +414,15 @@ fn maybe_start_rpc_service( info!("JSON-RPC endpoint will listen at {rpc_address}"); let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events)); services.spawn({ - let state_manager = state_manager.clone(); + let state_manager = ctx.state_manager.clone(); let mpool = chain_muxer.mpool().clone(); let bad_blocks = chain_muxer.bad_blocks().clone(); let sync_state = chain_muxer.sync_state().clone(); let sync_network_context = chain_muxer.sync_network_context().clone(); let tipset_send = chain_muxer.tipset_sender().clone(); + let keystore = ctx.keystore.clone(); + let network_name = ctx.network_name.clone(); + let snapshot_tracker = ctx.snapshot_tracker.clone(); async move { start_rpc( RPCState { @@ -557,6 +437,7 @@ fn maybe_start_rpc_service( start_time, shutdown, tipset_send, + snapshot_tracker, }, rpc_address, filter_list, @@ -574,12 +455,10 @@ fn maybe_start_f3_service( services: &mut JoinSet>, opts: &CliOpts, config: &Config, - state_manager: &Arc>, - p2p_peer_id: PeerId, - admin_jwt: String, + ctx: &AppContext, ) { if !config.client.enable_rpc { - if crate::f3::is_sidecar_ffi_enabled(state_manager.chain_config()) { + if crate::f3::is_sidecar_ffi_enabled(ctx.state_manager.chain_config()) { tracing::warn!("F3 sidecar is enabled but not run because RPC is disabled. ") } return; @@ -587,6 +466,9 @@ fn maybe_start_f3_service( if !opts.halt_after_import && !opts.stateless { let rpc_address = config.client.rpc_address; + let state_manager = &ctx.state_manager; + let p2p_peer_id = ctx.p2p_peer_id.clone(); + let admin_jwt = ctx.admin_jwt.clone(); services.spawn_blocking({ crate::rpc::f3::F3_LEASE_MANAGER .set(crate::rpc::f3::F3LeaseManager::new( @@ -626,12 +508,11 @@ fn maybe_populate_eth_mappings_in_background( services: &mut JoinSet>, opts: &CliOpts, config: Config, - db: &DbType, - state_manager: &Arc>, + ctx: &AppContext, ) { - if !opts.stateless && !state_manager.chain_config().is_devnet() { - let state_manager = state_manager.clone(); - let settings = db.writer().clone(); + if !opts.stateless && !ctx.state_manager.chain_config().is_devnet() { + let state_manager = ctx.state_manager.clone(); + let settings = ctx.db.writer().clone(); services.spawn(async move { if let Err(err) = init_ethereum_mapping(state_manager, &settings, &config) { tracing::warn!("Init Ethereum mapping failed: {}", err) @@ -651,70 +532,60 @@ pub(super) async fn start( startup_init(&opts, &config)?; let mut services = JoinSet::new(); maybe_start_track_peak_rss_service(&mut services, &opts); - let chain_config = get_chain_config_and_set_network(&config); - let (net_keypair, p2p_peer_id) = get_or_create_p2p_keypair_and_peer_id(&config)?; - let (keystore, admin_jwt) = load_or_create_keystore_and_configure_jwt(&opts, &config).await?; - let (db, db_meta) = setup_db(&opts, &config).await?; - let state_manager = create_state_manager(&config, &db, &chain_config).await?; - let network_name = state_manager.get_network_name_from_genesis()?; - info!("Using network :: {}", get_actual_chain_name(&network_name)); + let ctx = AppContext::init(&opts, &config).await?; + info!("Using network :: {}", get_actual_chain_name(&ctx.network_name)); utils::misc::display_chain_logo(config.chain()); if opts.exit_after_init { return Ok(()); } - maybe_import_snapshot(&opts, &mut config, &db, &db_meta, &state_manager).await?; - if opts.halt_after_import { - // Cancel all async services - services.shutdown().await; - return Ok(()); - } - maybe_start_eth_mapping_collection_service(&mut services, &config, &state_manager); - maybe_start_metrics_service(&mut services, &config, &db, &state_manager).await?; - maybe_start_gc_service(&mut services, &opts, &config, &db, &state_manager); let p2p_service = create_p2p_service( &mut services, &mut config, - &state_manager, - net_keypair, - &network_name, - ) - .await?; + &ctx, + ).await?; + let chain_muxer = create_chain_muxer( &mut services, &opts, - &db, - &state_manager, &p2p_service, - network_name.clone(), + &ctx, )?; + + info!("Starting network:: {}", get_actual_chain_name(&ctx.network_name)); + maybe_start_rpc_service( &mut services, &config, - keystore, - &state_manager, &chain_muxer, start_time, - network_name, shutdown_send.clone(), + &ctx, )?; + + maybe_import_snapshot(&opts, &mut config, &ctx).await?; + if opts.halt_after_import { + // Cancel all async services + services.shutdown().await; + return Ok(()); + } + maybe_start_eth_mapping_collection_service(&mut services, &config, &ctx); + maybe_start_metrics_service(&mut services, &config, &ctx).await?; + maybe_start_gc_service(&mut services, &opts, &config, &ctx); maybe_start_f3_service( &mut services, &opts, &config, - &state_manager, - p2p_peer_id, - admin_jwt, + &ctx, ); maybe_start_health_check_service( &mut services, &config, - &db, - &state_manager, &p2p_service, &chain_muxer, + &ctx, ) .await?; - maybe_populate_eth_mappings_in_background(&mut services, &opts, config, &db, &state_manager); + maybe_populate_eth_mappings_in_background(&mut services, &opts, config, &ctx); if !opts.stateless { ensure_proof_params_downloaded().await?; } @@ -803,38 +674,6 @@ async fn maybe_set_snapshot_path( Ok(()) } -/// Generates, prints and optionally writes to a file the administrator JWT -/// token. -fn handle_admin_token(opts: &CliOpts, keystore: &KeyStore) -> anyhow::Result { - let ki = keystore.get(JWT_IDENTIFIER)?; - // Lotus admin tokens do not expire but Forest requires all JWT tokens to - // have an expiration date. So we set the expiration date to 100 years in - // the future to match user-visible behavior of Lotus. - let token_exp = chrono::Duration::days(365 * 100); - let token = create_token( - ADMIN.iter().map(ToString::to_string).collect(), - ki.private_key(), - token_exp, - )?; - info!("Admin token: {token}"); - if let Some(path) = opts.save_token.as_ref() { - if let Some(dir) = path.parent() { - if !dir.is_dir() { - std::fs::create_dir_all(dir).with_context(|| { - format!( - "Failed to create `--save-token` directory {}", - dir.display() - ) - })?; - } - } - std::fs::write(path, &token) - .with_context(|| format!("Failed to save admin token to {}", path.display()))?; - } - - Ok(token) -} - /// returns the first error with which any of the services end, or never returns at all // This should return anyhow::Result once the `Never` type is stabilized async fn propagate_error( @@ -860,72 +699,6 @@ pub fn get_actual_chain_name(internal_network_name: &str) -> &str { } } -/// This may: -/// - create a [`KeyStore`] -/// - load a [`KeyStore`] -/// - ask a user for password input -async fn load_or_create_keystore(config: &Config) -> anyhow::Result { - use std::env::VarError; - - let passphrase_from_env = std::env::var(FOREST_KEYSTORE_PHRASE_ENV); - let require_encryption = config.client.encrypt_keystore; - let keystore_already_exists = config - .client - .data_dir - .join(ENCRYPTED_KEYSTORE_NAME) - .is_dir(); - - match (require_encryption, passphrase_from_env) { - // don't need encryption, we can implicitly create a keystore - (false, maybe_passphrase) => { - warn!("Forest has encryption disabled"); - if let Ok(_) | Err(VarError::NotUnicode(_)) = maybe_passphrase { - warn!( - "Ignoring passphrase provided in {} - encryption is disabled", - FOREST_KEYSTORE_PHRASE_ENV - ) - } - KeyStore::new(KeyStoreConfig::Persistent(config.client.data_dir.clone())) - .map_err(anyhow::Error::new) - } - - // need encryption, the user has provided the password through env - (true, Ok(passphrase)) => KeyStore::new(KeyStoreConfig::Encrypted( - config.client.data_dir.clone(), - passphrase, - )) - .map_err(anyhow::Error::new), - - // need encryption, we've not been given a password - (true, Err(error)) => { - // prompt for passphrase and try and load the keystore - - if let VarError::NotUnicode(_) = error { - // If we're ignoring the user's password, tell them why - warn!( - "Ignoring passphrase provided in {} - it's not utf-8", - FOREST_KEYSTORE_PHRASE_ENV - ) - } - - let data_dir = config.client.data_dir.clone(); - - match keystore_already_exists { - true => asyncify(move || input_password_to_load_encrypted_keystore(data_dir)) - .await - .context("Couldn't load keystore"), - false => { - let password = - asyncify(|| create_password("Create a password for Forest's keystore")) - .await?; - KeyStore::new(KeyStoreConfig::Encrypted(data_dir, password)) - .context("Couldn't create keystore") - } - } - } - } -} - /// Run the closure on a thread where blocking is allowed /// /// # Panics @@ -937,67 +710,6 @@ where tokio::task::spawn_blocking(f).then(|res| async { res.expect("spawned task panicked") }) } -/// Prompts for password, looping until the [`KeyStore`] is successfully loaded. -/// -/// This code makes blocking syscalls. -fn input_password_to_load_encrypted_keystore(data_dir: PathBuf) -> dialoguer::Result { - let keystore = RefCell::new(None); - let term = Term::stderr(); - - // Unlike `dialoguer::Confirm`, `dialoguer::Password` doesn't fail if the terminal is not a tty - // so do that check ourselves. - // This means users can't pipe their password from stdin. - if !term.is_term() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotConnected, - "cannot read password from non-terminal", - ) - .into()); - } - - dialoguer::Password::new() - .with_prompt("Enter the password for Forest's keystore") - .allow_empty_password(true) // let validator do validation - .validate_with(|input: &String| { - KeyStore::new(KeyStoreConfig::Encrypted(data_dir.clone(), input.clone())) - .map(|created| *keystore.borrow_mut() = Some(created)) - .context( - "Error: couldn't load keystore with this password. Try again or press Ctrl+C to abort.", - ) - }) - .interact_on(&term)?; - - Ok(keystore - .into_inner() - .expect("validation succeeded, so keystore must be emplaced")) -} - -/// Loops until the user provides two matching passwords. -/// -/// This code makes blocking syscalls -fn create_password(prompt: &str) -> dialoguer::Result { - let term = Term::stderr(); - - // Unlike `dialoguer::Confirm`, `dialoguer::Password` doesn't fail if the terminal is not a tty - // so do that check ourselves. - // This means users can't pipe their password from stdin. - if !term.is_term() { - return Err(std::io::Error::new( - std::io::ErrorKind::NotConnected, - "cannot read password from non-terminal", - ) - .into()); - } - dialoguer::Password::new() - .with_prompt(prompt) - .allow_empty_password(false) - .with_confirmation( - "Confirm password", - "Error: the passwords do not match. Try again or press Ctrl+C to abort.", - ) - .interact_on(&term) -} - fn init_ethereum_mapping( state_manager: Arc>, settings: &impl SettingsStore, diff --git a/src/rpc/methods/sync.rs b/src/rpc/methods/sync.rs index 57978db3e06b..e89e2589af13 100644 --- a/src/rpc/methods/sync.rs +++ b/src/rpc/methods/sync.rs @@ -1,6 +1,8 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +mod types; + use crate::blocks::{Block, FullTipset, GossipBlock}; use crate::libp2p::{IdentTopic, NetworkMessage, PUBSUB_BLOCK_STR}; use crate::lotus_json::{lotus_json_with_self, LotusJson}; @@ -13,6 +15,8 @@ use nunny::{vec as nonempty, Vec as NonEmpty}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::sync::Arc; +use tracing::info; +pub use types::*; use crate::chain; use crate::chain_sync::{SyncStage, TipsetValidator}; @@ -71,6 +75,26 @@ impl RpcMethod<0> for SyncState { } } +pub enum SyncStatus {} +impl RpcMethod<0> for SyncStatus { + const NAME: &'static str = "Filecoin.SyncStatus"; + const PARAM_NAMES: [&'static str; 0] = []; + const API_PATHS: ApiPaths = ApiPaths::V1; + const PERMISSION: Permission = Permission::Read; + type Params = (); + type Ok = SnapshotTracker; + async fn handle(ctx: Ctx, (): Self::Params) -> Result { + let mut msg = "".to_string(); + if let Some(p) = ctx.snapshot_tracker.read().clone() { + msg = p.message.clone() + } + + Ok(SnapshotTracker { + message: msg, + }) + } +} + pub enum SyncSubmitBlock {} impl RpcMethod<1> for SyncSubmitBlock { const NAME: &'static str = "Filecoin.SyncSubmitBlock"; @@ -232,6 +256,7 @@ mod tests { start_time, shutdown: mpsc::channel(1).0, // dummy for tests tipset_send, + snapshot_tracker: Arc::new(parking_lot::RwLock::new(Default::default())), }); (state, network_rx) } diff --git a/src/rpc/methods/sync/types.rs b/src/rpc/methods/sync/types.rs new file mode 100644 index 000000000000..e43d9e11017f --- /dev/null +++ b/src/rpc/methods/sync/types.rs @@ -0,0 +1,24 @@ +// Add to src/rpc/types.rs or a similar appropriate location + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use crate::lotus_json::lotus_json_with_self; + +#[derive(PartialEq, Default, Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct SnapshotTracker { + pub message: String, // The formatted progress message +} + +impl SnapshotTracker { + pub fn new() -> Self { + Self::default() + } + + pub fn set_message(&mut self, message: String) { + tracing::info!("tracking snapshot message: {}", message.clone()); + self.message = message; + } +} + +lotus_json_with_self!(SnapshotTracker); \ No newline at end of file diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index c6caa8614ff6..7eb8c90efe50 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -231,6 +231,7 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::sync::SyncCheckBad); $callback!($crate::rpc::sync::SyncMarkBad); $callback!($crate::rpc::sync::SyncState); + $callback!($crate::rpc::sync::SyncStatus); $callback!($crate::rpc::sync::SyncSubmitBlock); // wallet vertical @@ -381,6 +382,7 @@ use tokio::sync::{mpsc, RwLock}; use tower::Service; use openrpc_types::{self, ParamStructure}; +use crate::rpc::sync::SnapshotTracker; pub const DEFAULT_PORT: u16 = 2345; @@ -408,6 +410,7 @@ pub struct RPCState { pub network_name: String, pub tipset_send: flume::Sender>, pub start_time: chrono::DateTime, + pub snapshot_tracker: Arc>>, pub shutdown: mpsc::Sender<()>, } diff --git a/src/tool/offline_server/server.rs b/src/tool/offline_server/server.rs index 38aced979acd..324557951b73 100644 --- a/src/tool/offline_server/server.rs +++ b/src/tool/offline_server/server.rs @@ -36,6 +36,7 @@ use tokio::{ task::JoinSet, }; use tracing::{info, warn}; +use crate::rpc::sync::SnapshotTracker; pub async fn start_offline_server( snapshot_files: Vec, @@ -129,6 +130,7 @@ pub async fn start_offline_server( let peer_manager = Arc::new(PeerManager::default()); let sync_network_context = SyncNetworkContext::new(network_send, peer_manager, state_manager.blockstore_owned()); + let rpc_state = RPCState { state_manager, keystore: Arc::new(RwLock::new(keystore)), @@ -141,6 +143,7 @@ pub async fn start_offline_server( start_time: chrono::Utc::now(), shutdown, tipset_send, + snapshot_tracker: Arc::new(parking_lot::RwLock::new(Some(SnapshotTracker::new()))), }; rpc_state.sync_state.write().set_stage(SyncStage::Idle); start_offline_rpc(rpc_state, rpc_port, shutdown_recv).await?; @@ -223,6 +226,7 @@ async fn handle_snapshots( &snapshot_url, &downloaded_snapshot_path, DownloadFileOption::Resumable, + None, ) .await?; info!("Snapshot downloaded"); diff --git a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs index 457bd2d0da82..f61a657f7189 100644 --- a/src/tool/subcommands/api_cmd/generate_test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/generate_test_snapshot.rs @@ -26,6 +26,7 @@ use openrpc_types::ParamStructure; use parking_lot::RwLock; use rpc::{eth::filter::EthEventHandler, RPCState, RpcMethod as _}; use tokio::{sync::mpsc, task::JoinSet}; +use crate::rpc::sync::SnapshotTracker; pub async fn run_test_with_dump( test_dump: &TestDump, @@ -135,6 +136,7 @@ async fn ctx( start_time: chrono::Utc::now(), shutdown, tipset_send, + snapshot_tracker: Arc::new(RwLock::new(Some(SnapshotTracker::new()))), }); rpc_state.sync_state.write().set_stage(SyncStage::Idle); Ok((rpc_state, network_rx, shutdown_recv)) diff --git a/src/tool/subcommands/api_cmd/test_snapshot.rs b/src/tool/subcommands/api_cmd/test_snapshot.rs index 0a2ec4f4d0b7..bd940cd822f5 100644 --- a/src/tool/subcommands/api_cmd/test_snapshot.rs +++ b/src/tool/subcommands/api_cmd/test_snapshot.rs @@ -26,6 +26,7 @@ use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use std::{path::Path, str::FromStr, sync::Arc}; use tokio::{sync::mpsc, task::JoinSet}; +use crate::rpc::sync::SnapshotTracker; #[derive(Default, Hash, Eq, PartialEq, Debug, Clone, Serialize, Deserialize)] pub struct Payload(#[serde(with = "crate::lotus_json::base64_standard")] pub Vec); @@ -162,6 +163,7 @@ async fn ctx( start_time: chrono::Utc::now(), shutdown, tipset_send, + snapshot_tracker: Arc::new(RwLock::new(Some(SnapshotTracker::new()))), }); rpc_state.sync_state.write().set_stage(SyncStage::Idle); Ok((rpc_state, network_rx, shutdown_recv)) diff --git a/src/utils/io/progress_log.rs b/src/utils/io/progress_log.rs index 79e1fdf5de95..19df8b6ec64c 100644 --- a/src/utils/io/progress_log.rs +++ b/src/utils/io/progress_log.rs @@ -77,10 +77,17 @@ impl tokio::io::AsyncRead for WithProgress { } impl WithProgress { - pub fn wrap_async_read(message: &str, read: S, total_items: u64) -> WithProgress { + pub fn wrap_sync_read_with_callback( + message: &str, + read: S, + total_items: u64, + callback: Option>, + ) -> WithProgress { WithProgress { inner: read, - progress: Progress::new(message).with_total(total_items), + progress: Progress::new(message) + .with_callback(callback) + .with_total(total_items), } } @@ -90,8 +97,8 @@ impl WithProgress { } } -#[derive(Debug, Clone)] -struct Progress { +#[derive(Clone)] +pub struct Progress { completed_items: u64, total_items: Option, last_logged_items: u64, @@ -99,6 +106,7 @@ struct Progress { last_logged: Instant, message: String, item_type: ItemType, + callback: Option>, } #[derive(Debug, Clone, Copy)] @@ -118,9 +126,15 @@ impl Progress { last_logged: now, message: message.into(), item_type: ItemType::Items, + callback: None, } } + fn with_callback(mut self, callback: Option>) -> Self { + self.callback = callback; + self + } + fn with_total(mut self, total: u64) -> Self { self.total_items = Some(total); self @@ -183,10 +197,15 @@ impl Progress { fn emit_log_if_required(&mut self) { let now = Instant::now(); if (now - self.last_logged) > UPDATE_FREQUENCY { + let msg = self.msg(now); + if let Some(cb) = &self.callback { + cb(msg.clone()); + } + tracing::info!( target: "forest::progress", "{}", - self.msg(now) + msg ); self.last_logged = now; self.last_logged_items = self.completed_items; @@ -194,6 +213,21 @@ impl Progress { } } +// Derive Debug for Progress, skipping the callback field +impl std::fmt::Debug for Progress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Progress") + .field("completed_items", &self.completed_items) + .field("total_items", &self.total_items) + .field("last_logged_items", &self.last_logged_items) + .field("start", &self.start) + .field("last_logged", &self.last_logged) + .field("message", &self.message) + .field("item_type", &self.item_type) + .finish() + } +} + #[derive(Debug, Clone)] pub struct WithProgressRaw { sync: Arc>>, diff --git a/src/utils/net.rs b/src/utils/net.rs index db396aa7a0aa..a5107df7c19d 100644 --- a/src/utils/net.rs +++ b/src/utils/net.rs @@ -11,6 +11,7 @@ use futures::{AsyncWriteExt, TryStreamExt}; use once_cell::sync::Lazy; use reqwest::Response; use std::path::Path; +use std::sync::Arc; use tap::Pipe; use tokio::io::AsyncBufRead; use tokio_util::{ @@ -42,7 +43,7 @@ pub async fn download_ipfs_file_trustlessly( tempfile::NamedTempFile::new_in(destination.parent().unwrap_or_else(|| Path::new(".")))? .into_temp_path(); { - let mut reader = reader(url.as_str(), DownloadFileOption::Resumable) + let mut reader = reader(url.as_str(), DownloadFileOption::Resumable, None) .await? .compat(); let mut writer = futures::io::BufWriter::new(async_fs::File::create(&tmp).await?); @@ -68,6 +69,7 @@ pub async fn download_ipfs_file_trustlessly( pub async fn reader( location: &str, option: DownloadFileOption, + callback: Option>, ) -> anyhow::Result { // This isn't the cleanest approach in terms of error-handling, but it works. If the URL is // malformed it'll end up trying to treat it as a local filepath. If that fails - an error @@ -106,7 +108,7 @@ pub async fn reader( }; Ok(tokio::io::BufReader::new( - WithProgress::wrap_async_read("Loading", stream, content_length).bytes(), + WithProgress::wrap_sync_read_with_callback("Loading", stream, content_length, callback).bytes(), )) } diff --git a/src/utils/net/download_file.rs b/src/utils/net/download_file.rs index 1a0e032f097a..0ec12c5a71cb 100644 --- a/src/utils/net/download_file.rs +++ b/src/utils/net/download_file.rs @@ -11,6 +11,7 @@ use std::{ path::{Path, PathBuf}, time::Duration, }; +use std::sync::Arc; use url::Url; #[derive(Debug, Copy, Clone)] @@ -76,6 +77,7 @@ pub async fn download_file_with_cache( ) })?, option, + None, ) .await?; } @@ -133,6 +135,7 @@ pub async fn download_http( directory: &Path, filename: &str, option: DownloadFileOption, + callback: Option>, ) -> anyhow::Result { if !directory.is_dir() { std::fs::create_dir_all(directory)?; @@ -140,7 +143,7 @@ pub async fn download_http( let dst_path = directory.join(filename); let destination = dst_path.display(); tracing::info!(%url, %destination, "downloading snapshot"); - let mut reader = crate::utils::net::reader(url.as_str(), option).await?; + let mut reader = crate::utils::net::reader(url.as_str(), option, callback).await?; let tmp_dst_path = { // like `crdownload` for the chrome browser const DOWNLOAD_EXTENSION: &str = "frdownload"; @@ -171,13 +174,14 @@ pub async fn download_file_with_retry( directory: &Path, filename: &str, option: DownloadFileOption, + callback: Option>, ) -> anyhow::Result { Ok(retry( RetryArgs { timeout: None, ..Default::default() }, - || download_http(url, directory, filename, option), + || download_http(url, directory, filename, option, callback.clone()), ) .await?) } @@ -186,6 +190,7 @@ pub async fn download_to( url: &Url, destination: &Path, option: DownloadFileOption, + callback: Option>, ) -> anyhow::Result<()> { download_file_with_retry( url, @@ -200,6 +205,7 @@ pub async fn download_to( .and_then(OsStr::to_str) .with_context(|| format!("Error getting the file name of {}", destination.display()))?, option, + callback, ) .await?;