From 479baeda6853a20ed763f52cd64a357d161b761b Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Tue, 20 Aug 2024 10:20:15 -0400 Subject: [PATCH] chore: Add logic to dispatch validation based on event type (#493) * chore: Add logic to dispatch validation based on event type. * review comments --- service/src/event/service.rs | 85 +++++++++++++++++++++++++++++------- 1 file changed, 70 insertions(+), 15 deletions(-) diff --git a/service/src/event/service.rs b/service/src/event/service.rs index c9be0cfc7..aa2a79bb2 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,21 +1,22 @@ use std::collections::HashSet; +use super::{ + migration::Migrator, + order_events::OrderEvents, + ordering_task::{DeliverableTask, OrderingTask}, +}; use async_trait::async_trait; use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; +use ceramic_event::unvalidated::Event; use ceramic_store::{CeramicOneEvent, EventInsertable, SqlitePool}; use cid::Cid; use futures::stream::BoxStream; use ipld_core::ipld::Ipld; use recon::ReconItem; +use tokio::try_join; use tracing::{trace, warn}; -use super::{ - migration::Migrator, - order_events::OrderEvents, - ordering_task::{DeliverableTask, OrderingTask}, -}; - use crate::{Error, Result}; /// How many events to select at once to see if they've become deliverable when we have downtime @@ -129,23 +130,77 @@ impl CeramicEventService { )?) } - pub(crate) async fn insert_events( - &self, - items: &[recon::ReconItem], - source: DeliverableRequirement, - ) -> Result { - let mut to_insert = Vec::new(); - let mut invalid = Vec::new(); + async fn validate_signed_events( + events: Vec, + ) -> Result<(Vec, Vec)> { + // TODO: IMPLEMENT THIS + Ok((events, Vec::new())) + } + + async fn validate_time_events( + events: Vec, + ) -> Result<(Vec, Vec)> { + // TODO: IMPLEMENT THIS + Ok((events, Vec::new())) + } + + pub(crate) async fn validate_events( + items: &[ReconItem], + ) -> Result<(Vec, Vec)> { + let mut parsed_events = Vec::with_capacity(items.len()); + let mut invalid_events = Vec::new(); for event in items { match Self::parse_discovered_event(event).await { - Ok(insertable) => to_insert.push(insertable), - Err(err) => invalid.push(InvalidItem::InvalidFormat { + Ok(insertable) => parsed_events.push(insertable), + Err(err) => invalid_events.push(InvalidItem::InvalidFormat { key: event.key.clone(), reason: err.to_string(), }), } } + // Group events by their type + let mut valid_events = Vec::with_capacity(parsed_events.len()); + let mut signed_events = Vec::with_capacity(parsed_events.len()); + let mut time_events = Vec::with_capacity(parsed_events.len()); + for event in parsed_events { + match event.event() { + Event::Time(_) => { + time_events.push(event); + } + Event::Signed(_) => { + signed_events.push(event); + } + Event::Unsigned(_) => { + // Unsigned events need no extra validation. + valid_events.push(event); + } + } + } + + let ( + (valid_signed_events, invalid_signed_events), + (valid_time_events, invalid_time_events), + ) = try_join!( + Self::validate_signed_events(signed_events), + Self::validate_time_events(time_events) + )?; + + valid_events.extend(valid_signed_events); + valid_events.extend(valid_time_events); + invalid_events.extend(invalid_signed_events); + invalid_events.extend(invalid_time_events); + + Ok((valid_events, invalid_events)) + } + + pub(crate) async fn insert_events( + &self, + items: &[ReconItem], + source: DeliverableRequirement, + ) -> Result { + let (to_insert, mut invalid) = Self::validate_events(items).await?; + let ordered = OrderEvents::try_new(&self.pool, to_insert).await?; // api writes shouldn't have any missed history so we don't insert those events and