diff --git a/Cargo.lock b/Cargo.lock index 0fe76242..64242725 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2062,6 +2062,7 @@ version = "0.47.3" dependencies = [ "async-trait", "ceramic-actor-macros", + "ceramic-metrics", "shutdown", "snafu", "tokio", @@ -2075,6 +2076,7 @@ version = "0.47.3" dependencies = [ "async-trait", "ceramic-actor", + "ceramic-metrics", "quote", "syn 2.0.90", "tokio", @@ -2336,6 +2338,7 @@ dependencies = [ "http 1.1.0", "mockall", "object_store", + "prometheus-client", "shutdown", "test-log", "tokio", @@ -2618,6 +2621,7 @@ dependencies = [ "ceramic-actor", "ceramic-core", "ceramic-event", + "ceramic-metrics", "cid 0.11.1", "datafusion", "datafusion-functions-json", @@ -2629,6 +2633,7 @@ dependencies = [ "mockall", "object_store", "parking_lot", + "prometheus-client", "serde", "serde_json", "shutdown", diff --git a/actor-macros/Cargo.toml b/actor-macros/Cargo.toml index 1bcc1777..b3fa0fae 100644 --- a/actor-macros/Cargo.toml +++ b/actor-macros/Cargo.toml @@ -15,5 +15,6 @@ quote = "1.0" [dev-dependencies] ceramic-actor.workspace = true +ceramic-metrics.workspace = true async-trait.workspace = true tokio.workspace = true diff --git a/actor-macros/src/lib.rs b/actor-macros/src/lib.rs index 14eeec6c..f1315b86 100644 --- a/actor-macros/src/lib.rs +++ b/actor-macros/src/lib.rs @@ -14,6 +14,8 @@ use syn::{parse_macro_input, Attribute, DeriveInput, GenericParam, Lit}; /// - handle: The name of the derived `ActorHandle` implementation. /// - actor_trait: The name of the actor specific trait. This is the same as the second /// argument to the actor_envelope! macro. +/// - recorder_trait: The name of the recorder trait. This is the thrid arugment to the +/// actor_envelope! macro. /// /// # Example /// ``` @@ -21,12 +23,13 @@ use syn::{parse_macro_input, Attribute, DeriveInput, GenericParam, Lit}; /// use ceramic_actor::{Actor, actor_envelope}; /// /// #[derive(Actor)] -/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")] +/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI", recorder_trait = "PlayerR")] /// pub struct Player { } /// /// actor_envelope!{ /// PlayerEnv, /// PlayerI, +/// PlayerR, /// Score => ScoreMessage, /// } /// @@ -50,7 +53,8 @@ pub fn actor(item: TokenStream) -> TokenStream { let struct_name = item.ident; let Config { - trait_name, + actor_trait, + recorder_trait, envelope_name, handle_name, } = Config::from_attributes(&struct_name, &item.attrs); @@ -85,18 +89,24 @@ pub fn actor(item: TokenStream) -> TokenStream { impl #generics ceramic_actor::Actor for #struct_name < #(#generic_types,)*> { type Envelope = #envelope_name; } - impl #generics #trait_name for #struct_name < #(#generic_types,)*> { } + impl #generics #actor_trait for #struct_name < #(#generic_types,)*> { } impl #generics #struct_name < #(#generic_types,)*> { /// Start the actor returning a handle that can be easily cloned and shared. /// The actor stops once all handles are dropped. - pub fn spawn(size: usize, actor: impl #trait_name + ::std::marker::Send + 'static, shutdown: impl ::std::future::Future + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>) { + pub fn spawn( + size: usize, + actor: impl #actor_trait, + recorder: impl #recorder_trait, + shutdown: impl ::std::future::Future + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>, + ) { let (sender, receiver) = ceramic_actor::channel(size); let task_handle = tokio::spawn(async move { #envelope_name::run(actor, receiver, shutdown).await }); ( #handle_name { sender, + recorder: ::std::sync::Arc::new(recorder), #(#phantom_values,)* }, task_handle, @@ -104,16 +114,18 @@ pub fn actor(item: TokenStream) -> TokenStream { } } - /// Handle for [`#struct_name`]. + #[doc = concat!("Handle for [`", stringify!(#actor_trait), "`].")] #[derive(Debug)] pub struct #handle_name #generics { sender: ceramic_actor::Sender<#envelope_name>, + recorder: ::std::sync::Arc, #(#phantom_fields,)* } impl #generics ::core::clone::Clone for #handle_name < #(#generic_types,)*> { fn clone(&self) -> Self { Self{ - sender:self.sender.clone(), + sender: self.sender.clone(), + recorder: self.recorder.clone(), #(#phantom_values,)* } } @@ -126,6 +138,40 @@ pub fn actor(item: TokenStream) -> TokenStream { self.sender.clone() } } + impl #handle_name { + /// Notify the actor of the message. Do not wait for the response. + /// Record the messsage event using the recorder provided to the handler at spawn. + pub async fn notify(&self, msg: Msg) -> ::std::result::Result<(), ::ceramic_actor::Error> + where + Msg: ::ceramic_actor::Message + + ::std::convert::TryFrom<#envelope_name> + + ::std::fmt::Debug + + ::std::marker::Send + + 'static, + >::Error: ::std::fmt::Debug, + #envelope_name: + ::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender)>, + dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent> + Send + 'static, + { + ::ceramic_actor::ActorHandle::notify(self, msg, self.recorder.clone()).await + } + /// Send a message to the actor waiting for the response. + /// Record the messsage event using the recorder provided to the handler at spawn. + pub async fn send(&self, msg: Msg) -> ::std::result::Result> + where + Msg: ::ceramic_actor::Message + + ::std::convert::TryFrom<#envelope_name> + + ::std::fmt::Debug + + ::std::marker::Send + + 'static, + >::Error: ::std::fmt::Debug, + #envelope_name: + ::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender)>, + dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent> + Send + 'static, + { + ::ceramic_actor::ActorHandle::send(self, msg, self.recorder.clone()).await + } + } }; @@ -133,14 +179,17 @@ pub fn actor(item: TokenStream) -> TokenStream { } struct Config { - trait_name: syn::Ident, + actor_trait: syn::Ident, + recorder_trait: syn::Ident, envelope_name: syn::Ident, handle_name: syn::Ident, } impl Config { fn from_attributes(struct_name: &syn::Ident, attrs: &[Attribute]) -> Self { - let mut trait_name = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span()); + let mut actor_trait = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span()); + let mut recorder_trait = + syn::Ident::new(&format!("{}Recorder", struct_name), struct_name.span()); let mut envelope_name = syn::Ident::new(&format!("{}Envelope", struct_name), struct_name.span()); let mut handle_name = @@ -161,7 +210,12 @@ impl Config { } else if meta.path.is_ident("actor_trait") { let value: Lit = meta.value()?.parse()?; if let Lit::Str(lit_str) = value { - trait_name = syn::Ident::new(&lit_str.value(), lit_str.span()) + actor_trait = syn::Ident::new(&lit_str.value(), lit_str.span()) + } + } else if meta.path.is_ident("recorder_trait") { + let value: Lit = meta.value()?.parse()?; + if let Lit::Str(lit_str) = value { + recorder_trait = syn::Ident::new(&lit_str.value(), lit_str.span()) } } Ok(()) @@ -170,7 +224,8 @@ impl Config { } } Self { - trait_name, + actor_trait, + recorder_trait, envelope_name, handle_name, } diff --git a/actor/Cargo.toml b/actor/Cargo.toml index b6334366..9bf0c5a6 100644 --- a/actor/Cargo.toml +++ b/actor/Cargo.toml @@ -11,6 +11,7 @@ async-trait.workspace = true tokio.workspace = true tracing.workspace = true ceramic-actor-macros.workspace = true +ceramic-metrics.workspace = true snafu.workspace = true [dev-dependencies] diff --git a/actor/examples/game/main.rs b/actor/examples/game/main.rs index 75e66af2..4e544ce2 100644 --- a/actor/examples/game/main.rs +++ b/actor/examples/game/main.rs @@ -1,7 +1,8 @@ use std::ops::AddAssign; use async_trait::async_trait; -use ceramic_actor::{actor_envelope, Actor, ActorHandle, Error, Handler, Message}; +use ceramic_actor::{actor_envelope, Actor, Error, Handler, Message, MessageEvent}; +use ceramic_metrics::Recorder; use shutdown::Shutdown; use tracing::{instrument, Level}; @@ -20,6 +21,7 @@ impl Game { actor_envelope! { GameEnvelope, GameActor, + GameRecorder, GetScore => GetScoreMessage, Score => ScoreMessage, } @@ -67,7 +69,12 @@ impl Handler for Game { #[derive(Actor)] // The envelope and handle types names can be explicitly named. -#[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")] +#[actor( + envelope = "PlayerEnv", + handle = "PlayerH", + actor_trait = "PlayerI", + recorder_trait = "PlayerR" +)] pub struct Player { is_home: bool, game: GameHandle, @@ -82,6 +89,7 @@ impl Player { actor_envelope! { PlayerEnv, PlayerI, + PlayerR, Shoot => ShootMessage, } @@ -109,6 +117,22 @@ impl Handler for Player { } } +#[derive(Debug)] +struct NoOpRecorder; + +impl Recorder> for NoOpRecorder { + fn record(&self, _event: &MessageEvent) {} +} +impl Recorder> for NoOpRecorder { + fn record(&self, _event: &MessageEvent) {} +} +impl GameRecorder for NoOpRecorder {} + +impl Recorder> for NoOpRecorder { + fn record(&self, _event: &MessageEvent) {} +} +impl PlayerR for NoOpRecorder {} + #[tokio::main] async fn main() { tracing_subscriber::fmt() @@ -116,11 +140,19 @@ async fn main() { .pretty() .init(); let shutdown = Shutdown::new(); - let (game, _) = Game::spawn(1_000, Game::new(), shutdown.wait_fut()); - let (player_home, _) = - Player::spawn(1_000, Player::new(true, game.clone()), shutdown.wait_fut()); - let (player_away, _) = - Player::spawn(1_000, Player::new(false, game.clone()), shutdown.wait_fut()); + let (game, _) = Game::spawn(1_000, Game::new(), NoOpRecorder, shutdown.wait_fut()); + let (player_home, _) = Player::spawn( + 1_000, + Player::new(true, game.clone()), + NoOpRecorder, + shutdown.wait_fut(), + ); + let (player_away, _) = Player::spawn( + 1_000, + Player::new(false, game.clone()), + NoOpRecorder, + shutdown.wait_fut(), + ); player_home.notify(ShootMessage).await.unwrap(); player_away.send(ShootMessage).await.unwrap(); // Send with retry without cloning the message to be sent. diff --git a/actor/src/lib.rs b/actor/src/lib.rs index ac34d1c7..8c69663c 100644 --- a/actor/src/lib.rs +++ b/actor/src/lib.rs @@ -4,6 +4,7 @@ //! - In memory message passing //! - Strongly typed //! - tracing spans are preserved from caller to actor +//! - first class support for recording metrics about messages //! //! # Example //! ``` @@ -12,8 +13,11 @@ #![warn(missing_docs)] mod macros; +use std::sync::Arc; + pub use ceramic_actor_macros::*; +use ceramic_metrics::Recorder; pub use tracing; use snafu::prelude::*; @@ -83,13 +87,19 @@ pub trait ActorHandle: Clone + Send + Sync + 'static { fn sender(&self) -> Sender<::Envelope>; /// Notify the actor of the message. Do not wait for result of the message. - async fn notify(&self, msg: Msg) -> Result<(), Error> + async fn notify(&self, msg: Msg, recorder: Arc) -> Result<(), Error> where - Msg: Message + Send + std::fmt::Debug + 'static, - ::Envelope: TryInto, - <<::Actor as Actor>::Envelope as TryInto>::Error: std::fmt::Debug, - (Msg, oneshot::Sender): Into<::Envelope>, + Msg: Message + TryFrom<::Envelope> + std::fmt::Debug + Send + 'static, + ::Envelope>>::Error: std::fmt::Debug, + ::Envelope: From<(Msg, oneshot::Sender)>, + R: Recorder> + Send + Sync + ?Sized + 'static, { + let event = MessageEvent { + message: msg, + actor_type: Self::Actor::type_name(), + message_type: Msg::type_name(), + }; + recorder.record(&event); let span = debug_span!( "notify", actor_type = Self::Actor::type_name(), @@ -99,16 +109,13 @@ pub trait ActorHandle: Clone + Send + Sync + 'static { let (tx, _rx) = oneshot::channel(); sender .send(DeliverOp::Notify(Traced { - value: (msg, tx).into(), + value: ::Envelope::from((event.message, tx)), span, })) .await .map_err(|err| { mpsc::error::SendError( - err.0 - .into_inner() - .into_inner() - .try_into() + Msg::try_from(err.0.into_inner().into_inner()) .expect("should be able to extract the message from the envelope"), ) }) @@ -117,13 +124,20 @@ pub trait ActorHandle: Clone + Send + Sync + 'static { } /// Send the actor a message waiting for the result. - async fn send(&self, msg: Msg) -> Result> + /// Additionally record the message event. + async fn send(&self, msg: Msg, recorder: Arc) -> Result> where - Msg: Message + Send + std::fmt::Debug + 'static, - ::Envelope: TryInto, - <<::Actor as Actor>::Envelope as TryInto>::Error: std::fmt::Debug, - (Msg, oneshot::Sender): Into<::Envelope>, + Msg: Message + TryFrom<::Envelope> + std::fmt::Debug + Send + 'static, + ::Envelope>>::Error: std::fmt::Debug, + ::Envelope: From<(Msg, oneshot::Sender)>, + R: Recorder> + Send + Sync + ?Sized + 'static, { + let event = MessageEvent { + message: msg, + actor_type: Self::Actor::type_name(), + message_type: Msg::type_name(), + }; + recorder.record(&event); let span = debug_span!( "send", actor_type = Self::Actor::type_name(), @@ -133,16 +147,13 @@ pub trait ActorHandle: Clone + Send + Sync + 'static { let (tx, rx) = oneshot::channel(); sender .send(DeliverOp::Send(Traced { - value: (msg, tx).into(), + value: ::Envelope::from((event.message, tx)), span, })) .await .map_err(|err| { mpsc::error::SendError( - err.0 - .into_inner() - .into_inner() - .try_into() + Msg::try_from(err.0.into_inner().into_inner()) .expect("should be able to extract message from the envelope"), ) }) @@ -220,3 +231,13 @@ pub enum Error { source: oneshot::error::RecvError, }, } + +/// Metadata about a message used in recording metrics about the occurrence of a message. +pub struct MessageEvent { + /// The message itself. + pub message: M, + /// The name of the type actor handling the message. + pub actor_type: &'static str, + /// The type of the message. + pub message_type: &'static str, +} diff --git a/actor/src/macros.rs b/actor/src/macros.rs index fba597f0..e5a68f8b 100644 --- a/actor/src/macros.rs +++ b/actor/src/macros.rs @@ -2,14 +2,18 @@ /// /// The first identifier is the name of the enum. /// The second identifier is the name of a trait specific to the actor. +/// The third identifier is the name of a trait for recording message events. /// The remaining pairs are the variants of the envelope indicating the messages the actor handles. /// -/// The constructed trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait. +/// The constructed actor trait is a union of the [`crate::Handler`] traits for each message along with the [`crate::Actor`] trait. +/// +/// The constructed recorder trait is a union of the [`ceramic_metrics::Recorder`] traits for each message. #[macro_export] macro_rules! actor_envelope { ( $enum_name:ident, - $trait_name:ident, + $actor_trait:ident, + $recorder_trait:ident, $( $variant_name:ident => $message_type:ty, )* @@ -47,15 +51,23 @@ macro_rules! actor_envelope { } } } - #[doc = std::stringify!($trait_name)] - #[doc = " is an [`crate::Actor`] and [`crate::Handler`] for each message type in the actor envelope "] + #[doc = std::stringify!($actor_trait)] + #[doc = " is an [`ceramic_actor::Actor`] and [`ceramic_actor::Handler`] for each message type in the actor envelope "] + #[doc = stringify!($enum_name)] + #[doc = "."] + pub trait $actor_trait : $crate::Actor $( + $crate::Handler<$message_type> )* + ::std::marker::Send + 'static { } + + #[doc = std::stringify!($recorder_trait)] + #[doc = " is an [`ceramic_metrics::Recorder`] for each message type in the actor envelope "] #[doc = stringify!($enum_name)] #[doc = "."] - pub trait $trait_name : $crate::Actor $( + $crate::Handler<$message_type> )* { } + pub trait $recorder_trait : $(::ceramic_metrics::Recorder<$crate::MessageEvent<$message_type>> +)* + ::std::fmt::Debug + ::std::marker::Send + ::std::marker::Sync + 'static { } + impl $enum_name { /// Runs the actor handling messages as they arrive. pub async fn run(mut actor: A, mut receiver: $crate::Receiver, mut shutdown: impl ::std::future::Future + ::std::marker::Send + 'static) - where A: $trait_name + where A: $actor_trait { let mut shutdown = Box::pin(shutdown); loop { diff --git a/flight/Cargo.toml b/flight/Cargo.toml index 2efeacd9..be119534 100644 --- a/flight/Cargo.toml +++ b/flight/Cargo.toml @@ -28,6 +28,7 @@ ceramic-pipeline.workspace = true expect-test.workspace = true http.workspace = true mockall.workspace = true +prometheus-client.workspace = true object_store.workspace = true shutdown.workspace = true test-log.workspace = true diff --git a/flight/tests/server.rs b/flight/tests/server.rs index 780e7cd2..4b4cfbf1 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -5,7 +5,6 @@ use arrow::{compute::concat_batches, util::pretty::pretty_format_batches}; use arrow_array::RecordBatch; use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; use arrow_schema::Schema; -use ceramic_actor::ActorHandle as _; use ceramic_flight::server::new_server; use ceramic_pipeline::{ aggregator::SubscribeSinceMsg, ConclusionData, ConclusionEvent, ConclusionFeed, ConclusionInit, @@ -16,6 +15,7 @@ use expect_test::expect; use futures::TryStreamExt as _; use http::Uri; use mockall::{mock, predicate}; +use prometheus_client::registry::Registry; use shutdown::Shutdown; use test_log::test; use tokio::net::TcpListener; @@ -33,10 +33,12 @@ async fn channel(addr: &SocketAddr) -> Channel { async fn start_server(feed: MockFeed) -> (FlightSqlServiceClient, PipelineHandle) { let shutdown = Shutdown::new(); + let metrics = ceramic_pipeline::Metrics::register(&mut Registry::default()); let (ctx, _) = ceramic_pipeline::spawn_actors(ceramic_pipeline::Config { aggregator: true, conclusion_feed: feed.into(), object_store: Arc::new(object_store::memory::InMemory::new()), + metrics, shutdown, }) .await diff --git a/one/src/daemon.rs b/one/src/daemon.rs index 3b004ace..d7d02321 100644 --- a/one/src/daemon.rs +++ b/one/src/daemon.rs @@ -515,6 +515,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { let http_metrics = Arc::new(ceramic_metrics::MetricsHandle::register( http_metrics::Metrics::register, )); + let pipeline_metrics = MetricsHandle::register(ceramic_pipeline::Metrics::register); // Create recon store for peers. let peer_svc = ceramic_peer_svc::store::StoreMetricsMiddleware::new( @@ -629,6 +630,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> { aggregator: opts.aggregator.unwrap_or(false), conclusion_feed: feed.into(), object_store, + metrics: pipeline_metrics, shutdown: shutdown.clone(), }) .await?; diff --git a/pipeline/Cargo.toml b/pipeline/Cargo.toml index f488dadc..11558c87 100644 --- a/pipeline/Cargo.toml +++ b/pipeline/Cargo.toml @@ -21,6 +21,7 @@ async-trait.workspace = true ceramic-actor.workspace = true ceramic-core.workspace = true ceramic-event.workspace = true +ceramic-metrics.workspace = true cid.workspace = true datafusion-functions-json = "0.43.0" datafusion.workspace = true @@ -29,16 +30,17 @@ futures.workspace = true int-enum.workspace = true ipld-core.workspace = true json-patch = "3.0.1" +mockall = { workspace = true, optional = true } object_store.workspace = true parking_lot = "0.12.3" +prometheus-client.workspace = true serde.workspace = true serde_json.workspace = true shutdown.workspace = true +tokio-stream = { workspace = true, features = ["sync"] } tokio.workspace = true tracing.workspace = true url.workspace = true -tokio-stream = { workspace = true, features = ["sync"] } -mockall = { workspace = true, optional = true } [dev-dependencies] test-log.workspace = true diff --git a/pipeline/src/aggregator/metrics.rs b/pipeline/src/aggregator/metrics.rs new file mode 100644 index 00000000..35e6147f --- /dev/null +++ b/pipeline/src/aggregator/metrics.rs @@ -0,0 +1,24 @@ +use ceramic_actor::MessageEvent; +use ceramic_metrics::Recorder; + +use crate::metrics::{MessageLabels, Metrics}; + +use super::{AggregatorRecorder, NewConclusionEventsMsg, StreamStateMsg}; + +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + self.aggregator_new_conclusion_events_count + .inc_by(event.message.events.num_rows() as u64); + } +} +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} +impl AggregatorRecorder for Metrics {} diff --git a/pipeline/src/aggregator/mock.rs b/pipeline/src/aggregator/mock.rs index 6c98cd0c..e18f5b85 100644 --- a/pipeline/src/aggregator/mock.rs +++ b/pipeline/src/aggregator/mock.rs @@ -2,6 +2,9 @@ use async_trait::async_trait; use ceramic_actor::{Actor, Handler, Message}; use mockall::mock; +use prometheus_client::registry::Registry; + +use crate::metrics::Metrics; use super::{ Aggregator, AggregatorActor, AggregatorEnvelope, AggregatorHandle, NewConclusionEventsMsg, @@ -69,7 +72,9 @@ impl AggregatorActor for MockAggregator {} impl MockAggregator { /// Spawn a mock aggregator actor. pub fn spawn(mock_actor: MockAggregator) -> AggregatorHandle { - let (handle, _task_handle) = Aggregator::spawn(1_000, mock_actor, std::future::pending()); + let metrics = Metrics::register(&mut Registry::default()); + let (handle, _task_handle) = + Aggregator::spawn(1_000, mock_actor, metrics, std::future::pending()); handle } } diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index af0f5d82..b133c712 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -3,6 +3,7 @@ //! Applies each new event to the previous state of the stream producing the stream state at each //! event in the stream. mod ceramic_patch; +mod metrics; #[cfg(any(test, feature = "mock"))] pub mod mock; @@ -16,7 +17,7 @@ use arrow::{ }; use arrow_schema::SchemaRef; use async_trait::async_trait; -use ceramic_actor::{actor_envelope, Actor, ActorHandle, Handler, Message}; +use ceramic_actor::{actor_envelope, Actor, Handler, Message}; use ceramic_core::StreamId; use ceramic_patch::CeramicPatch; use cid::Cid; @@ -51,6 +52,7 @@ use tracing::{error, instrument}; use crate::{ cache_table::CacheTable, concluder::ConcluderHandle, + metrics::Metrics, schemas, since::{rows_since, StreamTable, StreamTableSource}, PipelineContext, Result, SessionContextRef, @@ -91,6 +93,7 @@ impl Aggregator { ctx: &PipelineContext, max_cached_rows: Option, concluder: ConcluderHandle, + metrics: Metrics, shutdown: Shutdown, ) -> Result<(AggregatorHandle, Vec>)> { let (broadcast_tx, _broadcast_rx) = broadcast::channel(1_000); @@ -99,17 +102,18 @@ impl Aggregator { broadcast_tx, max_cached_rows: max_cached_rows.unwrap_or(DEFAULT_MAX_CACHED_ROWS), }; - Self::spawn_with(size, ctx, aggregator, concluder, shutdown).await + Self::spawn_with(size, ctx, aggregator, concluder, metrics, shutdown).await } /// Spawn the actor given an implementation of the aggregator. pub async fn spawn_with( size: usize, ctx: &PipelineContext, - aggregator: impl AggregatorActor + Send + 'static, + aggregator: impl AggregatorActor, concluder: ConcluderHandle, + metrics: Metrics, shutdown: Shutdown, ) -> Result<(AggregatorHandle, Vec>)> { - let (handle, task_handle) = Self::spawn(size, aggregator, shutdown.wait_fut()); + let (handle, task_handle) = Self::spawn(size, aggregator, metrics, shutdown.wait_fut()); // Register aggregator tables let file_format = ParquetFormat::default().with_enable_pruning(true); @@ -224,6 +228,7 @@ async fn concluder_subscription( actor_envelope! { AggregatorEnvelope, AggregatorActor, + AggregatorRecorder, SubscribeSince => SubscribeSinceMsg, NewConclusionEvents => NewConclusionEventsMsg, StreamState => StreamStateMsg, @@ -576,6 +581,7 @@ mod tests { use futures::stream; use mockall::predicate; use object_store::memory::InMemory; + use prometheus_client::registry::Registry; use test_log::test; use crate::{ @@ -647,6 +653,7 @@ mod tests { object_store: Arc, max_cached_rows: Option, ) -> anyhow::Result { + let metrics = Metrics::register(&mut Registry::default()); let shutdown = Shutdown::new(); let pipeline_ctx = pipeline_ctx(object_store.clone()).await?; let (aggregator, handles) = Aggregator::spawn_new( @@ -654,6 +661,7 @@ mod tests { &pipeline_ctx, max_cached_rows, MockConcluder::spawn(mock_concluder), + metrics, shutdown.clone(), ) .await?; @@ -667,13 +675,13 @@ mod tests { async fn do_test(conclusion_events: RecordBatch) -> anyhow::Result { let ctx = init().await?; - let r = do_pass(&ctx.aggregator, None, Some(conclusion_events)).await; + let r = do_pass(ctx.aggregator.clone(), None, Some(conclusion_events)).await; ctx.shutdown().await?; r } async fn do_pass( - aggregator: &impl ActorHandle, + aggregator: AggregatorHandle, offset: Option, conclusion_events: Option, ) -> anyhow::Result { @@ -960,7 +968,7 @@ mod tests { // events for each pass. let ctx = init().await.unwrap(); let event_states = do_pass( - &ctx.aggregator, + ctx.aggregator.clone(), None, Some( conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { @@ -996,7 +1004,7 @@ mod tests { | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); let event_states = do_pass( - &ctx.aggregator, + ctx.aggregator.clone(), Some(0), Some( conclusion_events_to_record_batch(&[ConclusionEvent::Time(ConclusionTime { @@ -1034,7 +1042,7 @@ mod tests { | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); let event_states = do_pass( - &ctx.aggregator, + ctx.aggregator.clone(), Some(1), Some(conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 2, @@ -1071,7 +1079,7 @@ mod tests { async fn multiple_passes() { let ctx = init().await.unwrap(); let event_states = do_pass( - &ctx.aggregator, + ctx.aggregator.clone(), None, Some( conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { @@ -1107,7 +1115,7 @@ mod tests { | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"content":{"a":0}} | +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-------------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); let event_states = do_pass( - &ctx.aggregator, + ctx.aggregator.clone(), Some(0), Some(conclusion_events_to_record_batch(&[ ConclusionEvent::Time(ConclusionTime { @@ -1244,7 +1252,7 @@ mod tests { }); let ctx = init_with_concluder(mock_concluder).await.unwrap(); - let event_states = do_pass(&ctx.aggregator, None, None).await.unwrap(); + let event_states = do_pass(ctx.aggregator.clone(), None, None).await.unwrap(); expect![[r#" +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | @@ -1333,7 +1341,7 @@ mod tests { ) .await .unwrap(); - let event_states = do_pass(&ctx.aggregator, None, None).await.unwrap(); + let event_states = do_pass(ctx.aggregator.clone(), None, None).await.unwrap(); expect![[r#" +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+--------------------------------------------------------------+ | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | diff --git a/pipeline/src/concluder/metrics.rs b/pipeline/src/concluder/metrics.rs new file mode 100644 index 00000000..75249804 --- /dev/null +++ b/pipeline/src/concluder/metrics.rs @@ -0,0 +1,22 @@ +use ceramic_actor::MessageEvent; +use ceramic_metrics::Recorder; + +use crate::metrics::{MessageLabels, Metrics}; + +use super::{ConcluderRecorder, EventsSinceMsg, NewEventsMsg}; + +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} +impl ConcluderRecorder for Metrics {} diff --git a/pipeline/src/concluder/mock.rs b/pipeline/src/concluder/mock.rs index e0f4f610..c4e08f31 100644 --- a/pipeline/src/concluder/mock.rs +++ b/pipeline/src/concluder/mock.rs @@ -2,6 +2,9 @@ use async_trait::async_trait; use ceramic_actor::{Actor, Handler, Message}; use mockall::mock; +use prometheus_client::registry::Registry; + +use crate::metrics::Metrics; use super::{ Concluder, ConcluderActor, ConcluderEnvelope, ConcluderHandle, EventsSinceMsg, NewEventsMsg, @@ -65,7 +68,9 @@ impl ConcluderActor for MockConcluder {} impl MockConcluder { /// Spawn a mock concluder actor. pub fn spawn(mock_actor: MockConcluder) -> ConcluderHandle { - let (handle, _task_handle) = Concluder::spawn(1_000, mock_actor, std::future::pending()); + let metrics = Metrics::register(&mut Registry::default()); + let (handle, _task_handle) = + Concluder::spawn(1_000, mock_actor, metrics, std::future::pending()); handle } } diff --git a/pipeline/src/concluder/mod.rs b/pipeline/src/concluder/mod.rs index ea6b42de..632df35a 100644 --- a/pipeline/src/concluder/mod.rs +++ b/pipeline/src/concluder/mod.rs @@ -2,6 +2,7 @@ //! In its current form its is just a wrapper over the sqlite tables in order to expose them as //! datafusion tables. mod event; +mod metrics; #[cfg(any(test, feature = "mock"))] pub mod mock; mod table; @@ -11,7 +12,7 @@ use std::{sync::Arc, time::Duration}; use arrow::{array::RecordBatch, compute::kernels::aggregate}; use arrow_schema::SchemaRef; use async_trait::async_trait; -use ceramic_actor::{actor_envelope, Actor, ActorHandle, Handler, Message}; +use ceramic_actor::{actor_envelope, Actor, Handler, Message}; use datafusion::{ common::{cast::as_uint64_array, exec_datafusion_err}, execution::SendableRecordBatchStream, @@ -21,10 +22,16 @@ use datafusion::{ use futures::TryStreamExt as _; use shutdown::{Shutdown, ShutdownSignal}; use table::FeedTable; -use tokio::{select, sync::broadcast, task::JoinHandle, time::interval}; +use tokio::{ + select, + sync::broadcast, + task::JoinHandle, + time::{interval, MissedTickBehavior}, +}; use tracing::{debug, error, warn}; use crate::{ + metrics::Metrics, schemas, since::{rows_since, StreamTable, StreamTableSource}, ConclusionFeedSource, PipelineContext, Result, SessionContextRef, @@ -54,6 +61,7 @@ impl Concluder { size: usize, ctx: &PipelineContext, feed: ConclusionFeedSource, + metrics: Metrics, shutdown: Shutdown, ) -> Result<(ConcluderHandle, Vec>)> { let (broadcast_tx, _broadcast_rx) = broadcast::channel(1_000); @@ -61,7 +69,7 @@ impl Concluder { ctx: ctx.session(), broadcast_tx, }; - let (handle, task_handle) = Self::spawn(size, actor, shutdown.wait_fut()); + let (handle, task_handle) = Self::spawn(size, actor, metrics.clone(), shutdown.wait_fut()); // Register tables let last_processed_index = match feed { ConclusionFeedSource::Direct(conclusion_feed) => { @@ -116,13 +124,14 @@ impl Concluder { poll_handle, session, last_processed_index, + metrics, shutdown.wait_fut(), ) .await { error!(%err, "poll_new_events loop failed") } else { - debug!("poll_new_events task finished"); + error!("poll_new_events task finished"); } }); @@ -133,16 +142,16 @@ impl Concluder { actor_envelope! { ConcluderEnvelope, ConcluderActor, + ConcluderRecorder, NewEvents => NewEventsMsg, SubscribeSince => SubscribeSinceMsg, EventsSince => EventsSinceMsg, - } /// Notify actor of new events #[derive(Debug)] pub struct NewEventsMsg { - /// Events as a record batch, must have the schema of the [`CONCLUSION_EVENTS_TABLE`]. + /// Events as a record batch, must have the schema of the [`crate::schemas::conclusion_events`] table. pub events: RecordBatch, } impl Message for NewEventsMsg { @@ -187,12 +196,15 @@ async fn poll_new_events( handle: ConcluderHandle, ctx: SessionContextRef, mut last_processed_index: Option, + metrics: Metrics, mut shutdown: ShutdownSignal, ) -> anyhow::Result<()> { let mut interval = interval(Duration::from_millis(1_000)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); // Poll for new events until shutdown loop { + metrics.concluder_poll_new_events_loop_count.inc(); select! { _ = &mut shutdown => { break Ok(()); diff --git a/pipeline/src/config.rs b/pipeline/src/config.rs index f9e5313e..778b3b5a 100644 --- a/pipeline/src/config.rs +++ b/pipeline/src/config.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use object_store::ObjectStore; use shutdown::Shutdown; +use crate::Metrics; + /// Configuration for pipeline session. pub struct Config { /// When true the aggregator actor is enabled. @@ -11,6 +13,8 @@ pub struct Config { pub conclusion_feed: ConclusionFeedSource, /// Access to an object store. pub object_store: Arc, + /// Metrics object for recording statistics about pipeline actors. + pub metrics: Metrics, /// A shutdown signal channel. pub shutdown: Shutdown, } diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index b070da0a..a94fd519 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -11,6 +11,7 @@ mod cache_table; pub mod cid_string; pub mod concluder; mod config; +mod metrics; pub mod schemas; mod since; #[cfg(test)] @@ -38,6 +39,7 @@ pub use concluder::{ ConclusionInit, ConclusionTime, }; pub use config::{ConclusionFeedSource, Config}; +pub use metrics::Metrics; /// A reference to a shared [`SessionContext`]. /// @@ -175,6 +177,7 @@ pub async fn spawn_actors( 100, &pipeline_ctx, config.conclusion_feed, + config.metrics.clone(), config.shutdown.clone(), ) .await?; @@ -186,6 +189,7 @@ pub async fn spawn_actors( &pipeline_ctx, None, concluder.clone(), + config.metrics.clone(), config.shutdown.clone(), ) .await?; diff --git a/pipeline/src/metrics.rs b/pipeline/src/metrics.rs new file mode 100644 index 00000000..fb8d14c2 --- /dev/null +++ b/pipeline/src/metrics.rs @@ -0,0 +1,64 @@ +use ceramic_actor::MessageEvent; +use ceramic_metrics::register; +use prometheus_client::{ + encoding::EncodeLabelSet, + metrics::{counter::Counter, family::Family}, + registry::Registry, +}; + +/// Metrics for Ceramic Pipeline events +#[derive(Debug, Clone)] +pub struct Metrics { + pub(crate) message_count: Family, + + pub(crate) aggregator_new_conclusion_events_count: Counter, + + pub(crate) concluder_poll_new_events_loop_count: Counter, +} +impl Metrics { + /// Register and construct Metrics + pub fn register(registry: &mut Registry) -> Self { + let sub_registry = registry.sub_registry_with_prefix("pipeline"); + + register!( + message_count, + "Number of messages delivered to actors", + Family::::default(), + sub_registry + ); + register!( + aggregator_new_conclusion_events_count, + "Number of new conclusion events delivered to the aggregator", + Counter::default(), + sub_registry + ); + + register!( + concluder_poll_new_events_loop_count, + "Number of times the loop to poll new conclusion events has run", + Counter::default(), + sub_registry + ); + + Self { + message_count, + aggregator_new_conclusion_events_count, + concluder_poll_new_events_loop_count, + } + } +} + +#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)] +pub(crate) struct MessageLabels { + actor: &'static str, + message: &'static str, +} + +impl From<&MessageEvent> for MessageLabels { + fn from(value: &MessageEvent) -> Self { + Self { + actor: value.actor_type, + message: value.message_type, + } + } +} diff --git a/pipeline/src/since/metrics.rs b/pipeline/src/since/metrics.rs new file mode 100644 index 00000000..930054d2 --- /dev/null +++ b/pipeline/src/since/metrics.rs @@ -0,0 +1,14 @@ +use ceramic_actor::MessageEvent; +use ceramic_metrics::Recorder; + +use crate::metrics::{MessageLabels, Metrics}; + +use super::SubscribeSinceMsg; + +impl Recorder> for Metrics { + fn record(&self, event: &MessageEvent) { + self.message_count + .get_or_create(&MessageLabels::from(event)) + .inc(); + } +} diff --git a/pipeline/src/since/mod.rs b/pipeline/src/since/mod.rs index 84938721..af6fbbe5 100644 --- a/pipeline/src/since/mod.rs +++ b/pipeline/src/since/mod.rs @@ -4,6 +4,7 @@ //! //! Actor handles may implement [`StreamTableSource`] and register a [`StreamTable`] on the [`datafusion::execution::context::SessionContext`] in order to provide query access to the stream. +mod metrics; mod stream; pub use stream::{StreamTable, StreamTableSource};