Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
fix: more checks for message uniqueness
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger committed Mar 28, 2023
1 parent 2868d2e commit c2c1988
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 65 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "poi-radio"
version = "0.0.12"
version = "0.0.13"
edition = "2021"
authors = ["GraphOps (axiomatic-aardvark, hopeyen)"]
description="POI Radio monitors subgraph data integrity in real time using Graphcast SDK"
Expand All @@ -10,7 +10,7 @@ keywords=["graphprotocol", "data-integrity", "Indexer", "waku", "p2p"]
categories=["network-programming", "web-programming::http-client"]

[dependencies]
graphcast-sdk = "0.0.15"
graphcast-sdk = "0.0.16"
prost = "0.11"
once_cell = "1.15"
chrono = "0.4"
Expand Down
35 changes: 17 additions & 18 deletions src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub async fn process_messages(
network_subgraph: &str,
) -> Result<RemoteAttestationsMap, AttestationError> {
let mut remote_attestations: RemoteAttestationsMap = HashMap::new();
let messages = AsyncMutex::new(messages.lock().await);

for msg in messages.lock().await.iter() {
let radio_msg = &msg.payload.clone().unwrap();
Expand All @@ -114,24 +113,24 @@ pub async fn process_messages(
.iter_mut()
.find(|a| a.npoi == radio_msg.payload_content());

match existing_attestation {
Some(existing_attestation) => {
Attestation::update(
existing_attestation,
indexer_address,
sender_stake,
msg.nonce,
)?;
}
None => {
// Unwrap is okay because bytes (Vec<u8>) is a valid utf-8 sequence
attestations.push(Attestation::new(
radio_msg.payload_content().to_string(),
sender_stake,
vec![indexer_address],
vec![msg.nonce],
));
if let Some(existing_attestation) = existing_attestation {
if let Ok(updated_attestation) = Attestation::update(
existing_attestation,
indexer_address,
sender_stake,
msg.nonce,
) {
// Replace the existing_attestation with the updated_attestation
*existing_attestation = updated_attestation;
}
} else {
// Unwrap is okay because bytes (Vec<u8>) is a valid utf-8 sequence
attestations.push(Attestation::new(
radio_msg.payload_content().to_string(),
sender_stake,
vec![indexer_address],
vec![msg.nonce],
));
}
}
Ok(remote_attestations)
Expand Down
103 changes: 58 additions & 45 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::{Arc, Mutex as SyncMutex};
use std::{thread::sleep, time::Duration};
use tokio::sync::Mutex as AsyncMutex;
use tracing::log::warn;
use tracing::{debug, error, info};
use tracing::{debug, error, info, trace};

/// Radio specific query function to fetch Proof of Indexing for each allocated subgraph
use graphcast_sdk::bots::{DiscordBot, SlackBot};
Expand Down Expand Up @@ -207,56 +207,69 @@ async fn main() {
latest_block.number
);
if latest_block.number >= message_block {
let block_hash = match GRAPHCAST_AGENT
.get()
.unwrap()
.get_block_hash(network_name.to_string(), message_block)
if local_attestations
.lock()
.await
.get(&id)
.and_then(|blocks| blocks.get(&message_block))
.is_none()
{
Ok(hash) => hash,
Err(e) => {
error!("Failed to query graph node for the block hash: {e}");
continue;
}
};

match poi_query(block_hash.clone(), message_block.try_into().unwrap()).await {
Ok(content) => {
let attestation = Attestation::new(
content.clone(),
my_stake.clone(),
vec![my_address.clone()],
vec![time],
);

save_local_attestation(
&mut *local_attestations.lock().await,
attestation,
id.clone(),
message_block,
);

let radio_message = RadioPayloadMessage::new(id.clone(), content.clone());
match GRAPHCAST_AGENT
.get()
.unwrap()
.send_message(
id.clone(),
network_name,
message_block,
Some(radio_message),
)
.await
{
Ok(id) => messages_sent.push(id),
Err(e) => error!("{}: {}", "Failed to send message", e),
};
let block_hash = match GRAPHCAST_AGENT
.get()
.unwrap()
.get_block_hash(network_name.to_string(), message_block)
.await
{
Ok(hash) => hash,
Err(e) => {
error!("Failed to query graph node for the block hash: {e}");
continue;
}
};

match poi_query(block_hash.clone(), message_block.try_into().unwrap()).await {
Ok(content) => {
let radio_message =
RadioPayloadMessage::new(id.clone(), content.clone());
match GRAPHCAST_AGENT
.get()
.unwrap()
.send_message(
id.clone(),
network_name,
message_block,
Some(radio_message),
)
.await
{
Ok(id) => {
messages_sent.push(id.clone());

let attestation = Attestation::new(
content.clone(),
my_stake.clone(),
vec![my_address.clone()],
vec![time],
);

save_local_attestation(
&mut *local_attestations.lock().await,
attestation,
id.clone(),
message_block,
);
}
Err(e) => error!("{}: {}", "Failed to send message", e),
};
}
Err(e) => error!("{}: {}", "Failed to query message content", e),
}
Err(e) => error!("{}: {}", "Failed to query message content", e),
} else {
trace!("Skipping sending message for block: {}", message_block);
}
}

if time >= collect_window_end {
if time >= collect_window_end && message_block > compare_block {
let msgs = MESSAGES.get().unwrap().lock().unwrap().to_vec();
// Update to only process the identifier&compare_block related messages within the collection window
let msgs: Vec<GraphcastMessage<RadioPayloadMessage>> = msgs
Expand Down

0 comments on commit c2c1988

Please sign in to comment.