diff --git a/Cargo.toml b/Cargo.toml index 9146e2f..73dceb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,14 @@ once_cell = "1.15" chrono = "0.4" serde = "1.0.147" serde_json = "1.0.87" +sha3 = "0.10.6" tokio = { version = "1.1.1", features = ["full"] } anyhow = "1.0.69" graphql_client = "0.9.0" serde_derive = "1.0.114" reqwest = { version = "0.11.0", features = ["json"] } ethers = "2.0.0" +thiserror = "1.0.40" regex = "1.7.1" ethers-contract = "2.0.0" ethers-core = "2.0.0" diff --git a/src/attestation.rs b/src/attestation.rs new file mode 100644 index 0000000..acf537b --- /dev/null +++ b/src/attestation.rs @@ -0,0 +1,741 @@ +use num_bigint::BigUint; +use sha3::{Digest, Sha3_256}; +use std::{ + collections::HashMap, + fmt::{self, Display}, + sync::Arc, +}; +use tokio::sync::Mutex as AsyncMutex; +use tracing::{debug, error, info, warn}; + +use graphcast_sdk::{ + graphcast_agent::message_typing::{get_indexer_stake, BuildMessageError, GraphcastMessage}, + graphql::client_registry::query_registry_indexer, + networks::NetworkName, +}; + +use crate::RadioPayloadMessage; + +/// A wrapper around an attested NPOI, tracks Indexers that have sent it plus their accumulated stake +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct Attestation { + pub npoi: String, + pub stake_weight: BigUint, + pub senders: Vec, + pub sender_group_hash: String, + pub timestamp: Vec, +} + +impl Attestation { + pub fn new( + npoi: String, + stake_weight: BigUint, + senders: Vec, + timestamp: Vec, + ) -> Self { + let addresses = &mut senders.clone(); + sort_addresses(addresses); + let sender_group_hash = hash_addresses(addresses); + Attestation { + npoi, + stake_weight, + senders, + sender_group_hash, + timestamp, + } + } + + /// Used whenever we receive a new attestation for an NPOI that already exists in the store + pub fn update( + base: &Self, + address: String, + stake: BigUint, + timestamp: i64, + ) -> Result { + if base.senders.contains(&address) { + Err(AttestationError::UpdateError( + "There is already an attestation from this address. Skipping...".to_string(), + )) + } else { + Ok(Self::new( + base.npoi.clone(), + base.stake_weight.clone() + stake, + [base.senders.clone(), vec![address]].concat(), + [base.timestamp.clone(), vec![timestamp]].concat(), + )) + } + } +} + +impl fmt::Display for Attestation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "NPOI: {}\nsender addresses: {:#?}\nstake weight: {}", + self.npoi, self.senders, self.stake_weight + ) + } +} + +pub type RemoteAttestationsMap = HashMap>>; +pub type LocalAttestationsMap = HashMap>; + +/// This function processes the global messages map that we populate when +/// messages are being received. It constructs the remote attestations +/// map and returns it if the processing succeeds. +pub async fn process_messages( + messages: Arc>>>, + registry_subgraph: &str, + network_subgraph: &str, +) -> Result { + let mut remote_attestations: RemoteAttestationsMap = HashMap::new(); + let messages = AsyncMutex::new(messages.lock().await); + + for msg in messages.lock().await.iter() { + let radio_msg = &msg.payload.clone().unwrap(); + let sender = msg + .recover_sender_address() + .map_err(AttestationError::BuildError)?; + let indexer_address = query_registry_indexer(registry_subgraph.to_string(), sender.clone()) + .await + .map_err(|e| AttestationError::BuildError(BuildMessageError::FieldDerivations(e)))?; + let sender_stake = get_indexer_stake(indexer_address.clone(), network_subgraph) + .await + .map_err(|e| AttestationError::BuildError(BuildMessageError::FieldDerivations(e)))?; + + //TODO: update this to utilize update_blocks? + // Check if there are existing attestations for the block + let blocks = remote_attestations + .entry(msg.identifier.to_string()) + .or_default(); + let attestations = blocks.entry(msg.block_number).or_default(); + + let existing_attestation = attestations + .iter_mut() + .find(|a| a.npoi == radio_msg.payload_content()); + + match existing_attestation { + Some(existing_attestation) => { + Attestation::update( + existing_attestation, + indexer_address, + sender_stake, + msg.nonce, + )?; + } + None => { + // Unwrap is okay because bytes (Vec) is a valid utf-8 sequence + attestations.push(Attestation::new( + radio_msg.payload_content().to_string(), + sender_stake, + vec![indexer_address], + vec![msg.nonce], + )); + } + } + } + Ok(remote_attestations) +} + +/// Determine the comparison pointer on both block and time based on the local attestations +/// If they don't exist, then return default value that shall never be validated to trigger +pub async fn local_comparison_point( + local_attestations: Arc>, + id: String, + collect_window_duration: i64, +) -> (u64, i64) { + let local_attestation = local_attestations.lock().await; + if let Some(blocks_map) = local_attestation.get(&id) { + // Find the attestaion by the smallest block + blocks_map + .iter() + .min_by_key(|(&min_block, attestation)| { + // unwrap is okay because we add timestamp at local creation of attestation + (min_block, *attestation.timestamp.first().unwrap()) + }) + .map(|(&block, a)| { + ( + block, + *a.timestamp.first().unwrap() + collect_window_duration, + ) + }) + .unwrap_or((0_u64, i64::MAX)) + } else { + (0_u64, i64::MAX) + } +} + +/// Updates the `blocks` HashMap to include the new attestation. +pub fn update_blocks( + block_number: u64, + blocks: &HashMap>, + npoi: String, + stake: BigUint, + address: String, + timestamp: i64, +) -> HashMap> { + let mut blocks_clone: HashMap> = HashMap::new(); + blocks_clone.extend(blocks.clone()); + blocks_clone.insert( + block_number, + vec![Attestation::new( + npoi, + stake, + vec![address], + vec![timestamp], + )], + ); + blocks_clone +} + +/// Saves NPOIs that we've generated locally, in order to compare them with remote ones later +pub fn save_local_attestation( + local_attestations: &mut LocalAttestationsMap, + attestation: Attestation, + ipfs_hash: String, + block_number: u64, +) { + let blocks = local_attestations.get(&ipfs_hash); + + match blocks { + Some(blocks) => { + let mut blocks_clone: HashMap = HashMap::new(); + blocks_clone.extend(blocks.clone()); + blocks_clone.insert(block_number, attestation); + local_attestations.insert(ipfs_hash, blocks_clone); + } + None => { + let mut blocks_clone: HashMap = HashMap::new(); + blocks_clone.insert(block_number, attestation); + local_attestations.insert(ipfs_hash, blocks_clone); + } + } +} + +/// Clear the expired local attesatoins +pub fn clear_local_attestation( + local_attestations: &mut LocalAttestationsMap, + ipfs_hash: String, + block_number: u64, +) { + let blocks = local_attestations.get(&ipfs_hash); + + if let Some(blocks) = blocks { + let mut blocks_clone: HashMap = HashMap::new(); + blocks_clone.extend(blocks.clone()); + blocks_clone.remove(&block_number); + local_attestations.insert(ipfs_hash, blocks_clone); + } +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum ComparisonResult { + NotFound(String), + Divergent(String), + Match(String), +} + +impl Display for ComparisonResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ComparisonResult::NotFound(s) => write!(f, "NotFound: {s}"), + ComparisonResult::Divergent(s) => write!(f, "Divergent: {s}"), + ComparisonResult::Match(s) => write!(f, "Matched: {s}"), + } + } +} + +/// Compares local attestations against remote ones using the attestation stores we populated while processing saved GraphcastMessage messages. +/// It takes our attestation (NPOI) for a given subgraph on a given block and compares it to the top-attested one from the remote attestations. +/// The top remote attestation is found by grouping attestations together and increasing their total stake-weight every time we see a new message +/// with the same NPOI from an Indexer (NOTE: one Indexer can only send 1 attestation per subgraph per block). The attestations are then sorted +/// and we take the one with the highest total stake-weight. +pub async fn compare_attestations( + network_name: NetworkName, + attestation_block: u64, + remote: RemoteAttestationsMap, + local: Arc>, + ipfs_hash: &str, +) -> Result { + debug!( + "Comparing attestations:\nlocal: {:#?}\n remote: {:#?}", + local, remote + ); + + let local = local.lock().await; + + let blocks = match local.get(ipfs_hash) { + Some(blocks) => blocks, + None => { + return Ok(ComparisonResult::NotFound(String::from( + "No local attestation found", + ))) + } + }; + let local_attestation = match blocks.get(&attestation_block) { + Some(attestations) => attestations, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No local attestation found for block {attestation_block}" + ))) + } + }; + + let remote_blocks = match remote.get(ipfs_hash) { + Some(blocks) => blocks, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No remote attestation found for subgraph {ipfs_hash}" + ))) + } + }; + let remote_attestations = match remote_blocks.get(&attestation_block) { + Some(attestations) => attestations, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No remote attestation found for subgraph {ipfs_hash} on block {attestation_block}" + ))) + } + }; + + let mut remote_attestations = remote_attestations.clone(); + remote_attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); + + if remote_attestations.len() > 1 { + warn!( + "More than 1 nPOI found for subgraph {} on block {}. Attestations (sorted): {:#?}", + ipfs_hash, attestation_block, remote_attestations + ); + } + + let most_attested_npoi = &remote_attestations.last().unwrap().npoi; + if most_attested_npoi == &local_attestation.npoi { + info!( + "nPOI matched for subgraph {} on block {} with {} of remote attestations", + ipfs_hash, + attestation_block, + remote_attestations.len(), + ); + Ok(ComparisonResult::Match(format!( + "POIs match for subgraph {ipfs_hash} on block {attestation_block}!: {most_attested_npoi}" + ))) + } else { + info!( + "Number of nPOI submitted for block {}: {:#?}\n{}: {:#?}", + attestation_block, remote_attestations, "Local attestation", local_attestation + ); + Ok(ComparisonResult::Divergent(format!( + "❗ POIs don't match for subgraph {ipfs_hash} on network {network_name} at block {attestation_block}!\n\nLocal attestation:\n{local_attestation:#?}\n\nRemote attestations:\n{remote_attestations:#?}" + ))) + } +} + +/// Deterministically sort addresses +fn sort_addresses(addresses: &mut [String]) { + addresses.sort_by(|a, b| { + let bytes_a = hex::decode(&a[2..]).unwrap(); + let bytes_b = hex::decode(&b[2..]).unwrap(); + bytes_a.cmp(&bytes_b) + }); +} + +/// Deterministically ordering the indexer addresses attesting to a nPOI, and then hashing that list +fn hash_addresses(addresses: &[String]) -> String { + // create a SHA3-256 object + let mut hasher = Sha3_256::new(); + // iteratively decode addresses to bytes + let mut bytes = Vec::new(); + for address in addresses { + let addr = address[2..].to_string(); + bytes.extend(hex::decode(addr).unwrap()); + } + + // write input message + hasher.update(&bytes); + // read hash digest + let result = hasher.finalize(); + hex::encode(result) +} + +#[derive(Debug, thiserror::Error)] +pub enum AttestationError { + #[error("Failed to build attestation: {0}")] + BuildError(BuildMessageError), + #[error("Failed to update attestation: {0}")] + UpdateError(String), +} + +#[cfg(test)] +mod tests { + use super::*; + use num_traits::One; + + #[test] + fn test_update_blocks() { + let mut blocks: HashMap> = HashMap::new(); + blocks.insert( + 42, + vec![Attestation::new( + "default".to_string(), + BigUint::default(), + Vec::new(), + Vec::new(), + )], + ); + let block_clone = update_blocks( + 42, + &blocks, + "awesome-npoi".to_string(), + BigUint::default(), + "0xadd3".to_string(), + 1, + ); + + assert_eq!( + block_clone.get(&42).unwrap().first().unwrap().npoi, + "awesome-npoi".to_string() + ); + } + + #[test] + fn test_sort_sender_addresses_unique() { + let attestation = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec!["0xaac5349585cbbf924026d25a520ffa9e8b51a39b".to_string()], + vec![1], + ); + let attestation2 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec!["0xbbc5349585cbbf924026d25a520ffa9e8b51a39b".to_string()], + vec![1], + ); + assert_ne!( + attestation2.sender_group_hash, + attestation.sender_group_hash + ); + } + + #[test] + fn test_sort_sender_addresses() { + let attestation = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec![ + "0xaac5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), + "0xbbc5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), + ], + vec![1, 2], + ); + let attestation2 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec![ + "0xbbc5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), + "0xaac5349585cbbf924026d25a520ffa9e8b51a39b".to_string(), + ], + vec![1, 2], + ); + assert_eq!( + attestation2.sender_group_hash, + attestation.sender_group_hash + ); + } + + #[test] + fn test_attestation_sorting() { + let attestation1 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![0], + ); + + let attestation2 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa2".to_string()], + vec![1], + ); + + let attestation3 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec!["0xa3".to_string()], + vec![2], + ); + + let mut attestations = vec![attestation1, attestation2, attestation3]; + + attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); + + assert_eq!(attestations.last().unwrap().stake_weight, BigUint::one()); + assert_eq!( + attestations.last().unwrap().senders.first().unwrap(), + &"0xa3".to_string() + ); + assert_eq!(attestations.last().unwrap().timestamp, vec![2]); + } + + #[test] + fn test_attestation_update_success() { + let attestation = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![2], + ); + + let updated_attestation = + Attestation::update(&attestation, "0xa2".to_string(), BigUint::one(), 1); + + assert!(updated_attestation.is_ok()); + assert_eq!( + updated_attestation.as_ref().unwrap().stake_weight, + BigUint::one() + ); + assert_eq!(updated_attestation.unwrap().timestamp, [2, 1]); + } + + #[test] + fn test_attestation_update_fail() { + let attestation = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![0], + ); + + let updated_attestation = + Attestation::update(&attestation, "0xa1".to_string(), BigUint::default(), 0); + + assert!(updated_attestation.is_err()); + assert_eq!( + updated_attestation.unwrap_err().to_string(), + "Failed to update attestation: There is already an attestation from this address. Skipping...".to_string() + ); + } + + #[tokio::test] + async fn test_compare_attestations_generic_fail() { + let res = compare_attestations( + NetworkName::Goerli, + 42, + HashMap::new(), + Arc::new(AsyncMutex::new(HashMap::new())), + "non-existent-ipfs-hash", + ) + .await; + + assert!(res.is_ok()); + assert_eq!( + res.unwrap().to_string(), + "NotFound: No local attestation found".to_string() + ); + } + + #[tokio::test] + async fn test_compare_attestations_remote_not_found_fail() { + let mut remote_blocks: HashMap> = HashMap::new(); + let mut local_blocks: HashMap = HashMap::new(); + + remote_blocks.insert( + 42, + vec![Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![1], + )], + ); + + local_blocks.insert( + 42, + Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + Vec::new(), + vec![0], + ), + ); + + let mut remote_attestations: HashMap>> = + HashMap::new(); + let mut local_attestations: HashMap> = HashMap::new(); + + remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); + local_attestations.insert("different-awesome-hash".to_string(), local_blocks); + + let res = compare_attestations( + NetworkName::Goerli, + 42, + remote_attestations, + Arc::new(AsyncMutex::new(local_attestations)), + "different-awesome-hash", + ) + .await; + + assert!(res.is_ok()); + assert_eq!( + res.unwrap().to_string(), + "NotFound: No remote attestation found for subgraph different-awesome-hash".to_string() + ); + } + + #[tokio::test] + async fn test_compare_attestations_local_not_found_fail() { + let remote_blocks: HashMap> = HashMap::new(); + let local_blocks: HashMap = HashMap::new(); + + let mut remote_attestations: HashMap>> = + HashMap::new(); + let mut local_attestations: HashMap> = HashMap::new(); + + remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); + local_attestations.insert("my-awesome-hash".to_string(), local_blocks); + + let res = compare_attestations( + NetworkName::Goerli, + 42, + remote_attestations, + Arc::new(AsyncMutex::new(local_attestations)), + "my-awesome-hash", + ) + .await; + + assert!(res.is_ok()); + assert_eq!( + res.unwrap().to_string(), + "NotFound: No local attestation found for block 42".to_string() + ); + } + + #[tokio::test] + async fn test_compare_attestations_success() { + let mut remote_blocks: HashMap> = HashMap::new(); + let mut local_blocks: HashMap = HashMap::new(); + + remote_blocks.insert( + 42, + vec![Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![0], + )], + ); + + local_blocks.insert( + 42, + Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + Vec::new(), + vec![0], + ), + ); + + let mut remote_attestations: HashMap>> = + HashMap::new(); + let mut local_attestations: HashMap> = HashMap::new(); + + remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); + local_attestations.insert("my-awesome-hash".to_string(), local_blocks); + + let res = compare_attestations( + NetworkName::Goerli, + 42, + remote_attestations, + Arc::new(AsyncMutex::new(local_attestations)), + "my-awesome-hash", + ) + .await; + + assert!(res.is_ok()); + assert_eq!( + res.unwrap(), + ComparisonResult::Match( + "POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string() + ) + ); + } + + #[test] + fn clear_local_attestation_success() { + let mut local_blocks: HashMap = HashMap::new(); + let attestation1 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![0], + ); + + let attestation2 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa2".to_string()], + vec![1], + ); + + let attestation3 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec!["0xa3".to_string()], + vec![2], + ); + + local_blocks.insert(42, attestation1); + local_blocks.insert(43, attestation2); + local_blocks.insert(44, attestation3); + + let mut local_attestations: HashMap> = HashMap::new(); + local_attestations.insert("hash".to_string(), local_blocks.clone()); + local_attestations.insert("hash2".to_string(), local_blocks); + + clear_local_attestation(&mut local_attestations, "hash".to_string(), 43); + + assert_eq!(local_attestations.get("hash").unwrap().len(), 2); + assert!(local_attestations.get("hash").unwrap().get(&43).is_none()); + assert_eq!(local_attestations.get("hash2").unwrap().len(), 3); + } + + #[tokio::test] + async fn local_attestation_pointer_success() { + let mut local_blocks: HashMap = HashMap::new(); + let attestation1 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa1".to_string()], + vec![2], + ); + + let attestation2 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::default(), + vec!["0xa2".to_string()], + vec![4], + ); + + let attestation3 = Attestation::new( + "awesome-npoi".to_string(), + BigUint::one(), + vec!["0xa3".to_string()], + vec![6], + ); + + local_blocks.insert(42, attestation1); + local_blocks.insert(43, attestation2); + local_blocks.insert(44, attestation3); + + let mut local_attestations: HashMap> = HashMap::new(); + local_attestations.insert("hash".to_string(), local_blocks.clone()); + local_attestations.insert("hash2".to_string(), local_blocks); + let local = Arc::new(AsyncMutex::new(local_attestations)); + let (block_num, collect_window_end) = + local_comparison_point(Arc::clone(&local), "hash".to_string(), 120).await; + + assert_eq!(block_num, 42); + assert_eq!(collect_window_end, 122); + } +} diff --git a/src/lib.rs b/src/lib.rs index 77d5503..0d684e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,35 +1,38 @@ -use anyhow::anyhow; use ethers_contract::EthAbiType; use ethers_core::types::transaction::eip712::Eip712; use ethers_derive_eip712::*; -use num_bigint::BigUint; -use num_traits::Zero; use once_cell::sync::OnceCell; use prost::Message; use serde::{Deserialize, Serialize}; use std::{ collections::{HashMap, HashSet}, - fmt::{self, Display}, sync::{Arc, Mutex as SyncMutex}, }; -use tokio::sync::Mutex as AsyncMutex; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error}; -use graphcast_sdk::config::CoverageLevel; +use graphcast_sdk::{config::CoverageLevel, graphql::client_graph_node::get_indexing_statuses}; use graphcast_sdk::{ graphcast_agent::{ - message_typing::{get_indexer_stake, GraphcastMessage}, - waku_handling::WakuHandlingError, - GraphcastAgent, - }, - graphql::{ - client_graph_node::get_indexing_statuses, client_network::query_network_subgraph, - client_registry::query_registry_indexer, + message_typing::GraphcastMessage, waku_handling::WakuHandlingError, GraphcastAgent, }, + graphql::client_network::query_network_subgraph, networks::NetworkName, BlockPointer, }; +pub mod attestation; + +/// A global static (singleton) instance of GraphcastAgent. It is useful to ensure that we have only one GraphcastAgent +/// per Radio instance, so that we can keep track of state and more easily test our Radio application. +pub static GRAPHCAST_AGENT: OnceCell = OnceCell::new(); + +/// A global static (singleton) instance of A GraphcastMessage vector. +/// It is used to save incoming messages after they've been validated, in order +/// defer their processing for later, because async code is required for the processing but +/// it is not allowed in the handler itself. +pub static MESSAGES: OnceCell>>>> = + OnceCell::new(); + #[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize)] #[eip712( name = "Graphcast POI Radio", @@ -57,41 +60,18 @@ impl RadioPayloadMessage { } } -pub type RemoteAttestationsMap = HashMap>>; -pub type LocalAttestationsMap = HashMap>; - -/// A global static (singleton) instance of GraphcastAgent. It is useful to ensure that we have only one GraphcastAgent -/// per Radio instance, so that we can keep track of state and more easily test our Radio application. -pub static GRAPHCAST_AGENT: OnceCell = OnceCell::new(); - -/// A global static (singleton) instance of A GraphcastMessage vector. -/// It is used to save incoming messages after they've been validated, in order -/// defer their processing for later, because async code is required for the processing but -/// it is not allowed in the handler itself. -pub static MESSAGES: OnceCell>>>> = - OnceCell::new(); - -/// Updates the `blocks` HashMap to include the new attestation. -pub fn update_blocks( - block_number: u64, - blocks: &HashMap>, - npoi: String, - stake: BigUint, - address: String, - timestamp: i64, -) -> HashMap> { - let mut blocks_clone: HashMap> = HashMap::new(); - blocks_clone.extend(blocks.clone()); - blocks_clone.insert( - block_number, - vec![Attestation::new( - npoi, - stake, - vec![address], - vec![timestamp], - )], - ); - blocks_clone +/// Custom callback for handling the validated GraphcastMessage, in this case we only save the messages to a local store +/// to process them at a later time. This is required because for the processing we use async operations which are not allowed +/// in the handler. +pub fn radio_msg_handler( +) -> impl Fn(Result, WakuHandlingError>) { + |msg: Result, WakuHandlingError>| { + // TODO: Handle the error case by incrementing a Prometheus "error" counter + if let Ok(msg) = msg { + debug!("Received message: {:?}", msg); + MESSAGES.get().unwrap().lock().unwrap().push(msg); + } + } } /// Generate default topics that is operator address resolved to indexer address @@ -172,310 +152,6 @@ pub async fn generate_topics( } } -/// This function processes the global messages map that we populate when -/// messages are being received. It constructs the remote attestations -/// map and returns it if the processing succeeds. -pub async fn process_messages( - messages: Arc>>>, - registry_subgraph: &str, - network_subgraph: &str, -) -> Result { - let mut remote_attestations: RemoteAttestationsMap = HashMap::new(); - let messages = AsyncMutex::new(messages.lock().await); - - for msg in messages.lock().await.iter() { - let radio_msg = &msg.payload.clone().unwrap(); - let sender = msg.recover_sender_address()?; - let indexer_address = - query_registry_indexer(registry_subgraph.to_string(), sender.clone()).await?; - // If the indexer has active allocation on that topic then use their stake, otherwise simply include the msg with no stake weight - let indexer_allocations = query_network_subgraph( - network_subgraph.to_string().clone(), - indexer_address.clone(), - ) - .await? - .indexer_allocations(); - let sender_stake = if indexer_allocations.contains(&msg.identifier) { - get_indexer_stake(indexer_address.clone(), network_subgraph).await? - } else { - Zero::zero() - }; - - // Check if there are existing attestations for the block - let blocks = remote_attestations - .entry(msg.identifier.to_string()) - .or_default(); - let attestations = blocks.entry(msg.block_number).or_default(); - - let existing_attestation = attestations - .iter_mut() - .find(|a| a.npoi == radio_msg.payload_content()); - - match existing_attestation { - Some(existing_attestation) => { - existing_attestation.stake_weight += sender_stake; - if !existing_attestation - .senders - .contains(&indexer_address.clone()) - { - existing_attestation.senders.push(indexer_address.clone()); - } - } - None => { - attestations.push(Attestation::new( - radio_msg.payload_content().to_string(), - sender_stake, - vec![indexer_address], - vec![msg.nonce], - )); - } - } - } - Ok(remote_attestations) -} - -/// Determine the comparison pointer on both block and time based on the local attestations -/// If they don't exist, then return default value that shall never be validated to trigger -pub async fn local_comparison_point( - local_attestations: Arc>, - id: String, - collect_window_duration: i64, -) -> (u64, i64) { - let local_attestation = local_attestations.lock().await; - if let Some(blocks_map) = local_attestation.get(&id) { - // Find the attestaion by the smallest block - blocks_map - .iter() - .min_by_key(|(&min_block, attestation)| { - // unwrap is okay because we add timestamp at local creation of attestation - (min_block, *attestation.timestamp.first().unwrap()) - }) - .map(|(&block, a)| { - ( - block, - *a.timestamp.first().unwrap() + collect_window_duration, - ) - }) - .unwrap_or((0_u64, i64::MAX)) - } else { - (0_u64, i64::MAX) - } -} - -/// A wrapper around an attested NPOI, tracks Indexers that have sent it plus their accumulated stake -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct Attestation { - pub npoi: String, - pub stake_weight: BigUint, - pub senders: Vec, - pub timestamp: Vec, -} - -impl Attestation { - pub fn new( - npoi: String, - stake_weight: BigUint, - senders: Vec, - timestamp: Vec, - ) -> Self { - Attestation { - npoi, - stake_weight, - senders, - timestamp, - } - } - - /// Used whenever we receive a new attestation for an NPOI that already exists in the store - pub fn update( - base: &Self, - address: String, - stake: BigUint, - timestamp: i64, - ) -> Result { - if base.senders.contains(&address) { - Err(anyhow!( - "{}", - "There is already an attestation from this address. Skipping...".to_string() - )) - } else { - Ok(Self::new( - base.npoi.clone(), - base.stake_weight.clone() + stake, - [base.senders.clone(), vec![address]].concat(), - [base.timestamp.clone(), vec![timestamp]].concat(), - )) - } - } -} - -impl fmt::Display for Attestation { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "NPOI: {}\nsender addresses: {:#?}\nstake weight: {}", - self.npoi, self.senders, self.stake_weight - ) - } -} - -/// Saves NPOIs that we've generated locally, in order to compare them with remote ones later -pub fn save_local_attestation( - local_attestations: &mut LocalAttestationsMap, - attestation: Attestation, - ipfs_hash: String, - block_number: u64, -) { - let blocks = local_attestations.get(&ipfs_hash); - - match blocks { - Some(blocks) => { - let mut blocks_clone: HashMap = HashMap::new(); - blocks_clone.extend(blocks.clone()); - blocks_clone.insert(block_number, attestation); - local_attestations.insert(ipfs_hash, blocks_clone); - } - None => { - let mut blocks_clone: HashMap = HashMap::new(); - blocks_clone.insert(block_number, attestation); - local_attestations.insert(ipfs_hash, blocks_clone); - } - } -} - -/// Clear the expired local attesatoins -pub fn clear_local_attestation( - local_attestations: &mut LocalAttestationsMap, - ipfs_hash: String, - block_number: u64, -) { - let blocks = local_attestations.get(&ipfs_hash); - - if let Some(blocks) = blocks { - let mut blocks_clone: HashMap = HashMap::new(); - blocks_clone.extend(blocks.clone()); - blocks_clone.remove(&block_number); - local_attestations.insert(ipfs_hash, blocks_clone); - } -} - -/// Custom callback for handling the validated GraphcastMessage, in this case we only save the messages to a local store -/// to process them at a later time. This is required because for the processing we use async operations which are not allowed -/// in the handler. -pub fn attestation_handler( -) -> impl Fn(Result, WakuHandlingError>) { - |msg: Result, WakuHandlingError>| { - // TODO: Handle the error case by incrementing a Prometheus "error" counter - if let Ok(msg) = msg { - debug!("Received message: {:?}", msg); - MESSAGES.get().unwrap().lock().unwrap().push(msg); - } - } -} - -#[derive(Debug, PartialEq, Eq, Hash)] -pub enum ComparisonResult { - NotFound(String), - Divergent(String), - Match(String), -} - -impl Display for ComparisonResult { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - ComparisonResult::NotFound(s) => write!(f, "NotFound: {s}"), - ComparisonResult::Divergent(s) => write!(f, "Divergent: {s}"), - ComparisonResult::Match(s) => write!(f, "Matched: {s}"), - } - } -} - -/// Compares local attestations against remote ones using the attestation stores we populated while processing saved GraphcastMessage messages. -/// It takes our attestation (NPOI) for a given subgraph on a given block and compares it to the top-attested one from the remote attestations. -/// The top remote attestation is found by grouping attestations together and increasing their total stake-weight every time we see a new message -/// with the same NPOI from an Indexer (NOTE: one Indexer can only send 1 attestation per subgraph per block). The attestations are then sorted -/// and we take the one with the highest total stake-weight. -pub async fn compare_attestations( - network_name: NetworkName, - attestation_block: u64, - remote: RemoteAttestationsMap, - local: Arc>, - ipfs_hash: &str, -) -> Result { - debug!( - "Comparing attestations:\nlocal: {:#?}\n remote: {:#?}", - local, remote - ); - - let local = local.lock().await; - - let blocks = match local.get(ipfs_hash) { - Some(blocks) => blocks, - None => { - return Ok(ComparisonResult::NotFound(String::from( - "No local attestation found", - ))) - } - }; - - let local_attestation = match blocks.get(&attestation_block) { - Some(attestations) => attestations, - None => { - return Ok(ComparisonResult::NotFound(format!( - "No local attestation found for block {attestation_block}" - ))) - } - }; - - let remote_blocks = match remote.get(ipfs_hash) { - Some(blocks) => blocks, - None => { - return Ok(ComparisonResult::NotFound(format!( - "No remote attestation found for subgraph {ipfs_hash}" - ))) - } - }; - let remote_attestations = match remote_blocks.get(&attestation_block) { - Some(attestations) => attestations, - None => { - return Ok(ComparisonResult::NotFound(format!( - "No remote attestation found for subgraph {ipfs_hash} on block {attestation_block}" - ))) - } - }; - - let mut remote_attestations = remote_attestations.clone(); - remote_attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); - - if remote_attestations.len() > 1 { - warn!( - "More than 1 nPOI found for subgraph {} on block {}. Attestations (sorted): {:#?}", - ipfs_hash, attestation_block, remote_attestations - ); - } - - let most_attested_npoi = &remote_attestations.last().unwrap().npoi; - if most_attested_npoi == &local_attestation.npoi { - info!( - "nPOI matched for subgraph {} on block {} with {} of remote attestations", - ipfs_hash, - attestation_block, - remote_attestations.len(), - ); - Ok(ComparisonResult::Match(format!( - "POIs match for subgraph {ipfs_hash} on block {attestation_block}!: {most_attested_npoi}" - ))) - } else { - info!( - "Number of nPOI submitted for block {}: {:#?}\n{}: {:#?}", - attestation_block, remote_attestations, "Local nPOI", &local_attestation.npoi - ); - Ok(ComparisonResult::Divergent(format!( - "❗ POIs don't match for subgraph {ipfs_hash} on network {network_name} at block {attestation_block}!\n\nLocal attestation:\n{local_attestation:#?}\n\nRemote attestations:\n{remote_attestations:#?}" - ))) - } -} - pub fn chainhead_block_str( network_chainhead_blocks: &HashMap, ) -> String { @@ -493,14 +169,12 @@ pub fn chainhead_block_str( #[cfg(test)] mod tests { - use num_traits::One; - use super::*; const NETWORK: NetworkName = NetworkName::Goerli; #[test] - fn test_basic_global_map() { + fn test_add_message() { _ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new()))); let mut messages = MESSAGES.get().unwrap().lock().unwrap(); @@ -533,33 +207,6 @@ mod tests { ); } - #[test] - fn test_update_blocks() { - let mut blocks: HashMap> = HashMap::new(); - blocks.insert( - 42, - vec![Attestation::new( - "default".to_string(), - BigUint::default(), - Vec::new(), - Vec::new(), - )], - ); - let block_clone = update_blocks( - 42, - &blocks, - "awesome-npoi".to_string(), - BigUint::default(), - "address".to_string(), - 1, - ); - - assert_eq!( - block_clone.get(&42).unwrap().first().unwrap().npoi, - "awesome-npoi".to_string() - ); - } - #[test] fn test_delete_messages() { _ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new()))); @@ -591,304 +238,4 @@ mod tests { messages.clear(); assert!(messages.is_empty()); } - - #[test] - fn test_attestation_sorting() { - let attestation1 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot1".to_string()], - vec![0], - ); - - let attestation2 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot2".to_string()], - vec![1], - ); - - let attestation3 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::one(), - vec!["i-am-groot3".to_string()], - vec![2], - ); - - let mut attestations = vec![attestation1, attestation2, attestation3]; - - attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); - - assert_eq!(attestations.last().unwrap().stake_weight, BigUint::one()); - assert_eq!( - attestations.last().unwrap().senders.first().unwrap(), - &"i-am-groot3".to_string() - ); - assert_eq!(attestations.last().unwrap().timestamp, vec![2]); - } - - #[test] - fn test_attestation_update_success() { - let attestation = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot".to_string()], - vec![2], - ); - - let updated_attestation = - Attestation::update(&attestation, "soggip".to_string(), BigUint::one(), 1); - - assert!(updated_attestation.is_ok()); - assert_eq!( - updated_attestation.as_ref().unwrap().stake_weight, - BigUint::one() - ); - assert_eq!(updated_attestation.unwrap().timestamp, [2, 1]); - } - - #[test] - fn test_attestation_update_fail() { - let attestation = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot".to_string()], - vec![0], - ); - - let updated_attestation = Attestation::update( - &attestation, - "i-am-groot".to_string(), - BigUint::default(), - 0, - ); - - assert!(updated_attestation.is_err()); - assert_eq!( - updated_attestation.unwrap_err().to_string(), - "There is already an attestation from this address. Skipping...".to_string() - ); - } - - #[tokio::test] - async fn test_compare_attestations_generic_fail() { - let res = compare_attestations( - NetworkName::Goerli, - 42, - HashMap::new(), - Arc::new(AsyncMutex::new(HashMap::new())), - "non-existent-ipfs-hash", - ) - .await; - - assert!(res.is_ok()); - assert_eq!( - res.unwrap().to_string(), - "NotFound: No local attestation found".to_string() - ); - } - - #[tokio::test] - async fn test_compare_attestations_remote_not_found_fail() { - let mut remote_blocks: HashMap> = HashMap::new(); - let mut local_blocks: HashMap = HashMap::new(); - - remote_blocks.insert( - 42, - vec![Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot".to_string()], - vec![1], - )], - ); - - local_blocks.insert( - 42, - Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - Vec::new(), - vec![0], - ), - ); - - let mut remote_attestations: HashMap>> = - HashMap::new(); - let mut local_attestations: HashMap> = HashMap::new(); - - remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); - local_attestations.insert("different-awesome-hash".to_string(), local_blocks); - - let res = compare_attestations( - NetworkName::Goerli, - 42, - remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), - "different-awesome-hash", - ) - .await; - - assert!(res.is_ok()); - assert_eq!( - res.unwrap().to_string(), - "NotFound: No remote attestation found for subgraph different-awesome-hash".to_string() - ); - } - - #[tokio::test] - async fn test_compare_attestations_local_not_found_fail() { - let remote_blocks: HashMap> = HashMap::new(); - let local_blocks: HashMap = HashMap::new(); - - let mut remote_attestations: HashMap>> = - HashMap::new(); - let mut local_attestations: HashMap> = HashMap::new(); - - remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); - local_attestations.insert("my-awesome-hash".to_string(), local_blocks); - - let res = compare_attestations( - NetworkName::Goerli, - 42, - remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), - "my-awesome-hash", - ) - .await; - - assert!(res.is_ok()); - assert_eq!( - res.unwrap().to_string(), - "NotFound: No local attestation found for block 42".to_string() - ); - } - - #[tokio::test] - async fn test_compare_attestations_success() { - let mut remote_blocks: HashMap> = HashMap::new(); - let mut local_blocks: HashMap = HashMap::new(); - - remote_blocks.insert( - 42, - vec![Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot".to_string()], - vec![0], - )], - ); - - local_blocks.insert( - 42, - Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - Vec::new(), - vec![0], - ), - ); - - let mut remote_attestations: HashMap>> = - HashMap::new(); - let mut local_attestations: HashMap> = HashMap::new(); - - remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks); - local_attestations.insert("my-awesome-hash".to_string(), local_blocks); - - let res = compare_attestations( - NetworkName::Goerli, - 42, - remote_attestations, - Arc::new(AsyncMutex::new(local_attestations)), - "my-awesome-hash", - ) - .await; - - assert!(res.is_ok()); - assert_eq!( - res.unwrap(), - ComparisonResult::Match( - "POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string() - ) - ); - } - - #[test] - fn clear_local_attestation_success() { - let mut local_blocks: HashMap = HashMap::new(); - let attestation1 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot1".to_string()], - vec![0], - ); - - let attestation2 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot2".to_string()], - vec![1], - ); - - let attestation3 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::one(), - vec!["i-am-groot3".to_string()], - vec![2], - ); - - local_blocks.insert(42, attestation1); - local_blocks.insert(43, attestation2); - local_blocks.insert(44, attestation3); - - let mut local_attestations: HashMap> = HashMap::new(); - local_attestations.insert("hash".to_string(), local_blocks.clone()); - local_attestations.insert("hash2".to_string(), local_blocks); - - clear_local_attestation(&mut local_attestations, "hash".to_string(), 43); - - assert_eq!(local_attestations.get("hash").unwrap().len(), 2); - assert!(local_attestations.get("hash").unwrap().get(&43).is_none()); - assert_eq!(local_attestations.get("hash2").unwrap().len(), 3); - } - - #[tokio::test] - async fn local_attestation_pointer_success() { - let mut local_blocks: HashMap = HashMap::new(); - let attestation1 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot1".to_string()], - vec![2], - ); - - let attestation2 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::default(), - vec!["i-am-groot2".to_string()], - vec![4], - ); - - let attestation3 = Attestation::new( - "awesome-npoi".to_string(), - BigUint::one(), - vec!["i-am-groot3".to_string()], - vec![6], - ); - - local_blocks.insert(42, attestation1); - local_blocks.insert(43, attestation2); - local_blocks.insert(44, attestation3); - - let mut local_attestations: HashMap> = HashMap::new(); - local_attestations.insert("hash".to_string(), local_blocks.clone()); - local_attestations.insert("hash2".to_string(), local_blocks); - let local = Arc::new(AsyncMutex::new(local_attestations)); - let (block_num, collect_window_end) = - local_comparison_point(Arc::clone(&local), "hash".to_string(), 120).await; - - assert_eq!(block_num, 42); - assert_eq!(collect_window_end, 122); - } } diff --git a/src/main.rs b/src/main.rs index 45fc9af..35fca5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,15 @@ use chrono::Utc; use dotenv::dotenv; -use graphcast_sdk::bots::{DiscordBot, SlackBot}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex as SyncMutex}; +use std::{thread::sleep, time::Duration}; +use tokio::sync::Mutex as AsyncMutex; +use tracing::log::warn; +use tracing::{debug, error, info}; + /// Radio specific query function to fetch Proof of Indexing for each allocated subgraph +use graphcast_sdk::bots::{DiscordBot, SlackBot}; use graphcast_sdk::config::Config; use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; use graphcast_sdk::graphcast_agent::GraphcastAgent; @@ -11,18 +18,14 @@ use graphcast_sdk::graphql::client_network::query_network_subgraph; use graphcast_sdk::graphql::client_registry::query_registry_indexer; use graphcast_sdk::networks::NetworkName; use graphcast_sdk::{build_wallet, determine_message_block, graphcast_id_address, BlockPointer}; - use poi_radio::{ - attestation_handler, chainhead_block_str, clear_local_attestation, compare_attestations, - generate_topics, local_comparison_point, process_messages, save_local_attestation, Attestation, - ComparisonResult, LocalAttestationsMap, RadioPayloadMessage, GRAPHCAST_AGENT, MESSAGES, + attestation::{ + clear_local_attestation, compare_attestations, local_comparison_point, process_messages, + save_local_attestation, Attestation, ComparisonResult, LocalAttestationsMap, + }, + chainhead_block_str, generate_topics, radio_msg_handler, RadioPayloadMessage, GRAPHCAST_AGENT, + MESSAGES, }; -use std::collections::HashMap; -use std::sync::{Arc, Mutex as SyncMutex}; -use std::{thread::sleep, time::Duration}; -use tokio::sync::Mutex as AsyncMutex; -use tracing::log::warn; -use tracing::{debug, error, info}; use crate::graphql::query_graph_node_poi; @@ -92,7 +95,7 @@ async fn main() { GRAPHCAST_AGENT .get() .unwrap() - .register_handler(Arc::new(AsyncMutex::new(attestation_handler()))) + .register_handler(Arc::new(AsyncMutex::new(radio_msg_handler()))) .expect("Could not register handler"); let mut network_chainhead_blocks: HashMap = HashMap::new(); @@ -219,12 +222,12 @@ async fn main() { match poi_query(block_hash.clone(), message_block.try_into().unwrap()).await { Ok(content) => { - let attestation = Attestation { - npoi: content.clone(), - stake_weight: my_stake.clone(), - senders: vec![my_address.clone()], - timestamp: vec![time], - }; + let attestation = Attestation::new( + content.clone(), + my_stake.clone(), + vec![my_address.clone()], + vec![time], + ); save_local_attestation( &mut *local_attestations.lock().await,