Skip to content

Commit

Permalink
feat: add pipeline metrics
Browse files Browse the repository at this point in the history
Adds metrics for the pipeline system. Support was added to the actor
framework to directly support metrics.
  • Loading branch information
nathanielc committed Jan 21, 2025
1 parent 6115889 commit e3ab6e5
Show file tree
Hide file tree
Showing 22 changed files with 361 additions and 64 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions actor-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ quote = "1.0"

[dev-dependencies]
ceramic-actor.workspace = true
ceramic-metrics.workspace = true
async-trait.workspace = true
tokio.workspace = true
73 changes: 64 additions & 9 deletions actor-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ use syn::{parse_macro_input, Attribute, DeriveInput, GenericParam, Lit};
/// - handle: The name of the derived `ActorHandle` implementation.
/// - actor_trait: The name of the actor specific trait. This is the same as the second
/// argument to the actor_envelope! macro.
/// - recorder_trait: The name of the recorder trait. This is the thrid arugment to the
/// actor_envelope! macro.
///
/// # Example
/// ```
/// # use ceramic_actor::{Handler, Message};
/// use ceramic_actor::{Actor, actor_envelope};
///
/// #[derive(Actor)]
/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")]
/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI", recorder_trait = "PlayerR")]
/// pub struct Player { }
///
/// actor_envelope!{
/// PlayerEnv,
/// PlayerI,
/// PlayerR,
/// Score => ScoreMessage,
/// }
///
Expand All @@ -50,7 +53,8 @@ pub fn actor(item: TokenStream) -> TokenStream {
let struct_name = item.ident;

let Config {
trait_name,
actor_trait,
recorder_trait,
envelope_name,
handle_name,
} = Config::from_attributes(&struct_name, &item.attrs);
Expand Down Expand Up @@ -85,18 +89,24 @@ pub fn actor(item: TokenStream) -> TokenStream {
impl #generics ceramic_actor::Actor for #struct_name < #(#generic_types,)*> {
type Envelope = #envelope_name;
}
impl #generics #trait_name for #struct_name < #(#generic_types,)*> { }
impl #generics #actor_trait for #struct_name < #(#generic_types,)*> { }

impl #generics #struct_name < #(#generic_types,)*> {
/// Start the actor returning a handle that can be easily cloned and shared.
/// The actor stops once all handles are dropped.
pub fn spawn(size: usize, actor: impl #trait_name + ::std::marker::Send + 'static, shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>) {
pub fn spawn(
size: usize,
actor: impl #actor_trait,
recorder: impl #recorder_trait,
shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = ceramic_actor::channel(size);
let task_handle = tokio::spawn(async move { #envelope_name::run(actor, receiver, shutdown).await });

(
#handle_name {
sender,
recorder: ::std::sync::Arc::new(recorder),
#(#phantom_values,)*
},
task_handle,
Expand All @@ -108,12 +118,14 @@ pub fn actor(item: TokenStream) -> TokenStream {
#[derive(Debug)]
pub struct #handle_name #generics {
sender: ceramic_actor::Sender<#envelope_name>,
recorder: ::std::sync::Arc<dyn #recorder_trait>,
#(#phantom_fields,)*
}
impl #generics ::core::clone::Clone for #handle_name < #(#generic_types,)*> {
fn clone(&self) -> Self {
Self{
sender:self.sender.clone(),
sender: self.sender.clone(),
recorder: self.recorder.clone(),
#(#phantom_values,)*
}
}
Expand All @@ -126,21 +138,58 @@ pub fn actor(item: TokenStream) -> TokenStream {
self.sender.clone()
}
}
impl #handle_name {
/// Notify the actor of the message. Do not wait for the response.
/// Record the messsage event using the recorder provided to the handler at spawn.
pub async fn notify<Msg>(&self, msg: Msg) -> ::std::result::Result<(), ::ceramic_actor::Error<Msg>>
where
Msg: ::ceramic_actor::Message
+ ::std::convert::TryFrom<#envelope_name>
+ ::std::fmt::Debug
+ ::std::marker::Send
+ 'static,
<Msg as ::std::convert::TryFrom<#envelope_name>>::Error: ::std::fmt::Debug,
#envelope_name:
::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender<Msg::Result>)>,
dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent<Msg>> + Send + 'static,
{
::ceramic_actor::ActorHandle::notify(self, msg, self.recorder.clone()).await
}
/// Send a message to the actor waiting for the response.
/// Record the messsage event using the recorder provided to the handler at spawn.
pub async fn send<Msg>(&self, msg: Msg) -> ::std::result::Result<Msg::Result, ::ceramic_actor::Error<Msg>>
where
Msg: ::ceramic_actor::Message
+ ::std::convert::TryFrom<#envelope_name>
+ ::std::fmt::Debug
+ ::std::marker::Send
+ 'static,
<Msg as ::std::convert::TryFrom<#envelope_name>>::Error: ::std::fmt::Debug,
#envelope_name:
::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender<Msg::Result>)>,
dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent<Msg>> + Send + 'static,
{
::ceramic_actor::ActorHandle::send(self, msg, self.recorder.clone()).await
}
}

};

