Skip to content

Commit

Permalink
chore: Flatten EventInsertable type (#480)
Browse files Browse the repository at this point in the history
Removes the EventInsertableBody type and pushes its fields up into the parent EventInsertable
  • Loading branch information
stbrody authored Aug 16, 2024
1 parent 163ce35 commit 07d4093
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 91 deletions.
20 changes: 10 additions & 10 deletions service/src/event/order_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl OrderEvents {
HashMap::from_iter(candidate_events.iter_mut().map(|(e, meta)| {
// all init events are deliverable so we mark them as such before we do anything else
if matches!(meta, EventMetadata::Init { .. }) {
e.body.set_deliverable(true);
e.set_deliverable(true);
}
(e.cid(), e.deliverable())
}));
Expand Down Expand Up @@ -73,7 +73,7 @@ impl OrderEvents {
Some(prev) => {
if let Some(in_mem_is_deliverable) = new_cids.get(&prev) {
if *in_mem_is_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
} else {
Expand All @@ -83,7 +83,7 @@ impl OrderEvents {
let (_exists, prev_deliverable) =
CeramicOneEvent::deliverable_by_cid(pool, &prev).await?;
if prev_deliverable {
event.body.set_deliverable(true);
event.set_deliverable(true);
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
deliverable.push((event, header));
} else {
Expand All @@ -110,7 +110,7 @@ impl OrderEvents {
Some(prev) => {
if new_cids.get(&prev).map_or(false, |v| *v) {
*new_cids.get_mut(&event.cid()).expect("CID must exist") = true;
event.body.set_deliverable(true);
event.set_deliverable(true);
deliverable.push((event, header));
// reset the iteration count since we made changes. once it doesn't change for a loop through the queue we're done
iteration = 0;
Expand Down Expand Up @@ -171,10 +171,10 @@ mod test {
let mut after_2 = Vec::with_capacity(stream_2.len());
for (event, _) in events {
assert!(event.deliverable());
if stream_1.iter().any(|e| e.key == event.order_key) {
after_1.push(event.order_key.clone());
if stream_1.iter().any(|e| e.key == *event.order_key()) {
after_1.push(event.order_key().clone());
} else {
after_2.push(event.order_key.clone());
after_2.push(event.order_key().clone());
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ mod test {
.iter_mut()
.take(3)
.map(|(i, _)| {
i.body.set_deliverable(true);
i.set_deliverable(true);
i.clone()
})
.collect::<Vec<_>>();
Expand All @@ -301,7 +301,7 @@ mod test {

let expected = remaining
.iter()
.map(|(i, _)| i.order_key.clone())
.map(|(i, _)| i.order_key().clone())
.collect::<Vec<_>>();
remaining.shuffle(&mut thread_rng());

Expand All @@ -314,7 +314,7 @@ mod test {
let after = ordered
.deliverable
.iter()
.map(|(e, _)| e.order_key.clone())
.map(|(e, _)| e.order_key().clone())
.collect::<Vec<_>>();
assert_eq!(expected, after);
}
Expand Down
2 changes: 1 addition & 1 deletion service/src/event/ordering_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ mod test {
async fn insert_10_with_9_undelivered(pool: &SqlitePool) {
let insertable = get_n_insertable_events(10).await;
let mut init = insertable.first().unwrap().to_owned();
init.body.set_deliverable(true);
init.set_deliverable(true);
let undelivered = insertable.into_iter().skip(1).collect::<Vec<_>>();

let new = CeramicOneEvent::insert_many(pool, undelivered.iter())
Expand Down
12 changes: 5 additions & 7 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::collections::HashSet;
use async_trait::async_trait;
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated;
use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use cid::Cid;
use futures::stream::BoxStream;
use ipld_core::ipld::Ipld;
Expand Down Expand Up @@ -134,13 +134,11 @@ impl CeramicEventService {
)));
}

let body = EventInsertableBody::try_from_carfile(cid, item.value.as_slice()).await?;
let event_insertable =
EventInsertable::try_from_carfile(item.key.to_owned(), item.value.as_slice()).await?;
let metadata = EventMetadata::from(parsed_event);

Ok((
EventInsertable::try_new(item.key.to_owned(), body)?,
metadata,
))
Ok((event_insertable, metadata))
}

pub(crate) async fn insert_events(
Expand Down Expand Up @@ -169,7 +167,7 @@ impl CeramicEventService {
let to_insert = ordered.deliverable().iter().map(|(e, _)| e);
invalid.extend(ordered.missing_history().iter().map(|(e, _)| {
InvalidItem::RequiresHistory {
key: e.order_key.clone(),
key: e.order_key().clone(),
}
}));
CeramicOneEvent::insert_many(&self.pool, to_insert).await?
Expand Down
2 changes: 1 addition & 1 deletion store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod sql;
pub use error::Error;
pub use metrics::{Metrics, StoreMetricsMiddleware};
pub use sql::{
entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody},
entities::{BlockHash, EventBlockRaw, EventInsertable},
CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, CeramicOneVersion,
InsertResult, InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction,
};
Expand Down
6 changes: 3 additions & 3 deletions store/src/sql/access/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ impl CeramicOneEvent {
let mut tx = pool.begin_tx().await.map_err(Error::from)?;

for item in to_add {
let new_key = Self::insert_event(&mut tx, &item.order_key, item.deliverable()).await?;
let new_key = Self::insert_event(&mut tx, item.order_key(), item.deliverable()).await?;
inserted.push(InsertedEvent::new(
item.order_key.clone(),
item.order_key().clone(),
new_key,
item.deliverable(),
));
if new_key {
for block in item.body.blocks().iter() {
for block in item.blocks().iter() {
CeramicOneBlock::insert(&mut tx, block.multihash.inner(), &block.bytes).await?;
CeramicOneEventBlock::insert(&mut tx, block).await?;
}
Expand Down
88 changes: 26 additions & 62 deletions store/src/sql/entities/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,46 +43,7 @@ pub async fn rebuild_car(blocks: Vec<BlockRow>) -> Result<Option<Vec<u8>>> {
/// The type we use to insert events into the database
pub struct EventInsertable {
/// The event order key (e.g. EventID)
pub order_key: EventId,
/// The data that makes up the event
pub body: EventInsertableBody,
}

impl EventInsertable {
/// Try to build the EventInsertable struct from a carfile.
pub async fn try_from_carfile(order_key: EventId, body: &[u8]) -> Result<Self> {
let cid = order_key.cid().ok_or_else(|| {
Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
})?;
let body = EventInsertableBody::try_from_carfile(cid, body).await?;
Ok(Self { order_key, body })
}

/// Build the EventInsertable struct from an EventID and EventInsertableBody.
/// Will error if the CID in the EventID doesn't match the CID in the EventInsertableBody.
pub fn try_new(order_key: EventId, body: EventInsertableBody) -> Result<Self> {
if order_key.cid() != Some(body.cid()) {
return Err(Error::new_invalid_arg(anyhow!(
"EventID CID does not match the body CID"
)));
}
Ok(Self { order_key, body })
}

/// Get the CID of the event
pub fn cid(&self) -> Cid {
self.body.cid
}

/// Whether this event is deliverable currently
pub fn deliverable(&self) -> bool {
self.body.deliverable()
}
}

#[derive(Debug, Clone)]
/// The type we use to insert events into the database
pub struct EventInsertableBody {
order_key: EventId,
/// The event CID i.e. the root CID from the car file
cid: Cid,
/// Whether the event is deliverable i.e. it's prev has been delivered and the chain is continuous to an init event
Expand All @@ -92,22 +53,30 @@ pub struct EventInsertableBody {
blocks: Vec<EventBlockRaw>,
}

impl EventInsertableBody {
/// Create a new EventInsertRaw struct. Deliverable is set to false by default.
pub fn new(cid: Cid, blocks: Vec<EventBlockRaw>, deliverable: bool) -> Self {
impl EventInsertable {
/// EventInsertable constructor
pub fn new(order_key: EventId, blocks: Vec<EventBlockRaw>, deliverable: bool) -> Self {
let cid = order_key.cid().unwrap();

Self {
order_key,
cid,
deliverable,
blocks,
}
}

/// Get the Recon order key (EventId) of the event.
pub fn order_key(&self) -> &EventId {
&self.order_key
}

/// Get the CID of the event
pub fn cid(&self) -> Cid {
self.cid
}

/// Whether this event is deliverable currently
/// Underlying bytes that make up the event
pub fn blocks(&self) -> &Vec<EventBlockRaw> {
&self.blocks
}
Expand All @@ -123,30 +92,20 @@ impl EventInsertableBody {
self.deliverable = deliverable;
}

/// Find a block from the carfile for a given CID if it's included
pub fn block_for_cid_opt(&self, cid: &Cid) -> Option<&EventBlockRaw> {
self.blocks
.iter()
.find(|b| Cid::new_v1(b.codec.try_into().unwrap(), *b.multihash.inner()) == *cid)
}

/// Find a block from the carfile for a given CID if it's included
pub fn block_for_cid(&self, cid: &Cid) -> Result<&EventBlockRaw> {
self.block_for_cid_opt(cid)
.ok_or_else(|| Error::new_app(anyhow!("Event data is missing data for CID {}", cid)))
}
/// Try to build the EventInsertable struct from a carfile.
pub async fn try_from_carfile(order_key: EventId, car_bytes: &[u8]) -> Result<Self> {
let event_cid = order_key.cid().ok_or_else(|| {
Error::new_invalid_arg(anyhow::anyhow!("EventID is missing a CID: {}", order_key))
})?;

/// Builds a new EventInsertRaw from a CAR file. Will error if the CID in the EventID doesn't match the
/// first root of the carfile.
pub async fn try_from_carfile(event_cid: Cid, val: &[u8]) -> Result<Self> {
if val.is_empty() {
if car_bytes.is_empty() {
return Err(Error::new_app(anyhow!(
"CAR file is empty: cid={}",
event_cid
)))?;
}

let mut reader = CarReader::new(val)
let mut reader = CarReader::new(car_bytes)
.await
.map_err(|e| Error::new_app(anyhow!(e)))?;
let root_cid = reader
Expand All @@ -169,6 +128,11 @@ impl EventInsertableBody {
blocks.push(ebr);
idx += 1;
}
Ok(Self::new(event_cid, blocks, false))
Ok(Self {
order_key,
cid: event_cid,
blocks,
deliverable: false,
})
}
}
2 changes: 1 addition & 1 deletion store/src/sql/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod utils;
mod version;

pub use block::{BlockBytes, BlockRow};
pub use event::{rebuild_car, EventInsertable, EventInsertableBody};
pub use event::{rebuild_car, EventInsertable};
pub use event_block::{EventBlockRaw, ReconEventBlockRaw};
pub use hash::{BlockHash, ReconHash};
pub use version::VersionRow;
Expand Down
8 changes: 2 additions & 6 deletions store/src/sql/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use cid::Cid;
use expect_test::expect;
use test_log::test;

use crate::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool};
use crate::{CeramicOneEvent, EventInsertable, SqlitePool};

const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb";
const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL";
Expand All @@ -28,11 +28,7 @@ fn random_event(cid: &str) -> EventInsertable {
let order_key = event_id_builder()
.with_event(&Cid::from_str(cid).unwrap())
.build();
let cid = order_key.cid().unwrap();
EventInsertable {
order_key,
body: EventInsertableBody::new(cid, vec![], true),
}
EventInsertable::new(order_key, vec![], true)
}

#[test(tokio::test)]
Expand Down

0 comments on commit 07d4093

Please sign in to comment.