Skip to content

Commit

Permalink
feat: recon event insert batching (#470)
Browse files Browse the repository at this point in the history
* refactor: remove recon store insert

rely on insert_many instead

* refactor: remove "new key" boolean from recon insert api

The value was being ignored already, and this gives us more freedom to add a queue for event validation and storage as recon doesn't need to care about the result of the request.

* refactor: remove recon item lifetime

* feat: support insert batching in recon

* chore: minor clean up and demonstrate linear download messages are identical w/wo batching

added a log message to prove to myself that batching was actually happening

* chore: fix typo in comment

Co-authored-by: Spencer T Brody <spencer+github@3box.io>

---------

Co-authored-by: Spencer T Brody <spencer+github@3box.io>
  • Loading branch information
dav1do and stbrody authored Aug 13, 2024
1 parent 68ab303 commit 509237f
Show file tree
Hide file tree
Showing 22 changed files with 420 additions and 333 deletions.
3 changes: 2 additions & 1 deletion api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ mod server;
pub use resume_token::ResumeToken;

pub use server::{
EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore, Server,
ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore,
Server,
};

#[cfg(test)]
Expand Down
26 changes: 22 additions & 4 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,29 @@ impl TryFrom<String> for IncludeEventData {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ApiItem {
/// The recon event ID for the payload
pub key: EventId,
/// The event payload as a carfile
pub value: Arc<Vec<u8>>,
}

impl ApiItem {
/// Create a new item from an ID and carfile
pub fn new(key: EventId, value: Vec<u8>) -> Self {
Self {
key,
value: Arc::new(value),
}
}
}

/// Trait for accessing persistent storage of Events
#[async_trait]
pub trait EventStore: Send + Sync {
/// Returns (new_key, new_value) where true if was newly inserted, false if it already existed.
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
Expand Down Expand Up @@ -262,7 +280,7 @@ pub trait EventStore: Send + Sync {

#[async_trait::async_trait]
impl<S: EventStore> EventStore for Arc<S> {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>> {
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>> {
self.as_ref().insert_many(items).await
}

Expand Down Expand Up @@ -400,10 +418,10 @@ where
let mut items = Vec::with_capacity(events.len());
events.drain(..).for_each(|req: EventInsert| {
oneshots.insert(req.id.to_bytes(), req.tx);
items.push((req.id, req.data));
items.push(ApiItem::new(req.id, req.data));
});
tracing::trace!("calling insert many with {} items.", items.len());
match event_store.insert_many(&items).await {
match event_store.insert_many(items).await {
Ok(results) => {
tracing::debug!("insert many returned {} results.", results.len());
for result in results {
Expand Down
21 changes: 13 additions & 8 deletions api/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
use std::{ops::Range, str::FromStr, sync::Arc};

use crate::server::{decode_multibase_data, BuildResponse, Server};
use crate::{EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore};
use crate::{
ApiItem, EventDataResult, EventInsertResult, EventStore, IncludeEventData, InterestStore,
};

use anyhow::Result;
use async_trait::async_trait;
Expand Down Expand Up @@ -109,7 +111,7 @@ mock! {
pub EventStoreTest {}
#[async_trait]
impl EventStore for EventStoreTest {
async fn insert_many(&self, items: &[(EventId, Vec<u8>)]) -> Result<Vec<EventInsertResult>>;
async fn insert_many(&self, items: Vec<ApiItem>) -> Result<Vec<EventInsertResult>>;
async fn range_with_values(
&self,
range: Range<EventId>,
Expand Down Expand Up @@ -178,7 +180,7 @@ async fn create_event() {
let mock_interest = MockAccessInterestStoreTest::new();
let mut mock_event_store = MockEventStoreTest::new();
mock_get_init_event(&mut mock_event_store);
let args = vec![(
let args = vec![ApiItem::new(
expected_event_id,
decode_multibase_data(&event_data).unwrap(),
)];
Expand All @@ -189,8 +191,8 @@ async fn create_event() {
.times(1)
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| EventInsertResult::new_ok(id.clone()))
.into_iter()
.map(|v| EventInsertResult::new_ok(v.key.clone()))
.collect())
});
let server = Server::new(peer_id, network, mock_interest, Arc::new(mock_event_store));
Expand Down Expand Up @@ -219,7 +221,7 @@ async fn create_event_fails() {
let mock_interest = MockAccessInterestStoreTest::new();
let mut mock_event_store = MockEventStoreTest::new();
mock_get_init_event(&mut mock_event_store);
let args = vec![(
let args = vec![ApiItem::new(
expected_event_id.clone(),
decode_multibase_data(&event_data).unwrap(),
)];
Expand All @@ -231,8 +233,11 @@ async fn create_event_fails() {
.returning(|input| {
Ok(input
.iter()
.map(|(id, _)| {
EventInsertResult::new_failed(id.clone(), "Event is missing prev".to_string())
.map(|i| {
EventInsertResult::new_failed(
i.key.clone(),
"Event is missing prev".to_string(),
)
})
.collect())
});
Expand Down
4 changes: 2 additions & 2 deletions p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1174,7 +1174,7 @@ mod tests {
use futures::TryStreamExt;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use recon::{RangeHash, Result as ReconResult, Sha256a, SyncState};
use recon::{RangeHash, ReconItem, Result as ReconResult, Sha256a, SyncState};
use ssh_key::private::Ed25519Keypair;

use libp2p::{identity::Keypair as Libp2pKeypair, kad::RecordKey};
Expand Down Expand Up @@ -1256,7 +1256,7 @@ mod tests {
type Key = K;
type Hash = Sha256a;

async fn insert(&self, _key: Self::Key, _value: Vec<u8>) -> ReconResult<()> {
async fn insert(&self, _items: Vec<ReconItem<Self::Key>>) -> ReconResult<()> {
unreachable!()
}

Expand Down
29 changes: 14 additions & 15 deletions recon/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{

/// Client to a [`Recon`] [`Server`].
#[derive(Debug, Clone)]
pub struct Client<K, H> {
pub struct Client<K, H>
where
K: Key,
{
sender: Sender<Request<K, H>>,
metrics: Metrics,
}
Expand All @@ -23,11 +26,9 @@ where
H: AssociativeHash,
{
/// Sends an insert request to the server and awaits the response.
pub async fn insert(&self, key: K, value: Vec<u8>) -> Result<bool> {
pub async fn insert(&self, items: Vec<ReconItem<K>>) -> Result<()> {
let (ret, rx) = oneshot::channel();
self.sender
.send(Request::Insert { key, value, ret })
.await?;
self.sender.send(Request::Insert { items, ret }).await?;
rx.await?
}

Expand Down Expand Up @@ -138,11 +139,13 @@ where
}
}

enum Request<K, H> {
enum Request<K, H>
where
K: Key,
{
Insert {
key: K,
value: Vec<u8>,
ret: oneshot::Sender<Result<bool>>,
items: Vec<ReconItem<K>>,
ret: oneshot::Sender<Result<()>>,
},
Len {
ret: oneshot::Sender<Result<usize>>,
Expand Down Expand Up @@ -239,12 +242,8 @@ where
let request = self.requests.recv().await;
if let Some(request) = request {
match request {
Request::Insert { key, value, ret } => {
let val = self
.recon
.insert(&ReconItem::new(&key, &value))
.await
.map_err(Error::from);
Request::Insert { items, ret } => {
let val = self.recon.insert(items).await.map_err(Error::from);
send(ret, val);
}
Request::Len { ret } => {
Expand Down
11 changes: 8 additions & 3 deletions recon/src/libp2p/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ use crate::{
protocol::{self, Recon},
};

// Intiate Recon synchronization with a peer over a stream.
// The max number of writes we'll batch up before flushing anything to disk.
// As we descend the tree and find smaller ranges, this won't apply as we have to flush
// before recomputing a range, but it will be used when we're processing large ranges we don't yet have.
const INSERT_BATCH_SIZE: usize = 100;

// Initiate Recon synchronization with a peer over a stream.
#[tracing::instrument(skip(recon, stream, ), ret(level = Level::DEBUG))]
pub async fn initiate_synchronize<S, R>(
remote_peer_id: PeerId, // included for context only
Expand All @@ -25,7 +30,7 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::initiate_synchronize(recon, stream).await?;
protocol::initiate_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
Ok(stream_set)
}
// Respond to a Recon synchronization request from a peer over a stream.
Expand All @@ -43,6 +48,6 @@ where
{
let codec = CborCodec::new();
let stream = Framed::new(stream, codec);
protocol::respond_synchronize(recon, stream).await?;
protocol::respond_synchronize(recon, stream, INSERT_BATCH_SIZE).await?;
Ok(stream_set)
}
8 changes: 1 addition & 7 deletions recon/src/libp2p/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,7 @@ where
type Key = K;
type Hash = H;

async fn insert<'a>(&self, item: &ReconItem<'a, Self::Key>) -> ReconResult<bool> {
self.as_error()?;

self.inner.insert(item).await
}

async fn insert_many<'a>(&self, items: &[ReconItem<'a, K>]) -> ReconResult<InsertResult> {
async fn insert_many(&self, items: &[ReconItem<K>]) -> ReconResult<InsertResult> {
self.as_error()?;

self.inner.insert_many(items).await
Expand Down
Loading

0 comments on commit 509237f

Please sign in to comment.