Skip to content

Commit

Permalink
chore: Change EventInsertable to store the parsed Event object instea…
Browse files Browse the repository at this point in the history
…d of its raw blocks (#487)
  • Loading branch information
stbrody authored Aug 19, 2024
1 parent 551d339 commit b87732c
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 164 deletions.
105 changes: 65 additions & 40 deletions event/src/unvalidated/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,46 @@ use std::{collections::HashMap, fmt::Debug};
use tokio::io::AsyncRead;
use tracing::debug;

use super::{cid_from_dag_cbor, init, signed};
use super::{cid_from_dag_cbor, init, signed, Payload};

/// Helper for Event::decode_car: collects the decoded Ipld blocks visited while following a
/// time event's witness path, starting at the proof root.
///
/// Returns an empty list when the event's prev equals the proof root and the path is empty.
/// Otherwise the result starts with the decoded root block, followed by one decoded block for
/// every path segment except the last one (the last segment refers to the prev itself).
///
/// # Errors
/// Fails when a needed block is absent from `car_blocks`, when a block cannot be decoded,
/// when a path segment cannot be resolved inside the current block, or when the resolved
/// value is not a link.
fn get_time_event_witness_blocks(
    event: &RawTimeEvent,
    proof: &Proof,
    car_blocks: HashMap<Cid, Vec<u8>>,
) -> anyhow::Result<Vec<Ipld>> {
    // Nothing to walk: prev is the root and there is no path.
    if event.prev == proof.root && event.path.is_empty() {
        return Ok(Vec::new());
    }

    let root_bytes = car_blocks
        .get(&proof.root())
        .ok_or_else(|| anyhow!("Time Event CAR data missing block for root",))?;
    let mut witness_blocks: Vec<Ipld> = vec![serde_ipld_dagcbor::from_slice(root_bytes)?];

    let segments: Vec<_> = event.path().split('/').collect();
    // The final segment points at the prev, so only the segments before it are followed.
    let hops = segments.len() - 1;
    for segment in &segments[..hops] {
        // unwrap() cannot fail: the root block was placed in the list before the loop, so
        // there is always a most recent block to index into.
        let parent = witness_blocks.last().unwrap();
        let target = parent
            .get(*segment)?
            .ok_or_else(|| anyhow!("Time Event path indexes missing data"))?;
        let child_cid = if let Ipld::Link(link) = target {
            link
        } else {
            bail!("Time Event path does not index to a CID")
        };
        let child_bytes = car_blocks
            .get(child_cid)
            .ok_or_else(|| anyhow!("Time Event CAR data missing block for path index"))?;
        witness_blocks.push(serde_ipld_dagcbor::from_slice(child_bytes)?);
    }

    Ok(witness_blocks)
}

/// Materialized Ceramic Event where internal structure is accessible.
#[derive(Debug)]
Expand All @@ -28,50 +67,37 @@ impl<D> Event<D>
where
D: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
/// Encode the event into a CAR bytes containing all blocks of the event.
pub async fn encode_car(&self) -> anyhow::Result<Vec<u8>> {
/// Returns true if this Event is an init event, and false otherwise
pub fn is_init(&self) -> bool {
match self {
Event::Time(event) => event.encode_car().await,
Event::Signed(event) => event.encode_car().await,
Event::Unsigned(event) => event.encode_car().await,
Event::Time(_) => false,
Event::Signed(event) => match event.payload() {
Payload::Data(_) => false,
Payload::Init(_) => true,
},
Event::Unsigned(_) => true,
}
}

fn get_time_event_witness_blocks(
event: &RawTimeEvent,
proof: &Proof,
car_blocks: HashMap<Cid, Vec<u8>>,
) -> anyhow::Result<Vec<Ipld>> {
let mut blocks_in_path = Vec::new();
if event.prev == proof.root && event.path.is_empty() {
return Ok(blocks_in_path);
/// Returns the CID this event names as its prev, or None when there is none.
///
/// Time events and signed events carrying a data payload have a prev. Unsigned events and
/// signed events carrying an init payload do not.
pub fn prev(&self) -> Option<Cid> {
    // Handle the two variants that need no payload inspection first.
    let signed = match self {
        Event::Time(time_event) => return Some(time_event.prev()),
        Event::Unsigned(_) => return None,
        Event::Signed(signed) => signed,
    };
    match signed.payload() {
        Payload::Init(_) => None,
        Payload::Data(data) => Some(*data.prev()),
    }
}

let block_bytes = car_blocks
.get(&proof.root())
.ok_or_else(|| anyhow!("Time Event CAR data missing block for root",))?;
blocks_in_path.push(serde_ipld_dagcbor::from_slice(block_bytes)?);
let parts: Vec<_> = event.path().split('/').collect();
// Add blocks for all parts but the last as it is the prev.
for index in parts.iter().take(parts.len() - 1) {
// unwrap() is safe because .last() only returns None if the Vec is empty, and we know
// it has at least one element because we pushed a block before entering the loop.
let cid = blocks_in_path
.last()
.unwrap()
.get(*index)?
.ok_or_else(|| anyhow!("Time Event path indexes missing data"))?;
let cid = match cid {
Ipld::Link(cid) => cid,
_ => bail!("Time Event path does not index to a CID"),
};
let block_bytes = car_blocks
.get(cid)
.ok_or_else(|| anyhow!("Time Event CAR data missing block for path index"))?;
blocks_in_path.push(serde_ipld_dagcbor::from_slice(block_bytes)?);
/// Encode the event into a CAR bytes containing all blocks of the event.
pub async fn encode_car(&self) -> anyhow::Result<Vec<u8>> {
match self {
Event::Time(event) => event.encode_car().await,
Event::Signed(event) => event.encode_car().await,
Event::Unsigned(event) => event.encode_car().await,
}

Ok(blocks_in_path)
}

/// Decode bytes into a materialized event.
Expand Down Expand Up @@ -119,8 +145,7 @@ where
.ok_or_else(|| anyhow!("Time Event CAR data missing block for proof"))?;
let proof: Proof =
serde_ipld_dagcbor::from_slice(proof_bytes).context("decoding proof")?;
let blocks_in_path =
Self::get_time_event_witness_blocks(&event, &proof, car_blocks)?;
let blocks_in_path = get_time_event_witness_blocks(&event, &proof, car_blocks)?;
let blocks_in_path = blocks_in_path
.into_iter()
.map(|block| match block {
Expand Down
63 changes: 29 additions & 34 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};

use crate::Result;

use super::service::EventMetadata;

pub(crate) struct OrderEvents {
deliverable: Vec<(EventInsertable, EventMetadata)>,
missing_history: Vec<(EventInsertable, EventMetadata)>,
deliverable: Vec<EventInsertable>,
missing_history: Vec<EventInsertable>,
}

impl OrderEvents {
pub fn deliverable(&self) -> &[(EventInsertable, EventMetadata)] {
pub fn deliverable(&self) -> &[EventInsertable] {
&self.deliverable
}

pub fn missing_history(&self) -> &[(EventInsertable, EventMetadata)] {
pub fn missing_history(&self) -> &[EventInsertable] {
&self.missing_history
}
}
Expand All @@ -36,24 +34,24 @@ impl OrderEvents {
/// *could* mark B deliverable and then C and D, but we DO NOT want to do this here to prevent API users from writing events that they haven't seen.
pub async fn try_new(
pool: &SqlitePool,
mut candidate_events: Vec<(EventInsertable, EventMetadata)>,
mut candidate_events: Vec<EventInsertable>,
) -> Result<Self> {
let mut new_cids: HashMap<Cid, bool> =
HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| {
HashMap::from_iter(candidate_events.iter_mut().map(|e| {
// all init events are deliverable so we mark them as such before we do anything else
if matches!(meta, EventMetadata::Init { .. }) {
if e.event().is_init() {
e.set_deliverable(true);
}
(e.cid(), e.deliverable())
}));
let mut deliverable = Vec::with_capacity(candidate_events.len());
let mut remaining_candidates = Vec::with_capacity(candidate_events.len());

for (e, h) in candidate_events {
for e in candidate_events {
if e.deliverable() {
deliverable.push((e, h))
deliverable.push(e)
} else {
remaining_candidates.push((e, h))
remaining_candidates.push(e)
}
}

Expand All @@ -67,8 +65,8 @@ impl OrderEvents {
let mut undelivered_prevs_in_memory = VecDeque::with_capacity(remaining_candidates.len());
let mut missing_history = Vec::with_capacity(remaining_candidates.len());

while let Some((mut event, header)) = remaining_candidates.pop() {
match header.prev() {
while let Some(mut event) = remaining_candidates.pop() {
match event.event().prev() {
None => {
unreachable!("Init events should have been filtered out since they're always deliverable");
}
Expand All @@ -77,19 +75,19 @@ impl OrderEvents {
if *in_mem_is_deliverable {
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
deliverable.push(event);
} else {
undelivered_prevs_in_memory.push_back((event, header));
undelivered_prevs_in_memory.push_back(event);
}
} else {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if prev_deliverable {
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
deliverable.push(event);
} else {
missing_history.push((event, header));
missing_history.push(event);
}
}
}
Expand All @@ -103,21 +101,21 @@ impl OrderEvents {
// We can't quite get rid of this loop because we may have discovered our prev's prev from the database in the previous pass.
let max_iterations = undelivered_prevs_in_memory.len();
let mut iteration = 0;
while let Some((mut event, header)) = undelivered_prevs_in_memory.pop_front() {
while let Some(mut event) = undelivered_prevs_in_memory.pop_front() {
iteration += 1;
match header.prev() {
match event.event().prev() {
None => {
unreachable!("Init events should have been filtered out of the in memory set");
}
Some(prev) => {
if new_cids.get(&prev).map_or(false, |v| *v) {
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
event.set_deliverable(true);
deliverable.push((event, header));
deliverable.push(event);
// reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done
iteration = 0;
} else {
undelivered_prevs_in_memory.push_back((event, header));
undelivered_prevs_in_memory.push_back(event);
}
}
}
Expand Down Expand Up @@ -149,7 +147,7 @@ mod test {
async fn get_2_streams() -> (
Vec<ReconItem<EventId>>,
Vec<ReconItem<EventId>>,
Vec<(EventInsertable, EventMetadata)>,
Vec<EventInsertable>,
) {
let stream_2 = get_n_events(10).await;
let stream_1 = get_n_events(10).await;
Expand All @@ -167,11 +165,11 @@ mod test {
fn split_deliverable_order_by_stream(
stream_1: &[ReconItem<EventId>],
stream_2: &[ReconItem<EventId>],
events: &[(EventInsertable, EventMetadata)],
events: &[EventInsertable],
) -> (Vec<EventId>, Vec<EventId>) {
let mut after_1 = Vec::with_capacity(stream_1.len());
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
for event in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.key == *event.order_key()) {
after_1.push(event.order_key().clone());
Expand All @@ -187,10 +185,7 @@ mod test {
async fn get_insertable_events(
events: &[ReconItem<EventId>],
first_vec_count: usize,
) -> (
Vec<(EventInsertable, EventMetadata)>,
Vec<(EventInsertable, EventMetadata)>,
) {
) -> (Vec<EventInsertable>, Vec<EventInsertable>) {
let mut insertable = Vec::with_capacity(first_vec_count);
let mut remaining = Vec::with_capacity(events.len() - first_vec_count);
let mut i = 0;
Expand Down Expand Up @@ -271,7 +266,7 @@ mod test {

let stream_1 = get_n_events(10).await;
let (to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
CeramicOneEvent::insert_many(&pool, to_insert.iter().map(|(i, _)| i))
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

Expand All @@ -295,16 +290,16 @@ mod test {
let stream_1 = get_n_events(10).await;
let (mut to_insert, mut remaining) = get_insertable_events(&stream_1, 3).await;
for item in to_insert.as_mut_slice() {
item.0.set_deliverable(true)
item.set_deliverable(true)
}

CeramicOneEvent::insert_many(&pool, to_insert.iter().map(|(ei, _)| ei))
CeramicOneEvent::insert_many(&pool, to_insert.iter())
.await
.unwrap();

let expected = remaining
.iter()
.map(|(i, _)| i.order_key().clone())
.map(|i| i.order_key().clone())
.collect::<Vec<_>>();
remaining.shuffle(&mut thread_rng());

Expand All @@ -317,7 +312,7 @@ mod test {
let after = ordered
.deliverable
.iter()
.map(|(e, _)| e.order_key().clone())
.map(|e| e.order_key().clone())
.collect::<Vec<_>>();
assert_eq!(expected, after);
}
Expand Down
6 changes: 3 additions & 3 deletions service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl StreamEvent {
.await
.map_err(Error::new_app)?;

let metadata = EventMetadata::from(parsed);
let metadata = EventMetadata::from(&parsed);

let known_prev = match &metadata {
EventMetadata::Init => {
Expand Down Expand Up @@ -541,7 +541,7 @@ impl OrderingState {
let mut event_cnt = 0;
let mut discovered_inits = Vec::new();
for (cid, parsed_event) in event_data {
let metadata = EventMetadata::from(parsed_event);
let metadata = EventMetadata::from(&parsed_event);

let (stream_cid, loaded) = match &metadata {
EventMetadata::Init => {
Expand Down Expand Up @@ -610,7 +610,7 @@ mod test {
let mut res = Vec::with_capacity(n);
let events = get_n_events(n).await;
for event in events {
let (event, _) = CeramicEventService::parse_discovered_event(&event)
let event = CeramicEventService::parse_discovered_event(&event)
.await
.unwrap();
res.push(event);
Expand Down
Loading

0 comments on commit b87732c

Please sign in to comment.