Skip to content

Commit

Permalink
feat: modify recon store to support event validation concepts (#473)
Browse files Browse the repository at this point in the history
* feat: use struct to inject recon protocol behavior

* feat: modify recon and store traits to return a struct to return valid/invalid item context

* feat: track and resubmit events from recon when history discovered

we also explicitly hangup if the store says something is invalid

* fix: don't derive Hash on types containing recon keys

this probably doesn't hurt to include, but we're particular about using our AHash so we'll leave it explicit for now

* chore: review comments

- rename InsertBatch -> InsertResult
- use for loop in pending_cache instead of while loop
- consistent Self::Key usage in trait

* fix: track pending as vec (will delete this shortly)

* feat: store returns information about event validation

- recon pending tracking simplified to just retry every batch since we should get items close together
- fixes a bug where any event in an api batch that was invalid could have crashed the entire batch (now only that one gets an error)
- TODO: still need to modify the service to have a better validation hook that keeps track of the data we need to return so we don't have to iterate back through or lose info along the way

* feat: simplify pending event tracking and add initial event validation phases

* chore: clean up and review comments

* feat: add labels to invalid recon item metric
  • Loading branch information
dav1do authored Aug 15, 2024
1 parent 509237f commit a84018e
Show file tree
Hide file tree
Showing 19 changed files with 407 additions and 184 deletions.
7 changes: 5 additions & 2 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ mod tests {
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use recon::{RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use recon::{InsertResult, RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use ssh_key::private::Ed25519Keypair;

use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey};
Expand Down Expand Up @@ -1256,7 +1256,10 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _items: Vec<ReconItem<Self::Key>>) -> ReconResult<()> {
async fn insert(
&self,
_items: Vec<ReconItem<Self::Key>>,
) -> ReconResult<InsertResult<Self::Key>> {
unreachable!()
}

Expand Down
6 changes: 3 additions & 3 deletions recon/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tracing::warn;

use crate::{
recon::{RangeHash, ReconItem, SyncState},
AssociativeHash, Error, InterestProvider, Key, Metrics, Recon, Result, Store,
AssociativeHash, Error, InsertResult, InterestProvider, Key, Metrics, Recon, Result, Store,
};

/// Client to a [`Recon`] [`Server`].
Expand All @@ -26,7 +26,7 @@ where
H: AssociativeHash,
{
/// Sends an insert request to the server and awaits the response.
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<()> {
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<InsertResult<K>> {
let (ret, rx) = oneshot::channel();
self.sender.send(Request::Insert { items, ret }).await?;
rx.await?
Expand Down Expand Up @@ -145,7 +145,7 @@ where
{
Insert {
items: Vec<ReconItem<K>>,
ret: oneshot::Sender<Result<()>>,
ret: oneshot::Sender<Result<InsertResult<K>>>,
},
Len {
ret: oneshot::Sender<Result<usize>>,
Expand Down
2 changes: 1 addition & 1 deletion recon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use crate::{
metrics::Metrics,
recon::{
btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
InsertResult, InterestProvider, InterestStore, Key, RangeHash, Recon,
InsertResult, InterestProvider, InterestStore, InvalidItem, Key, RangeHash, Recon,
ReconInterestProvider, ReconItem, Split, Store, SyncState,
},
sha256a::Sha256a,
Expand Down
18 changes: 8 additions & 10 deletions recon/src/libp2p/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@ use libp2p::swarm::ConnectionId;
use libp2p_identity::PeerId;
use tracing::Level;

use crate::protocol::ProtocolConfig;
use crate::{
libp2p::stream_set::StreamSet,
protocol::{self, Recon},
};

// The max number of writes we'll batch up before flushing anything to disk.
// As we descend the tree and find smaller ranges, this won't apply as we have to flush
// before recomputing a range, but it will be used when we're processing large ranges we don't yet have.
const INSERT_BATCH_SIZE: usize = 100;

// Initiate Recon synchronization with a peer over a stream.
// Initiate Recon synchronization with a peer over a stream.
#[tracing::instrument(skip(recon, stream, ), ret(level = Level::DEBUG))]
pub async fn initiate_synchronize<S, R>(
remote_peer_id: PeerId, // included for context only
remote_peer_id: PeerId,
connection_id: ConnectionId, // included for context only
stream_set: StreamSet,
recon: R,
Expand All @@ -30,13 +26,14 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::initiate_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
protocol::initiate_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
.await?;
Ok(stream_set)
}
// Respond to Recon synchronization with a peer over a stream.
#[tracing::instrument(skip(recon, stream, ), ret(level = Level::DEBUG))]
pub async fn respond_synchronize<S, R>(
remote_peer_id: PeerId, // included for context only
remote_peer_id: PeerId,
connection_id: ConnectionId, // included for context only
stream_set: StreamSet,
recon: R,
Expand All @@ -48,6 +45,7 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::respond_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
protocol::respond_synchronize(recon, stream, ProtocolConfig::new_peer_id(remote_peer_id))
.await?;
Ok(stream_set)
}
2 changes: 1 addition & 1 deletion recon/src/libp2p/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ where
type Key = K;
type Hash = H;

async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult> {
async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult<Self::Key>> {
self.as_error()?;

self.inner.insert_many(items).await
Expand Down
53 changes: 52 additions & 1 deletion recon/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use prometheus_client::{

use crate::{
protocol::{InitiatorMessage, ResponderMessage},
AssociativeHash, Key,
AssociativeHash, InvalidItem, Key,
};

/// Metrics for Recon P2P events
Expand All @@ -24,6 +24,9 @@ pub struct Metrics {

protocol_write_loop_count: Counter,
protocol_run_duration: Histogram,

protocol_pending_items: Counter,
protocol_invalid_items: Family<InvalidItemLabels, Counter>,
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
Expand Down Expand Up @@ -97,11 +100,27 @@ impl Metrics {
sub_registry
);

register!(
protocol_pending_items,
"Number of items received that depend on undiscovered events",
Counter::default(),
sub_registry
);

register!(
protocol_invalid_items,
"Number of items received that were considered invalid",
Family::<InvalidItemLabels, Counter>::default(),
sub_registry
);

Self {
protocol_message_received_count,
protocol_message_sent_count,
protocol_write_loop_count,
protocol_run_duration,
protocol_pending_items,
protocol_invalid_items,
}
}
}
Expand Down Expand Up @@ -145,3 +164,35 @@ impl Recorder<ProtocolRun> for Metrics {
self.protocol_run_duration.observe(event.0.as_secs_f64());
}
}

/// Label set attached to the `protocol_invalid_items` counter family.
///
/// The single `reason` label carries a static string naming the
/// [`InvalidItem`] variant that was observed.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub(crate) struct InvalidItemLabels {
/// Name of the `InvalidItem` variant, e.g. `"InvalidFormat"`.
pub(crate) reason: &'static str,
}

impl<K: Key> Recorder<InvalidItem<K>> for Metrics {
fn record(&self, event: &InvalidItem<K>) {
let labels = event.into();
self.protocol_invalid_items.get_or_create(&labels).inc();
}
}

/// Maps an [`InvalidItem`] to the metric labels describing why it was rejected.
impl<K: Key> From<&InvalidItem<K>> for InvalidItemLabels {
    /// Builds the label set whose `reason` is the name of `value`'s variant.
    fn from(value: &InvalidItem<K>) -> Self {
        // The match is kept exhaustive (no `_` arm) so that adding a variant
        // forces a label to be chosen for it here.
        let reason = match value {
            InvalidItem::InvalidFormat { .. } => "InvalidFormat",
            InvalidItem::InvalidSignature { .. } => "InvalidSignature",
        };
        Self { reason }
    }
}

/// Metric event carrying a count that is added to the `protocol_pending_items`
/// counter (registered as "Number of items received that depend on
/// undiscovered events").
pub(crate) struct PendingEvents(pub u64);
impl Recorder<PendingEvents> for Metrics {
fn record(&self, event: &PendingEvents) {
self.protocol_pending_items.inc_by(event.0);
}
}
Loading

0 comments on commit a84018e

Please sign in to comment.