diff --git a/.gitignore b/.gitignore index 77cbb24..ae3e7e7 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ boot_node_addr.conf data *.swp .vscode +poi-radio-e2e-tests +*.json diff --git a/Cargo.lock b/Cargo.lock index 884c27a..43c84c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,6 +1304,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "derive-getters" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0122f262bf9c9a367829da84f808d9fb128c10ef283bbe7b0922a77cf07b2747" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -2273,7 +2284,7 @@ dependencies = [ [[package]] name = "graphcast-sdk" version = "0.3.1" -source = "git+https://github.com/graphops/graphcast-sdk#8353d14e0aa00e2120da3b261dd056913e49a55a" +source = "git+https://github.com/graphops/graphcast-sdk#20f220aed6540ef96283043036b42e106f435f69" dependencies = [ "anyhow", "async-graphql", @@ -2281,6 +2292,7 @@ dependencies = [ "chrono", "clap", "data-encoding", + "derive-getters", "dotenv", "ethers", "ethers-contract", @@ -3865,6 +3877,7 @@ dependencies = [ "chrono", "clap", "criterion", + "derive-getters", "dotenv", "ethers", "ethers-contract", diff --git a/Cargo.toml b/Cargo.toml index c7308ee..83402b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ chrono = "0.4" serde = "1.0.163" serde_json = "1.0.96" sha3 = "0.10.8" +derive-getters = "0.2.1" tokio = { version = "1.28.1", features = ["full", "rt"] } # tokio = { version = "1.28", features = ["full", "tracing", "rt", "parking_lot"] } anyhow = "1.0" diff --git a/benches/attestations.rs b/benches/attestations.rs index 58444d9..c855555 100644 --- a/benches/attestations.rs +++ b/benches/attestations.rs @@ -4,11 +4,13 @@ extern crate criterion; mod attestation { use criterion::{black_box, criterion_group, Criterion}; - use poi_radio::attestation::{ + use poi_radio::operator::attestation::{ compare_attestations, local_comparison_point, update_blocks, Attestation, }; - use std::{collections::HashMap, sync::Arc}; - use tokio::{runtime::Runtime, sync::Mutex as AsyncMutex, task}; + use std::{ + collections::HashMap, + sync::{Arc, Mutex as SyncMutex}, + }; criterion_group!( benches, @@ -85,18 +87,15 @@ mod attestation { black_box(remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks)); black_box(local_attestations.insert("my-awesome-hash".to_string(), local_blocks)); - let rt = Runtime::new().unwrap(); - rt.block_on(async { - c.bench_function("compare_attestations", |b| { - b.iter(|| { - task::spawn(compare_attestations( - 42, - black_box(remote_attestations.clone()), - black_box(Arc::new(AsyncMutex::new(local_attestations.clone()))), - "my-awesome-hash", - )) - }) - }); + c.bench_function("compare_attestations", |b| { + b.iter(|| { + compare_attestations( + 42, + black_box(remote_attestations.clone()), + black_box(Arc::new(SyncMutex::new(local_attestations.clone()))), + "my-awesome-hash", + ) + }) }); } @@ -131,19 +130,11 @@ mod attestation { black_box(HashMap::new()); black_box(local_attestations.insert("hash".to_string(), local_blocks.clone())); black_box(local_attestations.insert("hash2".to_string(), local_blocks)); - let local = black_box(Arc::new(AsyncMutex::new(local_attestations))); - - let rt = Runtime::new().unwrap(); - rt.block_on(async { - c.bench_function("comparison_point", |b| { - b.iter(|| { - task::spawn(local_comparison_point( - black_box(local.clone()), - "hash".to_string(), - 120, - )) - }) - }); + let local: Arc>>> = + black_box(Arc::new(SyncMutex::new(local_attestations))); + + c.bench_function("comparison_point", |b| { + b.iter(|| local_comparison_point(black_box(local.clone()), "hash".to_string(), 120)) }); } } diff --git a/benches/gossips.rs b/benches/gossips.rs index 041e7f9..f7222af 100644 --- a/benches/gossips.rs +++ b/benches/gossips.rs @@ -1,28 +1,25 @@ use criterion::async_executor::FuturesExecutor; use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use poi_radio::operator::RadioOperator; + use rand::{thread_rng, Rng}; use secp256k1::SecretKey; use std::collections::HashMap; -use std::sync::{Arc, Mutex as SyncMutex}; -use tokio::sync::Mutex as AsyncMutex; use graphcast_sdk::networks::NetworkName; use graphcast_sdk::{BlockPointer, NetworkPointer}; -use poi_radio::attestation::LocalAttestationsMap; -use poi_radio::operation::gossip_poi; -use poi_radio::{config::Config, CONFIG}; +use poi_radio::config::Config; fn gossip_poi_bench(c: &mut Criterion) { let identifiers = black_box(vec!["identifier1".to_string(), "identifier2".to_string()]); - let network_chainhead_blocks: Arc>> = - black_box(Arc::new(AsyncMutex::new(Default::default()))); + let network_chainhead_blocks: HashMap = + black_box(Default::default()); let subgraph_network_latest_blocks: HashMap = black_box(Default::default()); - let local_attestations: Arc> = - black_box(Arc::new(AsyncMutex::new(Default::default()))); let pk = black_box(generate_random_private_key()); let config = black_box(Config { + radio_name: String::from("test"), graph_node_endpoint: String::from("http://localhost:8030/graphql"), private_key: Some(pk.display_secret().to_string()), mnemonic: None, @@ -48,24 +45,24 @@ fn gossip_poi_bench(c: &mut Criterion) { discord_webhook: None, telegram_token: None, telegram_chat_id: None, - metrics_host: None, + metrics_host: String::from("0.0.0.0"), metrics_port: None, - server_host: None, + server_host: String::from("0.0.0.0"), server_port: None, log_format: String::from("pretty"), persistence_file_path: None, }); - _ = black_box(CONFIG.set(Arc::new(SyncMutex::new(config)))); c.bench_function("gossip_poi", move |b| { b.to_async(FuturesExecutor).iter(|| async { - gossip_poi( - identifiers.clone(), - &network_chainhead_blocks, - &subgraph_network_latest_blocks, - local_attestations.clone(), - ) - .await + RadioOperator::new(config.clone()) + .await + .gossip_poi( + identifiers.clone(), + &network_chainhead_blocks, + &subgraph_network_latest_blocks, + ) + .await }) }); } diff --git a/src/config.rs b/src/config.rs index f010241..cfe6d96 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,7 +1,12 @@ +use std::collections::HashSet; + +use autometrics::autometrics; use clap::Parser; +use derive_getters::Getters; use ethers::signers::WalletError; use graphcast_sdk::{ build_wallet, + callbook::CallBook, graphcast_agent::{GraphcastAgent, GraphcastAgentConfig, GraphcastAgentError}, graphcast_id_address, graphql::{ @@ -12,17 +17,18 @@ use graphcast_sdk::{ use serde::{Deserialize, Serialize}; use tracing::{debug, info}; -use crate::radio_name; use crate::state::PersistedState; +use crate::{active_allocation_hashes, syncing_deployment_hashes}; -#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize)] +#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)] pub enum CoverageLevel { Minimal, + #[default] OnChain, Comprehensive, } -#[derive(Clone, Debug, Parser, Serialize, Deserialize)] +#[derive(Clone, Debug, Parser, Serialize, Deserialize, Getters, Default)] #[clap( name = "poi-radio", about = "Cross-check POIs with other Indexer in real time", @@ -198,10 +204,11 @@ pub struct Config { #[clap( long, value_name = "METRICS_HOST", - help = "If set, the Radio will expose Prometheus metrics on the given host (off by default). This requires having a local Prometheus server running and scraping metrics on the given port.", + default_value = "0.0.0.0", + help = "If port is set, the Radio will expose Prometheus metrics on the given host. This requires having a local Prometheus server running and scraping metrics on the given port.", env = "METRICS_HOST" )] - pub metrics_host: Option, + pub metrics_host: String, #[clap( long, value_name = "METRICS_PORT", @@ -212,10 +219,11 @@ pub struct Config { #[clap( long, value_name = "SERVER_HOST", - help = "If set, the Radio will expose API service on the given host (off by default).", + default_value = "0.0.0.0", + help = "If port is set, the Radio will expose API service on the given host.", env = "SERVER_HOST" )] - pub server_host: Option, + pub server_host: String, #[clap( long, value_name = "SERVER_PORT", @@ -240,6 +248,13 @@ pub struct Config { default_value = "pretty" )] pub log_format: String, + #[clap( + long, + value_name = "RADIO_NAME", + env = "RADIO_NAME", + default_value = "poi-radio" + )] + pub radio_name: String, } impl Config { @@ -275,14 +290,13 @@ impl Config { pub async fn to_graphcast_agent_config( &self, - radio_name: &'static str, ) -> Result { let wallet_key = self.wallet_input().unwrap().to_string(); let topics = self.topics.clone(); GraphcastAgentConfig::new( wallet_key, - radio_name, + self.radio_name.clone(), self.registry_subgraph.clone(), self.network_subgraph.clone(), self.graph_node_endpoint.clone(), @@ -299,15 +313,18 @@ impl Config { pub async fn basic_info(&self) -> Result<(String, f32), QueryError> { // Using unwrap directly as the query has been ran in the set-up validation - let wallet = build_wallet(self.wallet_input().unwrap()).unwrap(); + let wallet = build_wallet( + self.wallet_input() + .map_err(|e| QueryError::Other(e.into()))?, + ) + .map_err(|e| QueryError::Other(e.into()))?; // The query here must be Ok but so it is okay to panic here // Alternatively, make validate_set_up return wallet, address, and stake let my_address = query_registry_indexer( self.registry_subgraph.to_string(), graphcast_id_address(&wallet), ) - .await - .unwrap(); + .await?; let my_stake = query_network_subgraph(self.network_subgraph.to_string(), my_address.clone()) .await @@ -339,9 +356,56 @@ impl Config { } pub async fn create_graphcast_agent(&self) -> Result { - let config = self.to_graphcast_agent_config(radio_name()).await.unwrap(); + let config = self.to_graphcast_agent_config().await.unwrap(); GraphcastAgent::new(config).await } + + pub fn callbook(&self) -> CallBook { + CallBook::new( + self.graph_node_endpoint.clone(), + self.registry_subgraph.clone(), + self.network_subgraph.clone(), + ) + } + + /// Generate a set of unique topics along with given static topics + #[autometrics] + pub async fn generate_topics(&self, indexer_address: String) -> Vec { + let static_topics = HashSet::from_iter(self.topics().to_vec()); + let topics = match self.coverage { + CoverageLevel::Minimal => static_topics, + CoverageLevel::OnChain => { + let mut topics: HashSet = active_allocation_hashes( + self.callbook().graph_network(), + indexer_address.clone(), + ) + .await + .into_iter() + .collect(); + topics.extend(static_topics); + topics + } + CoverageLevel::Comprehensive => { + let active_topics: HashSet = active_allocation_hashes( + self.callbook().graph_network(), + indexer_address.clone(), + ) + .await + .into_iter() + .collect(); + let mut additional_topics: HashSet = + syncing_deployment_hashes(self.graph_node_endpoint()) + .await + .into_iter() + .collect(); + + additional_topics.extend(active_topics); + additional_topics.extend(static_topics); + additional_topics + } + }; + topics.into_iter().collect::>() + } } #[derive(Debug, thiserror::Error)] diff --git a/src/graphql/mod.rs b/src/graphql/mod.rs index eef4097..acc1a91 100644 --- a/src/graphql/mod.rs +++ b/src/graphql/mod.rs @@ -1,8 +1,8 @@ +use graphcast_sdk::graphql::QueryError; use graphql_client::{GraphQLQuery, Response}; use serde_derive::{Deserialize, Serialize}; // Maybe later on move graphql to SDK as the queries are pretty standarded -use graphcast_sdk::graphql::QueryError; /// Derived GraphQL Query to Proof of Indexing #[derive(GraphQLQuery, Serialize, Deserialize, Debug)] diff --git a/src/lib.rs b/src/lib.rs index e695091..658b852 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,5 @@ use async_graphql::{Error, ErrorExtensions, SimpleObject}; - use autometrics::autometrics; -use config::{Config, CoverageLevel}; use ethers_contract::EthAbiType; use ethers_core::types::transaction::eip712::Eip712; use ethers_derive_eip712::*; @@ -9,36 +7,32 @@ use once_cell::sync::OnceCell; use prost::Message; use serde::{Deserialize, Serialize}; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex as SyncMutex, }, }; use tokio::signal; -use tracing::{error, trace}; +use tracing::error; use graphcast_sdk::{ - graphcast_agent::GraphcastAgentError, graphql::client_graph_node::get_indexing_statuses, + graphcast_agent::GraphcastAgentError, + graphql::{client_graph_node::get_indexing_statuses, QueryError}, }; use graphcast_sdk::{ - graphcast_agent::{ - message_typing::GraphcastMessage, waku_handling::WakuHandlingError, GraphcastAgent, - }, + graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}, graphql::client_network::query_network_subgraph, networks::NetworkName, BlockPointer, }; -use crate::attestation::AttestationError; -use crate::metrics::{CACHED_MESSAGES, VALIDATED_MESSAGES}; +use crate::operator::attestation::AttestationError; -pub mod attestation; pub mod config; pub mod graphql; pub mod metrics; -pub mod notifier; -pub mod operation; +pub mod operator; pub mod server; pub mod state; @@ -46,17 +40,7 @@ pub type MessagesVec = OnceCell = OnceCell::new(); - -/// A global static (singleton) instance of A GraphcastMessage vector. -/// It is used to save incoming messages after they've been validated, in order -/// defer their processing for later, because async code is required for the processing but -/// it is not allowed in the handler itself. -pub static MESSAGES: OnceCell>>>> = - OnceCell::new(); - -/// Radio's global config -pub static CONFIG: OnceCell>> = OnceCell::new(); +pub static GRAPHCAST_AGENT: OnceCell> = OnceCell::new(); pub fn radio_name() -> &'static str { "poi-radio" @@ -89,33 +73,6 @@ impl RadioPayloadMessage { } } -/// Custom callback for handling the validated GraphcastMessage, in this case we only save the messages to a local store -/// to process them at a later time. This is required because for the processing we use async operations which are not allowed -/// in the handler. -#[autometrics] -pub fn radio_msg_handler( -) -> impl Fn(Result, WakuHandlingError>) { - |msg: Result, WakuHandlingError>| { - // TODO: Handle the error case by incrementing a Prometheus "error" counter - if let Ok(msg) = msg { - trace!(msg = tracing::field::debug(&msg), "Received message"); - let id = msg.identifier.clone(); - VALIDATED_MESSAGES.with_label_values(&[&id]).inc(); - MESSAGES.get().unwrap().lock().unwrap().push(msg); - CACHED_MESSAGES.with_label_values(&[&id]).set( - MESSAGES - .get() - .unwrap() - .lock() - .unwrap() - .len() - .try_into() - .unwrap(), - ); - } - } -} - /// Generate default topics that is operator address resolved to indexer address /// and then its active on-chain allocations -> function signature should just return /// A vec of strings for subtopics @@ -151,49 +108,6 @@ pub async fn syncing_deployment_hashes( .collect::>() } -/// Generate a set of unique topics along with given static topics -#[autometrics] -pub async fn generate_topics( - coverage: CoverageLevel, - network_subgraph: String, - indexer_address: String, - graph_node_endpoint: String, - static_topics: &Vec, -) -> Vec { - match coverage { - CoverageLevel::Minimal => static_topics.to_vec(), - CoverageLevel::OnChain => { - let mut topics = active_allocation_hashes(&network_subgraph, indexer_address).await; - for topic in static_topics { - if !topics.contains(topic) { - topics.push(topic.clone()); - } - } - topics - } - CoverageLevel::Comprehensive => { - let active_topics: HashSet = - active_allocation_hashes(&network_subgraph, indexer_address) - .await - .into_iter() - .collect(); - let additional_topics: HashSet = - syncing_deployment_hashes(&graph_node_endpoint) - .await - .into_iter() - .collect(); - - let mut combined_topics: Vec = static_topics.clone(); - combined_topics.extend( - active_topics - .into_iter() - .chain(additional_topics.into_iter()), - ); - combined_topics - } - } -} - /// This function returns the string representation of a set of network mapped to their chainhead blocks #[autometrics] pub fn chainhead_block_str( @@ -249,6 +163,8 @@ pub enum OperationError { CompareTrigger(String, u64, String), #[error("Agent encountered problems: {0}")] Agent(GraphcastAgentError), + #[error("Failed to query: {0}")] + Query(QueryError), #[error("Attestation failure: {0}")] Attestation(AttestationError), #[error("Others: {0}")] @@ -273,80 +189,3 @@ impl ErrorExtensions for OperationError { Error::new(format!("{}", self)) } } - -pub fn clear_all_messages() { - _ = MESSAGES.set(Arc::new(SyncMutex::new(vec![]))); -} - -#[cfg(test)] -mod tests { - use super::*; - - const NETWORK: NetworkName = NetworkName::Goerli; - - #[test] - fn test_add_message() { - _ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new()))); - let mut messages = MESSAGES.get().unwrap().lock().unwrap(); - - let hash: String = "QmWECgZdP2YMcV9RtKU41GxcdW8EGYqMNoG98ubu5RGN6U".to_string(); - let content: String = - "0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string(); - let nonce: i64 = 123321; - let block_number: u64 = 0; - let block_hash: String = "0xblahh".to_string(); - - let radio_msg = RadioPayloadMessage::new(hash.clone(), content); - let sig: String = "4be6a6b7f27c4086f22e8be364cbdaeddc19c1992a42b08cbe506196b0aafb0a68c8c48a730b0e3155f4388d7cc84a24b193d091c4a6a4e8cd6f1b305870fae61b".to_string(); - let msg = GraphcastMessage::new( - hash, - Some(radio_msg), - nonce, - NETWORK, - block_number, - block_hash, - sig, - ) - .expect("Shouldn't get here since the message is purposefully constructed for testing"); - - assert!(messages.is_empty()); - - messages.push(msg); - assert_eq!( - messages.first().unwrap().identifier, - "QmWECgZdP2YMcV9RtKU41GxcdW8EGYqMNoG98ubu5RGN6U".to_string() - ); - } - - #[test] - fn test_delete_messages() { - _ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new()))); - - let mut messages = MESSAGES.get().unwrap().lock().unwrap(); - - let hash: String = "QmWECgZdP2YMcV9RtKU41GxcdW8EGYqMNoG98ubu5RGN6U".to_string(); - let content: String = - "0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string(); - let nonce: i64 = 123321; - let block_number: u64 = 0; - let block_hash: String = "0xblahh".to_string(); - let radio_msg = RadioPayloadMessage::new(hash.clone(), content); - let sig: String = "4be6a6b7f27c4086f22e8be364cbdaeddc19c1992a42b08cbe506196b0aafb0a68c8c48a730b0e3155f4388d7cc84a24b193d091c4a6a4e8cd6f1b305870fae61b".to_string(); - let msg = GraphcastMessage::new( - hash, - Some(radio_msg), - nonce, - NETWORK, - block_number, - block_hash, - sig, - ) - .expect("Shouldn't get here since the message is purposefully constructed for testing"); - - messages.push(msg); - assert!(!messages.is_empty()); - - messages.clear(); - assert!(messages.is_empty()); - } -} diff --git a/src/main.rs b/src/main.rs index de1ba7d..b76d9b4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,39 +1,6 @@ use dotenv::dotenv; -use poi_radio::attestation::process_comparison_results; -use poi_radio::notifier::Notifier; -use poi_radio::radio_name; -use std::collections::{HashMap, HashSet}; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex as SyncMutex, -}; -use std::thread::sleep; -use tokio::{ - sync::Mutex as AsyncMutex, - time::{interval, timeout, Duration}, -}; -use tracing::{debug, error, info, trace, warn}; +use poi_radio::{config::Config, operator::RadioOperator}; -use graphcast_sdk::{ - graphql::client_graph_node::{ - get_indexing_statuses, update_chainhead_blocks, update_network_chainheads, - }, - networks::NetworkName, - BlockPointer, -}; -use poi_radio::{ - attestation::{log_gossip_summary, LocalAttestationsMap}, - chainhead_block_str, - config::Config, - generate_topics, - metrics::handle_serve_metrics, - operation::{compare_poi, gossip_poi}, - radio_msg_handler, - server::run_server, - CONFIG, GRAPHCAST_AGENT, MESSAGES, -}; - -#[macro_use] extern crate partial_application; #[tokio::main] @@ -43,263 +10,12 @@ async fn main() { // Parse basic configurations let radio_config = Config::args(); - // Set up Prometheus metrics url if configured - if let Some(port) = radio_config.metrics_port { - tokio::spawn(handle_serve_metrics( - radio_config - .metrics_host - .clone() - .unwrap_or(String::from("0.0.0.0")), - port, - )); - } - - debug!("Initializing Graphcast Agent"); - _ = GRAPHCAST_AGENT.set( - radio_config - .create_graphcast_agent() - .await - .expect("Initialize Graphcast agent"), - ); - debug!("Initialized Graphcast Agent"); - - // Initialize program state - _ = CONFIG.set(Arc::new(SyncMutex::new(radio_config.clone()))); - let state = radio_config.init_radio_state().await; - _ = MESSAGES.set(state.remote_messages()); - let local_attestations: Arc> = - Arc::new(AsyncMutex::new(state.local_attestations())); - - let (my_address, _) = radio_config.basic_info().await.unwrap(); - let topic_coverage = radio_config.coverage.clone(); - let topic_network = radio_config.network_subgraph.clone(); - let topic_graph_node = radio_config.graph_node_endpoint.clone(); - let topic_static = &radio_config.topics.clone(); - let generate_topics = partial!(generate_topics => topic_coverage.clone(), topic_network.clone(), my_address.clone(), topic_graph_node.clone(), topic_static); - let topics = generate_topics().await; - debug!( - topics = tracing::field::debug(&topics), - "Found content topics for subscription", - ); - GRAPHCAST_AGENT - .get() - .unwrap() - .update_content_topics(topics.clone()) - .await; - - GRAPHCAST_AGENT - .get() - .unwrap() - .register_handler(Arc::new(AsyncMutex::new(radio_msg_handler()))) - .expect("Could not register handler"); - - // Control flow - // TODO: expose to radio config for the users - let running = Arc::new(AtomicBool::new(true)); - let skip_iteration = Arc::new(AtomicBool::new(false)); - let skip_iteration_clone = skip_iteration.clone(); - - let mut topic_update_interval = interval(Duration::from_secs(600)); - let mut state_update_interval = interval(Duration::from_secs(15)); - let mut gossip_poi_interval = interval(Duration::from_secs(30)); - let mut comparison_interval = interval(Duration::from_secs(60)); - - let iteration_timeout = Duration::from_secs(180); - let update_timeout = Duration::from_secs(10); - let gossip_timeout = Duration::from_secs(150); - - let notifier = Notifier::from_config( - radio_name().to_string(), - &CONFIG.get().unwrap().lock().unwrap(), - ); - - // Separate control flow thread to skip a main loop iteration when hit timeout - tokio::spawn(async move { - tokio::time::sleep(iteration_timeout).await; - skip_iteration_clone.store(true, Ordering::SeqCst); - }); - - // Initialize Http server if configured - if CONFIG.get().unwrap().lock().unwrap().server_port.is_some() { - tokio::spawn(run_server(running.clone(), Arc::clone(&local_attestations))); - } - - let mut divergent_subgraphs: HashSet = HashSet::new(); - - // Main loop for sending messages, can factor out - // and take radio specific query and parsing for radioPayload - while running.load(Ordering::SeqCst) { - // Run event intervals sequentially by satisfication of other intervals and corresponding tick - tokio::select! { - _ = topic_update_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); - continue; - } - // Update topic subscription - let result = timeout(update_timeout, - GRAPHCAST_AGENT - .get() - .unwrap() - .update_content_topics(generate_topics().await) - ).await; - - if result.is_err() { - warn!("update_content_topics timed out"); - } else { - debug!("update_content_topics completed"); - } - }, - _ = state_update_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); - continue; - } - // TODO: make operator struct that keeps the global state - // Update the state to persist - let result = timeout(update_timeout, - state.update(Some(local_attestations.clone()), Some(MESSAGES.get().unwrap().clone())) - ).await; - - if let Ok(r) = result { - debug!("state update completed"); - - // Save cache if path provided - if let Some(path) = &radio_config.persistence_file_path.clone() { - r.update_cache(path); - } - } else { - warn!("state update timed out"); - } - }, - _ = gossip_poi_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); - continue; - } - - let result = timeout(gossip_timeout, { - let network_chainhead_blocks: Arc>> = - Arc::new(AsyncMutex::new(HashMap::new())); - // Update all the chainheads of the network - // Also get a hash map returned on the subgraph mapped to network name and latest block - let graph_node = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .graph_node_endpoint - .clone(); - let subgraph_network_latest_blocks = - match update_chainhead_blocks(graph_node, &mut *network_chainhead_blocks.lock().await) - .await - { - Ok(res) => res, - Err(e) => { - error!(err = tracing::field::debug(&e), "Could not query indexing statuses, pull again later"); - continue; - } - }; - - trace!( - network_pointers = tracing::field::debug(&subgraph_network_latest_blocks), - "Subgraph network and latest blocks", - ); - - // Radio specific message content query function - // Function takes in an identifier string and make specific queries regarding the identifier - // The example here combines a single function provided query endpoint, current block info based on the subgraph's indexing network - // Then the function gets sent to agent for making identifier independent queries - let identifiers = GRAPHCAST_AGENT.get().unwrap().content_identifiers().await; - let num_topics = identifiers.len(); - let blocks_str = chainhead_block_str(&*network_chainhead_blocks.lock().await); - info!( - chainhead = blocks_str.clone(), - num_gossip_peers = GRAPHCAST_AGENT.get().unwrap().number_of_peers(), - num_topics, - "Network statuses", - ); - - let send_ops = gossip_poi( - identifiers.clone(), - &network_chainhead_blocks.clone(), - &subgraph_network_latest_blocks.clone(), - local_attestations.clone(), - ).await; - - log_gossip_summary( - blocks_str, - identifiers.len(), - send_ops, - ) - }).await; - - if result.is_err() { - warn!("gossip_poi timed out"); - } else { - debug!("gossip_poi completed"); - } - }, - _ = comparison_interval.tick() => { - if skip_iteration.load(Ordering::SeqCst) { - skip_iteration.store(false, Ordering::SeqCst); - continue; - } - - let result = timeout(gossip_timeout, { - let mut network_chainhead_blocks: HashMap = - HashMap::new(); - let local_attestations = Arc::clone(&local_attestations); - - // Update all the chainheads of the network - // Also get a hash map returned on the subgraph mapped to network name and latest block - let graph_node_endpoint = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .graph_node_endpoint - .clone(); - let indexing_status = match get_indexing_statuses(graph_node_endpoint.clone()).await { - Ok(res) => res, - Err(e) => { - error!(err = tracing::field::debug(&e), "Could not query indexing statuses, pull again later"); - continue; - } - }; - update_network_chainheads( - indexing_status, - &mut network_chainhead_blocks, - ); - - let identifiers = GRAPHCAST_AGENT.get().unwrap().content_identifiers().await; - let blocks_str = chainhead_block_str(&network_chainhead_blocks); - - let comparison_res = compare_poi( - identifiers.clone(), - local_attestations, - ) - .await; - - process_comparison_results( - blocks_str, - identifiers.len(), - comparison_res, - &mut divergent_subgraphs, - notifier.clone() - ) - }).await; + // Initialization + let radio_operator = RadioOperator::new(radio_config).await; - if result.is_err() { - warn!("compare_poi timed out"); - } else { - debug!("compare_poi completed"); - } - }, - else => break, - } + // Start separate processes + radio_operator.prepare().await; - sleep(Duration::from_secs(5)); - continue; - } + // Start radio operations + radio_operator.run().await; } diff --git a/src/operation.rs b/src/operation.rs deleted file mode 100644 index 215fb1b..0000000 --- a/src/operation.rs +++ /dev/null @@ -1,413 +0,0 @@ -use autometrics::autometrics; -use chrono::Utc; -use std::cmp::max; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::Mutex as AsyncMutex; -use tracing::{debug, error, trace, warn}; - -use graphcast_sdk::{ - determine_message_block, - graphcast_agent::{ - message_typing::{BuildMessageError, GraphcastMessage}, - GraphcastAgent, GraphcastAgentError, - }, - networks::NetworkName, - BlockPointer, NetworkBlockError, NetworkPointer, -}; - -use crate::{ - attestation::{ - clear_local_attestation, compare_attestations, local_comparison_point, process_messages, - save_local_attestation, Attestation, ComparisonResult, - }, - graphql::query_graph_node_poi, - metrics::CACHED_MESSAGES, - OperationError, RadioPayloadMessage, CONFIG, GRAPHCAST_AGENT, MESSAGES, -}; - -/// Determine the parameters for messages to send and compare -#[autometrics(track_concurrency)] -pub async fn gossip_set_up( - id: String, - network_chainhead_blocks: &Arc>>, - subgraph_network_latest_blocks: &HashMap, - local_attestations: Arc>>>, -) -> Result<(NetworkName, BlockPointer, u64), BuildMessageError> { - // Get the indexing network of the deployment - // and update the NETWORK message block - let (network_name, latest_block) = match subgraph_network_latest_blocks.get(&id.clone()) { - Some(network_block) => ( - NetworkName::from_string(&network_block.network.clone()), - network_block.block.clone(), - ), - None => { - let err_msg = format!("Could not query the subgraph's indexing network, check Graph node's indexing statuses of subgraph deployment {}", id.clone()); - warn!( - err = tracing::field::debug(&err_msg), - "Failed to build message" - ); - return Err(BuildMessageError::Network(NetworkBlockError::FailedStatus( - err_msg, - ))); - } - }; - - let message_block = - match determine_message_block(&*network_chainhead_blocks.lock().await, network_name) { - Ok(block) => block, - Err(e) => return Err(BuildMessageError::Network(e)), - }; - - debug!( - deployment_hash = tracing::field::debug(&id), - network = tracing::field::debug(&network_name), - message_block = message_block, - latest_block = latest_block.number, - message_countdown_blocks = max(0, message_block as i64 - latest_block.number as i64), - topics_waiting_for_next_message_interval = local_attestations - .lock() - .await - .get(&id.clone()) - .and_then(|blocks| blocks.get(&message_block)) - .is_some(), - "Deployment status", - ); - - Ok((network_name, latest_block, message_block)) -} - -/// Construct the message and send it to Graphcast network -#[autometrics(track_concurrency)] -pub async fn message_send( - id: String, - message_block: u64, - latest_block: BlockPointer, - network_name: NetworkName, - local_attestations: Arc>>>, - graphcast_agent: &GraphcastAgent, -) -> Result { - trace!( - message_block = message_block, - latest_block = latest_block.number, - "Check message send requirement", - ); - - // Deployment did not sync to message_block - if latest_block.number < message_block { - //TODO: fill in variant in SDK - let err_msg = format!( - "Did not send message for deployment {}: latest_block ({}) syncing status must catch up to the message block ({})", - id.clone(), - latest_block.number, message_block, - ); - trace!(err = err_msg, "Skip send",); - return Err(OperationError::SendTrigger(err_msg)); - }; - - // Already sent message - if local_attestations - .lock() - .await - .get(&id.clone()) - .and_then(|blocks| blocks.get(&message_block)) - .is_some() - { - let err_msg = format!( - "Repeated message for deployment {}, skip sending message for block: {}", - id.clone(), - message_block - ); - trace!(err = err_msg, "Skip send"); - return Err(OperationError::SkipDuplicate(err_msg)); - } - - let block_hash = match graphcast_agent - .get_block_hash(network_name.to_string(), message_block) - .await - { - Ok(hash) => hash, - Err(e) => { - let err_msg = format!("Failed to query graph node for the block hash: {e}"); - error!(err = err_msg, "Failed to send message"); - return Err(OperationError::Agent(e)); - } - }; - - match query_graph_node_poi( - graphcast_agent.graph_node_endpoint.clone(), - id.clone(), - block_hash.clone(), - message_block.try_into().unwrap(), - ) - .await - { - Ok(content) => { - let radio_message = RadioPayloadMessage::new(id.clone(), content.clone()); - match graphcast_agent - .send_message(id.clone(), network_name, message_block, Some(radio_message)) - .await - { - Ok(msg_id) => { - save_local_attestation( - local_attestations, - content.clone(), - id.clone(), - message_block, - ) - .await; - Ok(msg_id) - } - Err(e) => { - error!(err = tracing::field::debug(&e), "Failed to send message"); - Err(OperationError::Agent(e)) - } - } - } - Err(e) => { - error!( - err = tracing::field::debug(&e), - "Failed to query message content" - ); - Err(OperationError::Agent( - GraphcastAgentError::QueryResponseError(e), - )) - } - } -} - -/// Compare validated messages -#[allow(clippy::too_many_arguments)] -#[autometrics(track_concurrency)] -pub async fn message_comparison( - id: String, - collect_window_duration: i64, - registry_subgraph: String, - network_subgraph: String, - messages: Vec>, - local_attestations: Arc>>>, -) -> Result { - let time = Utc::now().timestamp(); - - let (compare_block, collect_window_end) = match local_comparison_point( - Arc::clone(&local_attestations), - id.clone(), - collect_window_duration, - ) - .await - { - Some((block, window)) if time >= window => (block, window), - Some((compare_block, window)) => { - let err_msg = format!("Deployment {} comparison not triggered: collecting messages until time {}; currently {time}", id.clone(), window); - debug!(err = err_msg, "Collecting messages",); - return Err(OperationError::CompareTrigger( - id.clone(), - compare_block, - err_msg, - )); - } - _ => { - let err_msg = format!( - "Deployment {} comparison not triggered: no matching attestation to compare", - id.clone() - ); - debug!(err = err_msg, "No matching attestations",); - return Err(OperationError::CompareTrigger(id.clone(), 0, err_msg)); - } - }; - - // Update to only process the identifier&compare_block related messages within the collection window - let filter_msg: Vec> = messages - .iter() - .filter(|&m| m.block_number == compare_block && m.nonce <= collect_window_end) - .cloned() - .collect(); - debug!( - deployment_hash = id, - time, - comparison_time = collect_window_end, - compare_block, - comparison_countdown_seconds = max(0, time - collect_window_end), - number_of_messages_matched_to_compare = filter_msg.len(), - "Comparison state", - ); - let remote_attestations_result = - process_messages(filter_msg, ®istry_subgraph, &network_subgraph).await; - let remote_attestations = match remote_attestations_result { - Ok(remote) => { - debug!(unique_remote_nPOIs = remote.len(), "Processed messages",); - remote - } - Err(err) => { - trace!( - err = tracing::field::debug(&err), - "An error occured while parsing messages", - ); - return Err(OperationError::Attestation(err)); - } - }; - let comparison_result = compare_attestations( - compare_block, - remote_attestations, - Arc::clone(&local_attestations), - &id, - ) - .await; - - Ok(comparison_result) -} - -pub async fn gossip_poi( - identifiers: Vec, - network_chainhead_blocks: &Arc>>, - subgraph_network_latest_blocks: &HashMap, - local_attestations: Arc>>>, -) -> Vec> { - let mut send_handles = vec![]; - for id in identifiers.clone() { - /* Set up */ - let local_attestations = Arc::clone(&local_attestations); - let (network_name, latest_block, message_block) = if let Ok(params) = gossip_set_up( - id.clone(), - network_chainhead_blocks, - subgraph_network_latest_blocks, - Arc::clone(&local_attestations), - ) - .await - { - params - } else { - let err_msg = "Failed to set up message parameters".to_string(); - warn!(id, err_msg, "Gossip POI failed"); - continue; - }; - - /* Send message */ - let id_cloned = id.clone(); - - let local = Arc::clone(&local_attestations); - let send_handle = tokio::spawn(async move { - message_send( - id_cloned, - message_block, - latest_block, - network_name, - local, - GRAPHCAST_AGENT.get().unwrap(), - ) - .await - }); - - send_handles.push(send_handle); - } - - let mut send_ops = vec![]; - for handle in send_handles { - if let Ok(s) = handle.await { - send_ops.push(s); - } - } - send_ops -} - -pub async fn compare_poi( - identifiers: Vec, - local_attestations: Arc>>>, -) -> Vec> { - let mut compare_handles = vec![]; - for id in identifiers.clone() { - /* Set up */ - let collect_duration: i64 = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .collect_message_duration; - let id_cloned = id.clone(); - let registry_subgraph = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .registry_subgraph - .clone(); - let network_subgraph = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .network_subgraph - .clone(); - let local = Arc::clone(&local_attestations); - let msgs = MESSAGES.get().unwrap().lock().unwrap().to_vec(); - let filtered_msg = msgs - .iter() - .filter(|&m| m.identifier == id.clone()) - .cloned() - .collect(); - - let compare_handle = tokio::spawn(async move { - message_comparison( - id_cloned, - collect_duration, - registry_subgraph.clone(), - network_subgraph.clone(), - filtered_msg, - local, - ) - .await - }); - compare_handles.push(compare_handle); - } - - let mut compare_ops = vec![]; - for handle in compare_handles { - let res = handle.await; - if let Ok(s) = res { - // Skip clean up for comparisonResult for Error and buildFailed - match s { - Ok(r) => { - compare_ops.push(Ok(r.clone())); - - /* Clean up cache */ - // Only clear the ones matching identifier and block number equal or less - // Retain the msgs with a different identifier, or if their block number is greater - let local = Arc::clone(&local_attestations); - clear_local_attestation(local, r.deployment_hash(), r.block()).await; - CACHED_MESSAGES - .with_label_values(&[&r.deployment_hash()]) - .set( - MESSAGES - .get() - .unwrap() - .lock() - .unwrap() - .len() - .try_into() - .unwrap(), - ); - MESSAGES.get().unwrap().lock().unwrap().retain(|msg| { - msg.block_number >= r.block() || msg.identifier != r.deployment_hash() - }); - CACHED_MESSAGES - .with_label_values(&[&r.deployment_hash()]) - .set( - MESSAGES - .get() - .unwrap() - .lock() - .unwrap() - .len() - .try_into() - .unwrap(), - ); - } - Err(e) => { - trace!(err = tracing::field::debug(&e), "Compare handles"); - compare_ops.push(Err(e.clone_with_inner())); - } - } - } - } - compare_ops -} diff --git a/src/attestation/mod.rs b/src/operator/attestation.rs similarity index 92% rename from src/attestation/mod.rs rename to src/operator/attestation.rs index 8b738e8..0ebfe37 100644 --- a/src/attestation/mod.rs +++ b/src/operator/attestation.rs @@ -7,9 +7,9 @@ use sha3::{Digest, Sha3_256}; use std::{ collections::{HashMap, HashSet}, fmt::{self, Display}, - sync::Arc, + sync::{Arc, Mutex as SyncMutex}, }; -use tokio::sync::Mutex as AsyncMutex; + use tracing::{debug, error, info, trace, warn}; use graphcast_sdk::{ @@ -21,10 +21,11 @@ use crate::{ metrics::{ ACTIVE_INDEXERS, DIVERGING_SUBGRAPHS, INDEXER_COUNT_BY_NPOI, LOCAL_NPOIS_TO_COMPARE, }, - notifier::Notifier, OperationError, RadioPayloadMessage, }; +use super::Notifier; + /// A wrapper around an attested NPOI, tracks Indexers that have sent it plus their accumulated stake #[derive(Clone, Debug, PartialEq, Eq, Hash, SimpleObject, Serialize, Deserialize)] pub struct Attestation { @@ -92,12 +93,12 @@ pub struct AttestationEntry { pub attestation: Attestation, } -pub async fn attestations_to_vec( - attestations: &Arc>, +pub fn attestations_to_vec( + attestations: &Arc>, ) -> Vec { attestations .lock() - .await + .unwrap() .iter() .flat_map(|(npoi, inner_map)| { inner_map.iter().map(move |(blk, att)| AttestationEntry { @@ -109,9 +110,6 @@ pub async fn attestations_to_vec( .collect() } -/// This function processes the global messages map that we populate when -/// messages are being received. It constructs the remote attestations -/// map and returns it if the processing succeeds. #[autometrics] pub async fn process_messages( messages: Vec>, @@ -205,12 +203,12 @@ pub fn combine_senders(attestations: &[Attestation]) -> Vec { /// Determine the comparison pointer on both block and time based on the local attestations /// If they don't exist, then return default value that shall never be validated to trigger -pub async fn local_comparison_point( - local_attestations: Arc>, +pub fn local_comparison_point( + local_attestations: Arc>, id: String, collect_window_duration: i64, ) -> Option<(u64, i64)> { - let local_attestations = local_attestations.lock().await; + let local_attestations = local_attestations.lock().unwrap(); if let Some(blocks_map) = local_attestations.get(&id) { // Find the attestaion by the smallest block blocks_map @@ -254,20 +252,15 @@ pub fn update_blocks( } /// Saves NPOIs that we've generated locally, in order to compare them with remote ones later -pub async fn save_local_attestation( - local_attestations: Arc>, +pub fn save_local_attestation( + local_attestations: Arc>, content: String, ipfs_hash: String, block_number: u64, ) { - let attestation = Attestation::new( - content.clone(), - Zero::zero(), - vec![], - vec![Utc::now().timestamp()], - ); + let attestation = Attestation::new(content, Zero::zero(), vec![], vec![Utc::now().timestamp()]); - let mut local_attestations = local_attestations.lock().await; + let mut local_attestations = local_attestations.lock().unwrap(); let blocks = local_attestations.get(&ipfs_hash); match blocks { @@ -292,33 +285,25 @@ pub async fn save_local_attestation( } /// Clear the expired local attestations after comparing with remote results -pub async fn clear_local_attestation( - local_attestations: Arc>>>, +pub fn clear_local_attestation( + local_attestations: Arc>>>, ipfs_hash: String, block_number: u64, ) { - let mut local_attestations = local_attestations.lock().await; - let blocks = local_attestations.get(&ipfs_hash.clone()); + let mut local_attestations = local_attestations.lock().unwrap(); + let blocks = local_attestations.get(&ipfs_hash); if let Some(blocks) = blocks { let mut blocks_clone: HashMap = HashMap::new(); blocks_clone.extend(blocks.clone()); blocks_clone.remove(&block_number); - let npoi_gauge = LOCAL_NPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash.clone()]); + let npoi_gauge = LOCAL_NPOIS_TO_COMPARE.with_label_values(&[&ipfs_hash]); // The value is the total number of senders that are attesting for that subgraph npoi_gauge.set(blocks_clone.len().try_into().unwrap()); - local_attestations.insert(ipfs_hash.clone(), blocks_clone); + local_attestations.insert(ipfs_hash, blocks_clone); }; } -//TODO: add as a function of global state -// /// Clear the expired local attestations after comparing with remote results -// pub async fn clear_local_attestations(local_attestations: Arc>, -// ) { - -// _ = local_attestations.set(Arc::new(AsyncMutex::new(HashMap::new()))); -// } - /// Tracks results indexed by deployment hash and block number #[derive(Enum, Debug, PartialEq, Eq, Hash, Clone, Copy)] pub enum ComparisonResultType { @@ -437,7 +422,7 @@ impl Clone for ComparisonResult { pub async fn compare_attestations( attestation_block: u64, remote: RemoteAttestationsMap, - local: Arc>, + local: Arc>, ipfs_hash: &str, ) -> ComparisonResult { trace!( @@ -446,7 +431,7 @@ pub async fn compare_attestations( "Comparing attestations", ); - let local = local.lock().await; + let local = local.lock().unwrap(); let blocks = match local.get(ipfs_hash) { Some(blocks) => blocks, @@ -653,7 +638,6 @@ pub async fn process_comparison_results( notifier.clone().notify(x.to_string()).await; divergent_subgraphs.insert(x.deployment.clone()); } - divergent_strings.push(x.to_string()); } Ok(x) => attestation_failed.push(x.to_string()), @@ -695,8 +679,6 @@ impl ErrorExtensions for AttestationError { #[cfg(test)] mod tests { - use crate::{radio_name, CONFIG}; - use super::*; // TODO: add setup and teardown functions @@ -848,7 +830,7 @@ mod tests { let res = compare_attestations( 42, HashMap::new(), - Arc::new(AsyncMutex::new(HashMap::new())), + Arc::new(SyncMutex::new(HashMap::new())), "non-existent-ipfs-hash", ) .await; @@ -890,7 +872,7 @@ mod tests { let res = compare_attestations( 42, remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), + Arc::new(SyncMutex::new(local_attestations)), "different-awesome-hash", ) .await; @@ -917,7 +899,7 @@ mod tests { let res = compare_attestations( 42, remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), + Arc::new(SyncMutex::new(local_attestations)), "my-awesome-hash", ) .await; @@ -954,7 +936,7 @@ mod tests { let res = compare_attestations( 42, remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), + Arc::new(SyncMutex::new(local_attestations)), "my-awesome-hash", ) .await; @@ -1002,13 +984,19 @@ mod tests { let mut local_attestations: HashMap> = HashMap::new(); local_attestations.insert("hash".to_string(), local_blocks.clone()); local_attestations.insert("hash2".to_string(), local_blocks); - let local = Arc::new(AsyncMutex::new(local_attestations)); - - clear_local_attestation(Arc::clone(&local), "hash".to_string(), 43).await; - - assert_eq!(local.lock().await.get("hash").unwrap().len(), 2); - assert!(local.lock().await.get("hash").unwrap().get(&43).is_none()); - assert_eq!(local.lock().await.get("hash2").unwrap().len(), 3); + let local = Arc::new(SyncMutex::new(local_attestations)); + + clear_local_attestation(Arc::clone(&local), "hash".to_string(), 43); + + assert_eq!(local.lock().unwrap().get("hash").unwrap().len(), 2); + assert!(local + .lock() + .unwrap() + .get("hash") + .unwrap() + .get(&43) + .is_none()); + assert_eq!(local.lock().unwrap().get("hash2").unwrap().len(), 3); } #[tokio::test] @@ -1042,11 +1030,9 @@ mod tests { let mut local_attestations: HashMap> = HashMap::new(); local_attestations.insert("hash".to_string(), local_blocks.clone()); local_attestations.insert("hash2".to_string(), local_blocks); - let local = Arc::new(AsyncMutex::new(local_attestations)); + let local = Arc::new(SyncMutex::new(local_attestations)); let (block_num, collect_window_end) = - local_comparison_point(local, "hash".to_string(), 120) - .await - .unwrap(); + local_comparison_point(local, "hash".to_string(), 120).unwrap(); assert_eq!(block_num, 42); assert_eq!(collect_window_end, 122); @@ -1054,39 +1040,52 @@ mod tests { #[tokio::test] async fn test_save_local_attestation() { - let local_attestations = Arc::new(AsyncMutex::new(HashMap::new())); + let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); save_local_attestation( local_attestations.clone(), "npoi-x".to_string(), "0xa1".to_string(), 0, - ) - .await; + ); save_local_attestation( local_attestations.clone(), "npoi-y".to_string(), "0xa1".to_string(), 1, - ) - .await; + ); save_local_attestation( local_attestations.clone(), "npoi-z".to_string(), "0xa2".to_string(), 2, - ) - .await; + ); - assert!(!local_attestations.lock().await.is_empty()); - assert!(local_attestations.lock().await.len() == 2); - assert!(local_attestations.lock().await.get("0xa1").unwrap().len() == 2); - assert!(local_attestations.lock().await.get("0xa2").unwrap().len() == 1); + assert!(!local_attestations.lock().unwrap().is_empty()); + assert!(local_attestations.lock().unwrap().len() == 2); + assert!( + local_attestations + .lock() + .unwrap() + .get("0xa1") + .unwrap() + .len() + == 2 + ); + assert!( + local_attestations + .lock() + .unwrap() + .get("0xa2") + .unwrap() + .len() + == 1 + ); assert!( local_attestations .lock() - .await + .unwrap() .get("0xa1") .unwrap() .get(&0) diff --git a/src/operator/callbook.rs b/src/operator/callbook.rs new file mode 100644 index 0000000..d1ce4d8 --- /dev/null +++ b/src/operator/callbook.rs @@ -0,0 +1,34 @@ +use axum::async_trait; + +use crate::graphql::query_graph_node_poi; +use graphcast_sdk::callbook::CallBook; +use graphcast_sdk::graphql::QueryError; + +#[async_trait] +pub trait CallBookRadioExtensions { + // Define the additional function(s) you want to add to CallBook + async fn query_poi( + &self, + ipfs_hash: String, + block_hash: String, + block_number: i64, + ) -> Result; +} + +#[async_trait] +impl CallBookRadioExtensions for CallBook { + async fn query_poi( + &self, + ipfs_hash: String, + block_hash: String, + block_number: i64, + ) -> Result { + query_graph_node_poi( + self.graph_node_status().to_string(), + ipfs_hash, + block_hash, + block_number, + ) + .await + } +} diff --git a/src/operator/mod.rs b/src/operator/mod.rs new file mode 100644 index 0000000..37c4fdf --- /dev/null +++ b/src/operator/mod.rs @@ -0,0 +1,391 @@ +use derive_getters::Getters; +use std::collections::HashSet; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{mpsc, Arc, Mutex as SyncMutex}; +use std::thread; +use std::time::Duration; +use tokio::sync::Mutex as AsyncMutex; +use tokio::time::{interval, sleep, timeout}; +use tracing::{debug, error, info, trace, warn}; + +use graphcast_sdk::{ + build_wallet, + graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent}, + graphcast_id_address, + graphql::{ + client_graph_node::{subgraph_network_blocks, update_network_chainheads}, + client_registry::query_registry_indexer, + }, +}; + +use crate::chainhead_block_str; +use crate::metrics::handle_serve_metrics; +use crate::operator::attestation::log_gossip_summary; +use crate::operator::attestation::process_comparison_results; +use crate::server::run_server; +use crate::state::PersistedState; +use crate::{config::Config, metrics::CACHED_MESSAGES}; +use crate::{RadioPayloadMessage, GRAPHCAST_AGENT}; + +use self::notifier::Notifier; + +pub mod attestation; +pub mod callbook; +pub mod notifier; +pub mod operation; + +/// Aggregated control flow configurations +/// Not used currently +#[derive(Getters)] +#[allow(unused)] +struct ControlFlow { + running: Arc, + skip_iteration: Arc, + iteration_timeout: Duration, + update_timeout: Duration, + gossip_timeout: Duration, + topic_update_duration: Duration, + state_update_duration: Duration, + gossip_poi_duration: Duration, + comparison_duration: Duration, +} + +impl ControlFlow { + fn new() -> Self { + let running = Arc::new(AtomicBool::new(true)); + let skip_iteration = Arc::new(AtomicBool::new(false)); + + let topic_update_duration = Duration::from_secs(600); + let state_update_duration = Duration::from_secs(15); + let gossip_poi_duration = Duration::from_secs(30); + let comparison_duration = Duration::from_secs(60); + + let iteration_timeout = Duration::from_secs(180); + let update_timeout = Duration::from_secs(10); + let gossip_timeout = Duration::from_secs(150); + + ControlFlow { + running, + skip_iteration, + iteration_timeout, + update_timeout, + gossip_timeout, + topic_update_duration, + state_update_duration, + gossip_poi_duration, + comparison_duration, + } + } +} + +/// Radio operator contains all states needed for radio operations +#[allow(unused)] +pub struct RadioOperator { + config: Config, + persisted_state: Arc>, + graphcast_agent: Arc, + notifier: Notifier, + control_flow: ControlFlow, + indexer_address: String, +} + +impl RadioOperator { + /// Create a radio operator with radio configurations, persisted data, + /// graphcast agent, and control flow + pub async fn new(config: Config) -> RadioOperator { + debug!("Initializing Radio operator"); + let wallet = build_wallet( + config + .wallet_input() + .expect("Operator wallet input invalid"), + ) + .expect("Radio operator cannot build wallet"); + // The query here must be Ok but so it is okay to panic here + // Alternatively, make validate_set_up return wallet, address, and stake + let indexer_address = query_registry_indexer( + config.registry_subgraph.to_string(), + graphcast_id_address(&wallet), + ) + .await + .expect("Radio operator registered to indexer"); + + debug!("Initializing program state"); + // Initialize program state + let persisted_state: Arc> = + Arc::new(SyncMutex::new(config.init_radio_state().await)); + + debug!("Initializing Graphcast Agent"); + let graphcast_agent = Arc::new( + config + .create_graphcast_agent() + .await + .expect("Initialize Graphcast agent"), + ); + debug!("Set global static instance of graphcast_agent"); + _ = GRAPHCAST_AGENT.set(graphcast_agent.clone()); + + let notifier = Notifier::from_config(&config); + + RadioOperator { + config, + persisted_state, + graphcast_agent, + notifier, + control_flow: ControlFlow::new(), + indexer_address, + } + } + + /// Preparation for running the radio applications + /// Expose metrics and subscribe to graphcast topics + pub async fn prepare(&self) { + // Set up Prometheus metrics url if configured + if let Some(port) = self.config.metrics_port { + debug!("Initializing metrics port"); + tokio::spawn(handle_serve_metrics(self.config.metrics_host.clone(), port)); + } + + // Provide generated topics to Graphcast agent + let topics = self + .config + .generate_topics(self.indexer_address.clone()) + .await; + debug!( + topics = tracing::field::debug(&topics), + "Found content topics for subscription", + ); + self.graphcast_agent + .update_content_topics(topics.clone()) + .await; + + let (sender, receiver) = mpsc::channel::>(); + let handler = RadioOperator::radio_msg_handler(SyncMutex::new(sender)); + GRAPHCAST_AGENT + .get() + .unwrap() + .register_handler(Arc::new(AsyncMutex::new(handler))) + .expect("Could not register handler"); + let state_ref = self.persisted_state.clone(); + thread::spawn(move || { + for msg in receiver { + trace!( + "Radio operator received a validated message from Graphcast agent: {:#?}", + msg + ); + let identifier = msg.identifier.clone(); + state_ref + .lock() + .unwrap() + .remote_messages() + .lock() + .unwrap() + .push(msg.clone()); + CACHED_MESSAGES.with_label_values(&[&identifier]).set( + state_ref + .lock() + .unwrap() + .remote_messages() + .lock() + .unwrap() + .iter() + .filter(|m| m.identifier == identifier) + .collect::>>() + .len() + .try_into() + .unwrap(), + ); + } + }); + } + + pub fn graphcast_agent(&self) -> &GraphcastAgent { + &self.graphcast_agent + } + + /// Read persisted state at the time of access + pub async fn state(&self) -> PersistedState { + self.persisted_state.lock().unwrap().clone() + } + + /// Radio operations + pub async fn run(&self) { + // Control flow + // TODO: expose to radio config for the users + let running = Arc::new(AtomicBool::new(true)); + let skip_iteration = Arc::new(AtomicBool::new(false)); + let skip_iteration_clone = skip_iteration.clone(); + + let mut topic_update_interval = interval(Duration::from_secs(60)); + let mut state_update_interval = interval(Duration::from_secs(5)); + let mut gossip_poi_interval = interval(Duration::from_secs(10)); + let mut comparison_interval = interval(Duration::from_secs(6)); + + let iteration_timeout = Duration::from_secs(180); + let update_timeout = Duration::from_secs(10); + let gossip_timeout = Duration::from_secs(150); + + // Separate thread to skip a main loop iteration when hit timeout + tokio::spawn(async move { + tokio::time::sleep(iteration_timeout).await; + skip_iteration_clone.store(true, Ordering::SeqCst); + }); + + let mut divergent_subgraphs: HashSet = HashSet::new(); + + // Initialize Http server with graceful shutdown if configured + if self.config.server_port().is_some() { + tokio::spawn(run_server( + self.config.clone(), + self.persisted_state.clone(), + running.clone(), + )); + } + + // Main loop for sending messages, can factor out + // and take radio specific query and parsing for radioPayload + while running.load(Ordering::SeqCst) { + // Run event intervals sequentially by satisfication of other intervals and corresponding tick + tokio::select! { + _ = topic_update_interval.tick() => { + if skip_iteration.load(Ordering::SeqCst) { + skip_iteration.store(false, Ordering::SeqCst); + continue; + } + // Update topic subscription + let result = timeout(update_timeout, + self.graphcast_agent() + .update_content_topics(self.config.generate_topics(self.indexer_address.clone()).await) + ).await; + + if result.is_err() { + warn!("update_content_topics timed out"); + } else { + debug!("update_content_topics completed"); + } + }, + _ = state_update_interval.tick() => { + if skip_iteration.load(Ordering::SeqCst) { + skip_iteration.store(false, Ordering::SeqCst); + continue; + } + + // Save cache if path provided + let _ = &self.config.persistence_file_path.as_ref().map(|path| { + self.persisted_state.lock().unwrap().update_cache(path); + }); + }, + _ = gossip_poi_interval.tick() => { + if skip_iteration.load(Ordering::SeqCst) { + skip_iteration.store(false, Ordering::SeqCst); + continue; + } + + let result = timeout(gossip_timeout, { + // Update all the chainheads of the network + // Also get a hash map returned on the subgraph mapped to network name and latest block + let network_chainhead_blocks = match self.config.callbook().indexing_statuses().await { + Ok(res) => update_network_chainheads( + res, + ), + Err(e) => { + error!(err = tracing::field::debug(&e), "Could not query indexing statuses, pull again later"); + continue; + } + }; + // Separate calls to indexing_statuses as it is not cloneable + let subgraph_network_latest_blocks = match self.config.callbook().indexing_statuses().await { + Ok(res) => subgraph_network_blocks(res), + Err(e) => { + error!(err = tracing::field::debug(&e), "Could not query indexing statuses, pull again later"); + continue; + } + }; + + trace!( + network_pointers = tracing::field::debug(&subgraph_network_latest_blocks), + "Subgraph network and latest blocks", + ); + + // Radio specific message content query function + // Function takes in an identifier string and make specific queries regarding the identifier + // The example here combines a single function provided query endpoint, current block info based on the subgraph's indexing network + // Then the function gets sent to agent for making identifier independent queries + let identifiers = self.graphcast_agent.content_identifiers().await; + let num_topics = identifiers.len(); + let blocks_str = chainhead_block_str(&network_chainhead_blocks); + info!( + chainhead = blocks_str.clone(), + num_gossip_peers = self.graphcast_agent.number_of_peers(), + num_topics, + "Network statuses", + ); + + let send_ops = self.gossip_poi( + identifiers.clone(), + &network_chainhead_blocks.clone(), + &subgraph_network_latest_blocks, + ).await; + + log_gossip_summary( + blocks_str, + identifiers.len(), + send_ops, + ) + }).await; + + if result.is_err() { + warn!("gossip_poi timed out"); + } else { + debug!("gossip_poi completed"); + } + }, + _ = comparison_interval.tick() => { + if skip_iteration.load(Ordering::SeqCst) { + skip_iteration.store(false, Ordering::SeqCst); + continue; + } + + let result = timeout(update_timeout, { + // Update all the chainheads of the network + // Also get a hash map returned on the subgraph mapped to network name and latest block + let indexing_status = match self.config.callbook().indexing_statuses().await { + Ok(res) => res, + Err(e) => { + error!(err = tracing::field::debug(&e), "Could not query indexing statuses, pull again later"); + continue; + } + }; + let network_chainhead_blocks = update_network_chainheads( + indexing_status, + ); + let identifiers = self.graphcast_agent().content_identifiers().await; + let blocks_str = chainhead_block_str(&network_chainhead_blocks); + + let comparison_res = self.compare_poi( + identifiers.clone(), + ) + .await; + + process_comparison_results( + blocks_str, + identifiers.len(), + comparison_res, + &mut divergent_subgraphs, + self.notifier.clone() + ) + }).await; + + if result.is_err() { + warn!("compare_poi timed out"); + } else { + debug!("compare_poi completed"); + } + }, + else => break, + } + + sleep(Duration::from_secs(5)).await; + continue; + } + } +} diff --git a/src/operator/notifier.rs b/src/operator/notifier.rs new file mode 100644 index 0000000..9a2f795 --- /dev/null +++ b/src/operator/notifier.rs @@ -0,0 +1,91 @@ +use derive_getters::Getters; +use graphcast_sdk::bots::{DiscordBot, SlackBot, TelegramBot}; + +use serde_derive::{Deserialize, Serialize}; +use tracing::warn; + +use crate::config::Config; + +#[derive(Clone, Debug, Getters, Serialize, Deserialize, PartialEq)] +pub struct Notifier { + radio_name: String, + slack_token: Option, + slack_channel: Option, + discord_webhook: Option, + telegram_token: Option, + telegram_chat_id: Option, +} + +impl Notifier { + pub fn new( + radio_name: String, + slack_token: Option, + slack_channel: Option, + discord_webhook: Option, + telegram_token: Option, + telegram_chat_id: Option, + ) -> Notifier { + Notifier { + radio_name, + slack_token, + slack_channel, + discord_webhook, + telegram_token, + telegram_chat_id, + } + } + + pub fn from_config(config: &Config) -> Self { + let radio_name = config.radio_name.clone(); + let slack_token = config.slack_token.clone(); + let slack_channel = config.slack_channel.clone(); + let discord_webhook = config.discord_webhook.clone(); + let telegram_token = config.telegram_token.clone(); + let telegram_chat_id = config.telegram_chat_id; + + Notifier::new( + radio_name, + slack_token, + slack_channel, + discord_webhook, + telegram_token, + telegram_chat_id, + ) + } + + pub async fn notify(self, content: String) { + if let (Some(token), Some(channel)) = (&self.slack_token, &self.slack_channel) { + if let Err(e) = + SlackBot::send_webhook(token.to_string(), channel, &self.radio_name, &content).await + { + warn!( + err = tracing::field::debug(e), + "Failed to send notification to Slack" + ); + } + } + + if let Some(webhook_url) = self.discord_webhook.clone() { + if let Err(e) = DiscordBot::send_webhook(&webhook_url, &self.radio_name, &content).await + { + warn!( + err = tracing::field::debug(e), + "Failed to send notification to Discord" + ); + } + } + + if let (Some(token), Some(chat_id)) = (self.telegram_token.clone(), self.telegram_chat_id) { + let telegram_bot = TelegramBot::new(token); + if let Err(e) = telegram_bot + .send_message(chat_id, &self.radio_name, &content) + .await + { + warn!( + err = tracing::field::debug(e), + "Failed to send notification to Telegram" + ); + } + } + } +} diff --git a/src/operator/operation.rs b/src/operator/operation.rs new file mode 100644 index 0000000..f17373a --- /dev/null +++ b/src/operator/operation.rs @@ -0,0 +1,511 @@ +use autometrics::autometrics; +use chrono::Utc; +use graphcast_sdk::callbook::CallBook; +use std::cmp::max; +use std::collections::HashMap; +use std::sync::{mpsc, Arc, Mutex as SyncMutex}; + +use tracing::{debug, error, trace, warn}; + +use graphcast_sdk::{ + determine_message_block, + graphcast_agent::{ + message_typing::{BuildMessageError, GraphcastMessage}, + waku_handling::WakuHandlingError, + GraphcastAgent, GraphcastAgentError, + }, + networks::NetworkName, + BlockPointer, NetworkBlockError, NetworkPointer, +}; + +use crate::operator::attestation::process_messages; +use crate::{ + metrics::{CACHED_MESSAGES, VALIDATED_MESSAGES}, + operator::{ + attestation::{ + clear_local_attestation, compare_attestations, local_comparison_point, + save_local_attestation, Attestation, ComparisonResult, + }, + callbook::CallBookRadioExtensions, + RadioOperator, + }, + OperationError, RadioPayloadMessage, GRAPHCAST_AGENT, +}; + +/// Determine the parameters for messages to send and compare +#[autometrics(track_concurrency)] +pub async fn gossip_set_up( + id: String, + network_chainhead_blocks: &HashMap, + subgraph_network_latest_blocks: &HashMap, + local_attestations: Arc>>>, +) -> Result<(NetworkName, BlockPointer, u64), BuildMessageError> { + // Get the indexing network of the deployment + // and update the NETWORK message block + let (network_name, latest_block) = match subgraph_network_latest_blocks.get(&id.clone()) { + Some(network_block) => ( + NetworkName::from_string(&network_block.network.clone()), + network_block.block.clone(), + ), + None => { + let err_msg = format!("Could not query the subgraph's indexing network, check Graph node's indexing statuses of subgraph deployment {}", id.clone()); + warn!( + err = tracing::field::debug(&err_msg), + "Failed to build message" + ); + return Err(BuildMessageError::Network(NetworkBlockError::FailedStatus( + err_msg, + ))); + } + }; + + let message_block = match determine_message_block(network_chainhead_blocks, network_name) { + Ok(block) => block, + Err(e) => return Err(BuildMessageError::Network(e)), + }; + + debug!( + deployment_hash = tracing::field::debug(&id), + network = tracing::field::debug(&network_name), + message_block = message_block, + latest_block = latest_block.number, + message_countdown_blocks = max(0, message_block as i64 - latest_block.number as i64), + topics_waiting_for_next_message_interval = local_attestations + .lock() + .unwrap() + .get(&id.clone()) + .and_then(|blocks| blocks.get(&message_block)) + .is_some(), + "Deployment status", + ); + + Ok((network_name, latest_block, message_block)) +} + +/// Construct the message and send it to Graphcast network +#[autometrics(track_concurrency)] +pub async fn message_send( + id: String, + callbook: CallBook, + message_block: u64, + latest_block: BlockPointer, + network_name: NetworkName, + local_attestations: Arc>>>, + graphcast_agent: &GraphcastAgent, +) -> Result { + trace!( + message_block = message_block, + latest_block = latest_block.number, + "Check message send requirement", + ); + + // Deployment did not sync to message_block + if latest_block.number < message_block { + //TODO: fill in variant in SDK + let err_msg = format!( + "Did not send message for deployment {}: latest_block ({}) syncing status must catch up to the message block ({})", + id.clone(), + latest_block.number, message_block, + ); + trace!(err = err_msg, "Skip send",); + return Err(OperationError::SendTrigger(err_msg)); + }; + + // Message has already been sent + if local_attestations + .lock() + .unwrap() + .get(&id.clone()) + .and_then(|blocks| blocks.get(&message_block)) + .is_some() + { + let err_msg = format!( + "Repeated message for deployment {}, skip sending message for block: {}", + id.clone(), + message_block + ); + trace!(err = err_msg, "Skip send"); + return Err(OperationError::SkipDuplicate(err_msg)); + } + + let block_hash = match graphcast_agent + .callbook + .block_hash(network_name.to_string(), message_block) + .await + { + Ok(hash) => hash, + Err(e) => { + let err_msg = format!("Failed to query graph node for the block hash: {e}"); + warn!(err = err_msg, "Failed to send message"); + return Err(OperationError::Query(e)); + } + }; + + match callbook + .query_poi( + id.clone(), + block_hash.clone(), + message_block.try_into().unwrap(), + ) + .await + { + Ok(content) => { + let radio_message = RadioPayloadMessage::new(id.clone(), content.clone()); + match graphcast_agent + .send_message(id.clone(), network_name, message_block, Some(radio_message)) + .await + { + Ok(msg_id) => { + save_local_attestation( + local_attestations, + content.clone(), + id.clone(), + message_block, + ); + Ok(msg_id) + } + Err(e) => { + error!(err = tracing::field::debug(&e), "Failed to send message"); + Err(OperationError::Agent(e)) + } + } + } + Err(e) => { + error!( + err = tracing::field::debug(&e), + "Failed to query message content" + ); + Err(OperationError::Agent( + GraphcastAgentError::QueryResponseError(e), + )) + } + } +} + +/// Compare validated messages +#[allow(clippy::too_many_arguments)] +#[autometrics(track_concurrency)] +pub async fn message_comparison( + id: String, + collect_window_duration: i64, + registry_subgraph: String, + network_subgraph: String, + messages: Vec>, + local_attestations: Arc>>>, +) -> Result { + let time = Utc::now().timestamp(); + + let (compare_block, collect_window_end) = match local_comparison_point( + Arc::clone(&local_attestations), + id.clone(), + collect_window_duration, + ) { + Some((block, window)) if time >= window => (block, window), + Some((compare_block, window)) => { + let err_msg = format!("Deployment {} comparison not triggered: collecting messages until time {}; currently {time}", id.clone(), window); + debug!(err = err_msg, "Collecting messages",); + return Err(OperationError::CompareTrigger( + id.clone(), + compare_block, + err_msg, + )); + } + _ => { + let err_msg = format!( + "Deployment {} comparison not triggered: no matching attestation to compare", + id.clone() + ); + debug!(err = err_msg, "No matching attestations",); + return Err(OperationError::CompareTrigger(id.clone(), 0, err_msg)); + } + }; + + // Update to only process the identifier&compare_block related messages within the collection window + let filter_msg: Vec> = messages + .iter() + .filter(|&m| m.block_number == compare_block && m.nonce <= collect_window_end) + .cloned() + .collect(); + debug!( + deployment_hash = id, + time, + comparison_time = collect_window_end, + compare_block, + comparison_countdown_seconds = max(0, time - collect_window_end), + number_of_messages_matched_to_compare = filter_msg.len(), + "Comparison state", + ); + let remote_attestations_result = + process_messages(filter_msg, ®istry_subgraph, &network_subgraph).await; + let remote_attestations = match remote_attestations_result { + Ok(remote) => { + debug!(unique_remote_nPOIs = remote.len(), "Processed messages",); + remote + } + Err(err) => { + trace!( + err = tracing::field::debug(&err), + "An error occured while parsing messages", + ); + return Err(OperationError::Attestation(err)); + } + }; + let comparison_result = compare_attestations( + compare_block, + remote_attestations, + Arc::clone(&local_attestations), + &id, + ) + .await; + + Ok(comparison_result) +} + +impl RadioOperator { + /// Custom callback for handling the validated GraphcastMessage, in this case we only save the messages to a local store + /// to process them at a later time. This is required because for the processing we use async operations which are not allowed + /// in the handler. + #[autometrics] + pub fn radio_msg_handler( + sender: SyncMutex>>, + ) -> impl Fn(Result, WakuHandlingError>) { + move |msg: Result, WakuHandlingError>| { + // TODO: Handle the error case by incrementing a Prometheus "error" counter + if let Ok(msg) = msg { + trace!(msg = tracing::field::debug(&msg), "Received message"); + let id = msg.identifier.clone(); + VALIDATED_MESSAGES.with_label_values(&[&id]).inc(); + match sender.lock().unwrap().send(msg) { + Ok(_) => trace!("Sent received message to radio operator"), + Err(e) => error!("Could not send message to channel, {:#?}", e), + }; + + //TODO: Make sure CACHED_MESSAGES is updated + } + } + } + + /// Construct the message and send it to Graphcast network + #[autometrics(track_concurrency)] + pub async fn create_radio_message( + &self, + id: String, + message_block: u64, + latest_block: BlockPointer, + network_name: NetworkName, + // local_attestations: Arc>>>, + // graphcast_agent: &GraphcastAgent, + ) -> Result { + trace!( + message_block = message_block, + latest_block = latest_block.number, + "Check message send requirement", + ); + + // Deployment did not sync to message_block + if latest_block.number < message_block { + //TODO: fill in variant in SDK + let err_msg = format!( + "Did not send message for deployment {}: latest_block ({}) syncing status must catch up to the message block ({})", + id.clone(), + latest_block.number, message_block, + ); + trace!(err = err_msg, "Skip send",); + return Err(OperationError::SendTrigger(err_msg)); + }; + + // Skip messages that has been sent before + if self + .state() + .await + .local_attestations() + .lock() + .unwrap() + .get(&id.clone()) + .and_then(|blocks| blocks.get(&message_block)) + .is_some() + { + let err_msg = format!( + "Repeated message for deployment {}, skip sending message for block: {}", + id.clone(), + message_block + ); + trace!(err = err_msg, "Skip send"); + return Err(OperationError::SkipDuplicate(err_msg)); + } + + let block_hash = match self + .config + .callbook() + .block_hash(network_name.to_string(), message_block) + .await + { + Ok(hash) => hash, + Err(e) => { + let err_msg = format!("Failed to query graph node for the block hash: {e}"); + error!(err = err_msg, "Failed to send message"); + return Err(OperationError::Query(e)); + } + }; + + match self + .config + .callbook() + .query_poi( + id.clone(), + block_hash.clone(), + message_block.try_into().unwrap(), + ) + .await + { + Ok(content) => Ok(RadioPayloadMessage::new(id.clone(), content)), + Err(e) => { + error!( + err = tracing::field::debug(&e), + "Failed to query message content" + ); + Err(OperationError::Agent( + GraphcastAgentError::QueryResponseError(e), + )) + } + } + } + + pub async fn gossip_poi( + &self, + identifiers: Vec, + network_chainhead_blocks: &HashMap, + subgraph_network_latest_blocks: &HashMap, + ) -> Vec> { + let mut send_handles = vec![]; + for id in identifiers.clone() { + /* Set up */ + let local_attestations = + Arc::clone(&self.persisted_state.lock().unwrap().local_attestations()); + let (network_name, latest_block, message_block) = if let Ok(params) = gossip_set_up( + id.clone(), + network_chainhead_blocks, + subgraph_network_latest_blocks, + Arc::clone(&local_attestations), + ) + .await + { + params + } else { + let err_msg = "Failed to set up message parameters".to_string(); + warn!(id, err_msg, "Gossip POI failed"); + continue; + }; + + /* Send message */ + let id_cloned = id.clone(); + + let local = Arc::clone(&local_attestations); + let callbook = self.config.callbook(); + let send_handle = tokio::spawn(async move { + message_send( + id_cloned, + callbook, + message_block, + latest_block, + network_name, + local, + GRAPHCAST_AGENT.get().unwrap(), + ) + .await + }); + + send_handles.push(send_handle); + } + + let mut send_ops = vec![]; + for handle in send_handles { + if let Ok(s) = handle.await { + send_ops.push(s); + } + } + send_ops + } + + pub async fn compare_poi( + &self, + identifiers: Vec, + ) -> Vec> { + let mut compare_handles = vec![]; + let local_attestations = self.state().await.local_attestations(); + let remote_messages = self.state().await.remote_messages(); + for id in identifiers.clone() { + /* Set up */ + let collect_duration: i64 = self.config.collect_message_duration().to_owned(); + let id_cloned = id.clone(); + let registry_subgraph = self.config.registry_subgraph.clone(); + let network_subgraph = self.config.network_subgraph.clone(); + let local = Arc::clone(&local_attestations); + let msgs = remote_messages.lock().unwrap(); + let filtered_msg = msgs + .iter() + .filter(|&m| m.identifier == id.clone()) + .cloned() + .collect(); + + let compare_handle = tokio::spawn(async move { + message_comparison( + id_cloned, + collect_duration, + registry_subgraph.clone(), + network_subgraph.clone(), + filtered_msg, + local, + ) + .await + }); + compare_handles.push(compare_handle); + } + + let mut compare_ops = vec![]; + for handle in compare_handles { + let res = handle.await; + if let Ok(s) = res { + // Skip clean up for comparisonResult for Error and buildFailed + match s { + Ok(r) => { + compare_ops.push(Ok(r.clone())); + + /* Clean up cache */ + // Only clear the ones matching identifier and block number equal or less + // Retain the msgs with a different identifier, or if their block number is greater + let local = Arc::clone(&local_attestations); + clear_local_attestation(local, r.deployment_hash(), r.block()); + self.persisted_state + .lock() + .unwrap() + .remote_messages() + .lock() + .unwrap() + .retain(|msg| { + msg.block_number >= r.block() + || msg.identifier != r.deployment_hash() + }); + CACHED_MESSAGES + .with_label_values(&[&r.deployment_hash()]) + .set( + self.state() + .await + .remote_messages() + .lock() + .unwrap() + .len() + .try_into() + .unwrap(), + ); + } + Err(e) => { + trace!(err = tracing::field::debug(&e), "Compare handles"); + compare_ops.push(Err(e.clone_with_inner())); + } + } + } + } + compare_ops + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 0491b6b..ef7b3ca 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,16 +1,19 @@ use axum::{extract::Extension, routing::get, Router, Server}; - use std::net::SocketAddr; use std::str::FromStr; use std::sync::atomic::AtomicBool; -use std::sync::Arc; -use tokio::sync::Mutex as AsyncMutex; -use tracing::info; - -use crate::attestation::LocalAttestationsMap; -use crate::server::model::{build_schema, POIRadioContext}; -use crate::server::routes::{graphql_handler, graphql_playground, health}; -use crate::{shutdown_signal, CONFIG}; +use std::sync::{Arc, Mutex as SyncMutex}; +use tracing::{debug, info}; + +use crate::{ + config::Config, + server::{ + model::{build_schema, POIRadioContext}, + routes::{graphql_handler, graphql_playground, health}, + }, + shutdown_signal, + state::PersistedState, +}; pub mod model; pub mod routes; @@ -20,25 +23,20 @@ pub mod routes; /// and a versioned GraphQL endpoint at `api/v1/graphql` /// This function starts a API server at the configured server_host and server_port pub async fn run_server( + config: Config, + persisted_state: Arc>, running_program: Arc, - local_attestations: Arc>, ) { - info!("Initializing HTTP server"); - let host = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .server_host - .clone() - .unwrap_or(String::from("0.0.0.0")); - let port = CONFIG.get().unwrap().lock().unwrap().server_port.unwrap(); - - let context = Arc::new(POIRadioContext::init(Arc::clone(&local_attestations)).await); + if config.server_port().is_none() { + return; + } + let port = config.server_port().unwrap(); + let context = + Arc::new(POIRadioContext::init(config.clone(), Arc::clone(&persisted_state)).await); let schema = build_schema(Arc::clone(&context)).await; - info!(host, port, "API Service starting"); + debug!("Setting up HTTP service"); let app = Router::new() .route("/health", get(health)) @@ -48,8 +46,13 @@ pub async fn run_server( ) .layer(Extension(schema)) .layer(Extension(context)); - let addr = SocketAddr::from_str(&format!("{}:{}", host, port)).expect("Start HTTP Service"); + let addr = SocketAddr::from_str(&format!("{}:{}", config.server_host(), port)) + .expect("Create address"); + info!( + host = tracing::field::debug(config.server_host()), + port, "Bind and serve" + ); Server::bind(&addr) .serve(app.into_make_service()) .with_graceful_shutdown(shutdown_signal(running_program)) diff --git a/src/server/model/mod.rs b/src/server/model/mod.rs index 6719b12..c03b315 100644 --- a/src/server/model/mod.rs +++ b/src/server/model/mod.rs @@ -1,16 +1,18 @@ use async_graphql::{ Context, EmptyMutation, EmptySubscription, InputObject, Object, Schema, SimpleObject, }; -use std::sync::Arc; +use std::sync::{Arc, Mutex as SyncMutex}; use tokio::sync::Mutex as AsyncMutex; use tracing::debug; use crate::{ - attestation::{ + config::Config, + operator::attestation::{ attestations_to_vec, compare_attestations, process_messages, Attestation, AttestationEntry, AttestationError, ComparisonResult, ComparisonResultType, LocalAttestationsMap, }, - RadioPayloadMessage, CONFIG, MESSAGES, + state::PersistedState, + RadioPayloadMessage, }; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; @@ -24,21 +26,28 @@ pub struct QueryRoot; impl QueryRoot { async fn radio_payload_messages( &self, - _ctx: &Context<'_>, + ctx: &Context<'_>, ) -> Result>, anyhow::Error> { - Ok(MESSAGES.get().unwrap().lock().unwrap().to_vec()) + let state = ctx + .data_unchecked::>>() + .lock() + .unwrap() + .clone(); + Ok(state.remote_messages().lock().unwrap().clone()) } async fn radio_payload_messages_by_deployment( &self, - _ctx: &Context<'_>, + ctx: &Context<'_>, identifier: String, ) -> Result>, anyhow::Error> { - Ok(MESSAGES - .get() - .unwrap() + let state = ctx + .data_unchecked::>>() .lock() .unwrap() + .clone(); + let msg = state.remote_messages().lock().unwrap().clone(); + Ok(msg .iter() .cloned() .filter(|message| message.identifier == identifier.clone()) @@ -51,9 +60,13 @@ impl QueryRoot { identifier: Option, block: Option, ) -> Result, anyhow::Error> { - let attestations = &ctx.data_unchecked::>>(); + let state = ctx + .data_unchecked::>>() + .lock() + .unwrap() + .clone(); + let attestations = &state.local_attestations(); let filtered = attestations_to_vec(attestations) - .await .into_iter() .filter(|entry| filter_attestations(entry, &identifier, &block)) .collect::>(); @@ -86,7 +99,7 @@ impl QueryRoot { let r = self .comparison_result(ctx, entry.deployment, entry.block_number) .await; - // Return err if just one has err? (ignored for now) + // ignore errored comparison for now if r.is_err() { continue; } @@ -105,31 +118,22 @@ impl QueryRoot { deployment: String, block: u64, ) -> Result { - let local_attestations = &ctx.data_unchecked::>>(); - let filter_msg: Vec> = MESSAGES - .get() - .unwrap() + let state = ctx + .data_unchecked::>>() .lock() - .unwrap() + .await + .clone(); + let msgs = state.remote_messages().lock().unwrap().clone(); + let local_attestations = &state.local_attestations(); + let config = ctx.data_unchecked::(); + let filter_msg: Vec> = msgs .iter() .filter(|&m| m.block_number == block) .cloned() .collect(); - let registry_subgraph = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .registry_subgraph - .clone(); - let network_subgraph = CONFIG - .get() - .unwrap() - .lock() - .unwrap() - .network_subgraph - .clone(); + let registry_subgraph = config.registry_subgraph.clone(); + let network_subgraph = config.network_subgraph.clone(); let remote_attestations_result = process_messages(filter_msg, ®istry_subgraph, &network_subgraph).await; let remote_attestations = match remote_attestations_result { @@ -244,21 +248,34 @@ pub fn stake_weight_str(attestations: &[Attestation], local_npoi: String) -> Str pub async fn build_schema(ctx: Arc) -> POIRadioSchema { Schema::build(QueryRoot, EmptyMutation, EmptySubscription) - .data(Arc::clone(&ctx.local_attestations)) + .data(Arc::clone(&ctx.persisted_state)) .finish() } pub struct POIRadioContext { - pub local_attestations: Arc>, + pub radio_config: Config, + pub persisted_state: Arc>, } impl POIRadioContext { - pub async fn init(local_attestations: Arc>) -> Self { - Self { local_attestations } + pub async fn init( + radio_config: Config, + persisted_state: Arc>, + ) -> Self { + Self { + radio_config, + persisted_state, + } } pub async fn local_attestations(&self) -> LocalAttestationsMap { - self.local_attestations.lock().await.clone() + self.persisted_state + .lock() + .unwrap() + .local_attestations() + .lock() + .unwrap() + .clone() } } diff --git a/src/server/routes/mod.rs b/src/server/routes/mod.rs index 4d5ac38..d784954 100644 --- a/src/server/routes/mod.rs +++ b/src/server/routes/mod.rs @@ -41,9 +41,7 @@ pub(crate) async fn graphql_handler( trace!("Processing GraphQL request"); - let local = context.local_attestations().await; - - let response = async move { schema.execute(req.into_inner().data(local)).await } + let response = async move { schema.execute(req.into_inner().data(context)).await } .instrument(span.clone()) .await; diff --git a/src/state.rs b/src/state.rs index f1aab3a..4a51a98 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,33 +1,28 @@ +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex as SyncMutex}; use std::{ collections::HashMap, fs::{remove_file, File}, io::{BufReader, Write}, }; +use tracing::warn; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; -use serde::{Deserialize, Serialize}; -use std::sync::{Arc, Mutex as SyncMutex}; -use tokio::sync::Mutex as AsyncMutex; -use tracing::warn; - -use crate::{attestation::Attestation, RadioPayloadMessage}; +use crate::{operator::attestation::Attestation, RadioPayloadMessage}; -type Local = Arc>>>; +type Local = Arc>>>; type Remote = Arc>>>; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct PersistedState { - local_attestations: HashMap>, - remote_messages: Arc>>>, + local_attestations: Local, + remote_messages: Remote, } impl PersistedState { - pub fn new( - local: Option>>, - remote: Option>>>>, - ) -> PersistedState { - let local_attestations = local.unwrap_or(HashMap::new()); + pub fn new(local: Option, remote: Option) -> PersistedState { + let local_attestations = local.unwrap_or(Arc::new(SyncMutex::new(HashMap::new()))); let remote_messages = remote.unwrap_or(Arc::new(SyncMutex::new(vec![]))); PersistedState { @@ -38,15 +33,14 @@ impl PersistedState { /// Optional updates for either local_attestations or remote_messages without requiring either to be in-scope pub async fn update( - &self, + &mut self, local_attestations: Option, remote_messages: Option, ) -> PersistedState { - let local_attestations: HashMap> = - match local_attestations { - None => self.local_attestations.clone(), - Some(l) => l.lock().await.clone(), - }; + let local_attestations = match local_attestations { + None => self.local_attestations.clone(), + Some(l) => l, + }; let remote_messages = match remote_messages { None => self.remote_messages.clone(), Some(r) => r, @@ -57,14 +51,29 @@ impl PersistedState { } } + /// Updates for local_attestations + pub async fn update_local(&mut self, local_attestations: Local) { + self.local_attestations = local_attestations; + } + + /// Updates for remote_messages + pub async fn update_remote(&mut self, remote_messages: Remote) { + self.remote_messages = remote_messages; + } + + /// Updates for remote_messages + pub async fn add_remote_message(&mut self, msg: GraphcastMessage) { + self.remote_messages.lock().unwrap().push(msg) + } + /// Getter for local_attestations - pub fn local_attestations(&self) -> HashMap> { + pub fn local_attestations(&self) -> Arc>>> { self.local_attestations.clone() } /// Getter for one local_attestation pub fn local_attestation(&self, deployment: String, block_number: u64) -> Option { - match self.local_attestations.get(&deployment) { + match self.local_attestations.lock().unwrap().get(&deployment) { None => None, Some(blocks_map) => blocks_map.get(&block_number).cloned(), } @@ -135,7 +144,7 @@ impl PersistedState { mod tests { use graphcast_sdk::networks::NetworkName; - use crate::attestation::save_local_attestation; + use crate::operator::attestation::save_local_attestation; use super::*; @@ -146,34 +155,31 @@ mod tests { PersistedState::delete_cache(path); let mut state = PersistedState::load_cache(path); - assert!(state.local_attestations.is_empty()); + assert!(state.local_attestations.lock().unwrap().is_empty()); assert!(state.remote_messages.lock().unwrap().is_empty()); - let local_attestations = Arc::new(AsyncMutex::new(HashMap::new())); + let local_attestations = Arc::new(SyncMutex::new(HashMap::new())); let messages = Arc::new(SyncMutex::new(Vec::new())); save_local_attestation( local_attestations.clone(), "npoi-x".to_string(), "0xa1".to_string(), 0, - ) - .await; + ); save_local_attestation( local_attestations.clone(), "npoi-y".to_string(), "0xa1".to_string(), 1, - ) - .await; + ); save_local_attestation( local_attestations.clone(), "npoi-z".to_string(), "0xa2".to_string(), 2, - ) - .await; + ); let hash: String = "QmWECgZdP2YMcV9RtKU41GxcdW8EGYqMNoG98ubu5RGN6U".to_string(); let content: String = @@ -202,13 +208,33 @@ mod tests { let state = PersistedState::load_cache(path); assert!(state.remote_messages.lock().unwrap().len() == 1); - assert!(!state.local_attestations.is_empty()); - assert!(state.local_attestations.len() == 2); - assert!(state.local_attestations.get("0xa1").unwrap().len() == 2); - assert!(state.local_attestations.get("0xa2").unwrap().len() == 1); + assert!(!state.local_attestations.lock().unwrap().is_empty()); + assert!(state.local_attestations.lock().unwrap().len() == 2); assert!( state .local_attestations + .lock() + .unwrap() + .get("0xa1") + .unwrap() + .len() + == 2 + ); + assert!( + state + .local_attestations + .lock() + .unwrap() + .get("0xa2") + .unwrap() + .len() + == 1 + ); + assert!( + state + .local_attestations + .lock() + .unwrap() .get("0xa1") .unwrap() .get(&0)