TokenStream::from(expanded)
}

struct Config {
trait_name: syn::Ident,
actor_trait: syn::Ident,
recorder_trait: syn::Ident,
envelope_name: syn::Ident,
handle_name: syn::Ident,
}

impl Config {
fn from_attributes(struct_name: &syn::Ident, attrs: &[Attribute]) -> Self {
let mut trait_name = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span());
let mut actor_trait = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span());
let mut recorder_trait =
syn::Ident::new(&format!("{}Recorder", struct_name), struct_name.span());
let mut envelope_name =
syn::Ident::new(&format!("{}Envelope", struct_name), struct_name.span());
let mut handle_name =
Expand All @@ -161,7 +210,12 @@ impl Config {
} else if meta.path.is_ident("actor_trait") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
trait_name = syn::Ident::new(&lit_str.value(), lit_str.span())
actor_trait = syn::Ident::new(&lit_str.value(), lit_str.span())
}
} else if meta.path.is_ident("recorder_trait") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
recorder_trait = syn::Ident::new(&lit_str.value(), lit_str.span())
}
}
Ok(())
Expand All @@ -170,7 +224,8 @@ impl Config {
}
}
Self {
trait_name,
actor_trait,
recorder_trait,
envelope_name,
handle_name,
}
Expand Down
1 change: 1 addition & 0 deletions actor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
ceramic-actor-macros.workspace = true
ceramic-metrics.workspace = true
snafu.workspace = true

[dev-dependencies]
Expand Down
46 changes: 39 additions & 7 deletions actor/examples/game/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::ops::AddAssign;

use async_trait::async_trait;
use ceramic_actor::{actor_envelope, Actor, ActorHandle, Error, Handler, Message};
use ceramic_actor::{actor_envelope, Actor, Error, Handler, Message, MessageEvent};
use ceramic_metrics::Recorder;
use shutdown::Shutdown;
use tracing::{instrument, Level};

Expand All @@ -20,6 +21,7 @@ impl Game {
actor_envelope! {
GameEnvelope,
GameActor,
GameRecorder,
GetScore => GetScoreMessage,
Score => ScoreMessage,
}
Expand Down Expand Up @@ -67,7 +69,12 @@ impl Handler<GetScoreMessage> for Game {

#[derive(Actor)]
// The envelope and handle types names can be explicitly named.
#[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")]
#[actor(
envelope = "PlayerEnv",
handle = "PlayerH",
actor_trait = "PlayerI",
recorder_trait = "PlayerR"
)]
pub struct Player {
is_home: bool,
game: GameHandle,
Expand All @@ -82,6 +89,7 @@ impl Player {
actor_envelope! {
PlayerEnv,
PlayerI,
PlayerR,
Shoot => ShootMessage,
}

Expand Down Expand Up @@ -109,18 +117,42 @@ impl Handler<ShootMessage> for Player {
}
}

#[derive(Debug)]
struct NoOpRecorder;

impl Recorder<MessageEvent<GetScoreMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<GetScoreMessage>) {}
}
impl Recorder<MessageEvent<ScoreMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<ScoreMessage>) {}
}
impl GameRecorder for NoOpRecorder {}

impl Recorder<MessageEvent<ShootMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<ShootMessage>) {}
}
impl PlayerR for NoOpRecorder {}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.pretty()
.init();
let shutdown = Shutdown::new();
let (game, _) = Game::spawn(1_000, Game::new(), shutdown.wait_fut());
let (player_home, _) =
Player::spawn(1_000, Player::new(true, game.clone()), shutdown.wait_fut());
let (player_away, _) =
Player::spawn(1_000, Player::new(false, game.clone()), shutdown.wait_fut());
let (game, _) = Game::spawn(1_000, Game::new(), NoOpRecorder, shutdown.wait_fut());
let (player_home, _) = Player::spawn(
1_000,
Player::new(true, game.clone()),
NoOpRecorder,
shutdown.wait_fut(),
);
let (player_away, _) = Player::spawn(
1_000,
Player::new(false, game.clone()),
NoOpRecorder,
shutdown.wait_fut(),
);
player_home.notify(ShootMessage).await.unwrap();
player_away.send(ShootMessage).await.unwrap();
// Send with retry without cloning the message to be sent.
Expand Down
Loading

0 comments on commit e3ab6e5

Please sign in to comment.