Skip to content

Commit

Permalink
chore: Add logic to dispatch validation based on event type (#493)
Browse files Browse the repository at this point in the history
* chore: Add logic to dispatch validation based on event type.

* review comments
  • Loading branch information
stbrody authored Aug 20, 2024
1 parent 1d8a224 commit 479baed
Showing 1 changed file with 70 additions and 15 deletions.
85 changes: 70 additions & 15 deletions service/src/event/service.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
use std::collections::HashSet;

use super::{
migration::Migrator,
order_events::OrderEvents,
ordering_task::{DeliverableTask, OrderingTask},
};
use async_trait::async_trait;
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated;
use ceramic_event::unvalidated::Event;
use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool};
use cid::Cid;
use futures::stream::BoxStream;
use ipld_core::ipld::Ipld;
use recon::ReconItem;
use tokio::try_join;
use tracing::{trace, warn};

use super::{
migration::Migrator,
order_events::OrderEvents,
ordering_task::{DeliverableTask, OrderingTask},
};

use crate::{Error, Result};

/// How many events to select at once to see if they've become deliverable when we have downtime
Expand Down Expand Up @@ -129,23 +130,77 @@ impl CeramicEventService {
)?)
}

pub(crate) async fn insert_events(
&self,
items: &[recon::ReconItem<EventId>],
source: DeliverableRequirement,
) -> Result<InsertResult> {
let mut to_insert = Vec::new();
let mut invalid = Vec::new();
async fn validate_signed_events(
events: Vec<EventInsertable>,
) -> Result<(Vec<EventInsertable>, Vec<InvalidItem>)> {
// TODO: IMPLEMENT THIS
Ok((events, Vec::new()))
}

async fn validate_time_events(
events: Vec<EventInsertable>,
) -> Result<(Vec<EventInsertable>, Vec<InvalidItem>)> {
// TODO: IMPLEMENT THIS
Ok((events, Vec::new()))
}

pub(crate) async fn validate_events(
items: &[ReconItem<EventId>],
) -> Result<(Vec<EventInsertable>, Vec<InvalidItem>)> {
let mut parsed_events = Vec::with_capacity(items.len());
let mut invalid_events = Vec::new();
for event in items {
match Self::parse_discovered_event(event).await {
Ok(insertable) => to_insert.push(insertable),
Err(err) => invalid.push(InvalidItem::InvalidFormat {
Ok(insertable) => parsed_events.push(insertable),
Err(err) => invalid_events.push(InvalidItem::InvalidFormat {
key: event.key.clone(),
reason: err.to_string(),
}),
}
}

// Group events by their type
let mut valid_events = Vec::with_capacity(parsed_events.len());
let mut signed_events = Vec::with_capacity(parsed_events.len());
let mut time_events = Vec::with_capacity(parsed_events.len());
for event in parsed_events {
match event.event() {
Event::Time(_) => {
time_events.push(event);
}
Event::Signed(_) => {
signed_events.push(event);
}
Event::Unsigned(_) => {
// Unsigned events need no extra validation.
valid_events.push(event);
}
}
}

let (
(valid_signed_events, invalid_signed_events),
(valid_time_events, invalid_time_events),
) = try_join!(
Self::validate_signed_events(signed_events),
Self::validate_time_events(time_events)
)?;

valid_events.extend(valid_signed_events);
valid_events.extend(valid_time_events);
invalid_events.extend(invalid_signed_events);
invalid_events.extend(invalid_time_events);

Ok((valid_events, invalid_events))
}

pub(crate) async fn insert_events(
&self,
items: &[ReconItem<EventId>],
source: DeliverableRequirement,
) -> Result<InsertResult> {
let (to_insert, mut invalid) = Self::validate_events(items).await?;

let ordered = OrderEvents::try_new(&self.pool, to_insert).await?;

// api writes shouldn't have any missed history so we don't insert those events and
Expand Down

0 comments on commit 479baed

Please sign in to comment.