Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pipeline metrics #646

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions actor-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ quote = "1.0"

[dev-dependencies]
ceramic-actor.workspace = true
ceramic-metrics.workspace = true
async-trait.workspace = true
tokio.workspace = true
75 changes: 65 additions & 10 deletions actor-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ use syn::{parse_macro_input, Attribute, DeriveInput, GenericParam, Lit};
/// - handle: The name of the derived `ActorHandle` implementation.
/// - actor_trait: The name of the actor specific trait. This is the same as the second
/// argument to the actor_envelope! macro.
/// - recorder_trait: The name of the recorder trait. This is the thrid arugment to the
/// actor_envelope! macro.
///
/// # Example
/// ```
/// # use ceramic_actor::{Handler, Message};
/// use ceramic_actor::{Actor, actor_envelope};
///
/// #[derive(Actor)]
/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")]
/// #[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI", recorder_trait = "PlayerR")]
/// pub struct Player { }
///
/// actor_envelope!{
/// PlayerEnv,
/// PlayerI,
/// PlayerR,
/// Score => ScoreMessage,
/// }
///
Expand All @@ -50,7 +53,8 @@ pub fn actor(item: TokenStream) -> TokenStream {
let struct_name = item.ident;

let Config {
trait_name,
actor_trait,
recorder_trait,
envelope_name,
handle_name,
} = Config::from_attributes(&struct_name, &item.attrs);
Expand Down Expand Up @@ -85,35 +89,43 @@ pub fn actor(item: TokenStream) -> TokenStream {
impl #generics ceramic_actor::Actor for #struct_name < #(#generic_types,)*> {
type Envelope = #envelope_name;
}
impl #generics #trait_name for #struct_name < #(#generic_types,)*> { }
impl #generics #actor_trait for #struct_name < #(#generic_types,)*> { }

impl #generics #struct_name < #(#generic_types,)*> {
/// Start the actor returning a handle that can be easily cloned and shared.
/// The actor stops once all handles are dropped.
pub fn spawn(size: usize, actor: impl #trait_name + ::std::marker::Send + 'static, shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>) {
pub fn spawn(
size: usize,
actor: impl #actor_trait,
recorder: impl #recorder_trait,
shutdown: impl ::std::future::Future<Output=()> + ::std::marker::Send + 'static) -> (#handle_name < #(#generic_types,)*>, tokio::task::JoinHandle<()>,
) {
let (sender, receiver) = ceramic_actor::channel(size);
let task_handle = tokio::spawn(async move { #envelope_name::run(actor, receiver, shutdown).await });

(
#handle_name {
sender,
recorder: ::std::sync::Arc::new(recorder),
#(#phantom_values,)*
},
task_handle,
)
}
}

/// Handle for [`#struct_name`].
#[doc = concat!("Handle for [`", stringify!(#actor_trait), "`].")]
#[derive(Debug)]
pub struct #handle_name #generics {
sender: ceramic_actor::Sender<#envelope_name>,
recorder: ::std::sync::Arc<dyn #recorder_trait>,
#(#phantom_fields,)*
}
impl #generics ::core::clone::Clone for #handle_name < #(#generic_types,)*> {
fn clone(&self) -> Self {
Self{
sender:self.sender.clone(),
sender: self.sender.clone(),
recorder: self.recorder.clone(),
#(#phantom_values,)*
}
}
Expand All @@ -126,21 +138,58 @@ pub fn actor(item: TokenStream) -> TokenStream {
self.sender.clone()
}
}
impl #handle_name {
/// Notify the actor of the message. Do not wait for the response.
/// Record the messsage event using the recorder provided to the handler at spawn.
pub async fn notify<Msg>(&self, msg: Msg) -> ::std::result::Result<(), ::ceramic_actor::Error<Msg>>
where
Msg: ::ceramic_actor::Message
+ ::std::convert::TryFrom<#envelope_name>
+ ::std::fmt::Debug
+ ::std::marker::Send
+ 'static,
<Msg as ::std::convert::TryFrom<#envelope_name>>::Error: ::std::fmt::Debug,
#envelope_name:
::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender<Msg::Result>)>,
dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent<Msg>> + Send + 'static,
{
::ceramic_actor::ActorHandle::notify(self, msg, self.recorder.clone()).await
}
/// Send a message to the actor waiting for the response.
/// Record the messsage event using the recorder provided to the handler at spawn.
pub async fn send<Msg>(&self, msg: Msg) -> ::std::result::Result<Msg::Result, ::ceramic_actor::Error<Msg>>
where
Msg: ::ceramic_actor::Message
+ ::std::convert::TryFrom<#envelope_name>
+ ::std::fmt::Debug
+ ::std::marker::Send
+ 'static,
<Msg as ::std::convert::TryFrom<#envelope_name>>::Error: ::std::fmt::Debug,
#envelope_name:
::std::convert::From<(Msg, ::tokio::sync::oneshot::Sender<Msg::Result>)>,
dyn #recorder_trait: ::ceramic_metrics::Recorder<::ceramic_actor::MessageEvent<Msg>> + Send + 'static,
{
::ceramic_actor::ActorHandle::send(self, msg, self.recorder.clone()).await
}
}

};

TokenStream::from(expanded)
}

struct Config {
trait_name: syn::Ident,
actor_trait: syn::Ident,
recorder_trait: syn::Ident,
envelope_name: syn::Ident,
handle_name: syn::Ident,
}

impl Config {
fn from_attributes(struct_name: &syn::Ident, attrs: &[Attribute]) -> Self {
let mut trait_name = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span());
let mut actor_trait = syn::Ident::new(&format!("{}Actor", struct_name), struct_name.span());
let mut recorder_trait =
syn::Ident::new(&format!("{}Recorder", struct_name), struct_name.span());
let mut envelope_name =
syn::Ident::new(&format!("{}Envelope", struct_name), struct_name.span());
let mut handle_name =
Expand All @@ -161,7 +210,12 @@ impl Config {
} else if meta.path.is_ident("actor_trait") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
trait_name = syn::Ident::new(&lit_str.value(), lit_str.span())
actor_trait = syn::Ident::new(&lit_str.value(), lit_str.span())
}
} else if meta.path.is_ident("recorder_trait") {
let value: Lit = meta.value()?.parse()?;
if let Lit::Str(lit_str) = value {
recorder_trait = syn::Ident::new(&lit_str.value(), lit_str.span())
}
}
Ok(())
Expand All @@ -170,7 +224,8 @@ impl Config {
}
}
Self {
trait_name,
actor_trait,
recorder_trait,
envelope_name,
handle_name,
}
Expand Down
1 change: 1 addition & 0 deletions actor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait.workspace = true
tokio.workspace = true
tracing.workspace = true
ceramic-actor-macros.workspace = true
ceramic-metrics.workspace = true
snafu.workspace = true

[dev-dependencies]
Expand Down
46 changes: 39 additions & 7 deletions actor/examples/game/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::ops::AddAssign;

use async_trait::async_trait;
use ceramic_actor::{actor_envelope, Actor, ActorHandle, Error, Handler, Message};
use ceramic_actor::{actor_envelope, Actor, Error, Handler, Message, MessageEvent};
use ceramic_metrics::Recorder;
use shutdown::Shutdown;
use tracing::{instrument, Level};

Expand All @@ -20,6 +21,7 @@ impl Game {
actor_envelope! {
GameEnvelope,
GameActor,
GameRecorder,
GetScore => GetScoreMessage,
Score => ScoreMessage,
}
Expand Down Expand Up @@ -67,7 +69,12 @@ impl Handler<GetScoreMessage> for Game {

#[derive(Actor)]
// The envelope and handle types names can be explicitly named.
#[actor(envelope = "PlayerEnv", handle = "PlayerH", actor_trait = "PlayerI")]
#[actor(
envelope = "PlayerEnv",
handle = "PlayerH",
actor_trait = "PlayerI",
recorder_trait = "PlayerR"
)]
pub struct Player {
is_home: bool,
game: GameHandle,
Expand All @@ -82,6 +89,7 @@ impl Player {
actor_envelope! {
PlayerEnv,
PlayerI,
PlayerR,
Shoot => ShootMessage,
}

Expand Down Expand Up @@ -109,18 +117,42 @@ impl Handler<ShootMessage> for Player {
}
}

#[derive(Debug)]
struct NoOpRecorder;

impl Recorder<MessageEvent<GetScoreMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<GetScoreMessage>) {}
}
impl Recorder<MessageEvent<ScoreMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<ScoreMessage>) {}
}
impl GameRecorder for NoOpRecorder {}

impl Recorder<MessageEvent<ShootMessage>> for NoOpRecorder {
fn record(&self, _event: &MessageEvent<ShootMessage>) {}
}
impl PlayerR for NoOpRecorder {}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.pretty()
.init();
let shutdown = Shutdown::new();
let (game, _) = Game::spawn(1_000, Game::new(), shutdown.wait_fut());
let (player_home, _) =
Player::spawn(1_000, Player::new(true, game.clone()), shutdown.wait_fut());
let (player_away, _) =
Player::spawn(1_000, Player::new(false, game.clone()), shutdown.wait_fut());
let (game, _) = Game::spawn(1_000, Game::new(), NoOpRecorder, shutdown.wait_fut());
let (player_home, _) = Player::spawn(
1_000,
Player::new(true, game.clone()),
NoOpRecorder,
shutdown.wait_fut(),
);
let (player_away, _) = Player::spawn(
1_000,
Player::new(false, game.clone()),
NoOpRecorder,
shutdown.wait_fut(),
);
player_home.notify(ShootMessage).await.unwrap();
player_away.send(ShootMessage).await.unwrap();
// Send with retry without cloning the message to be sent.
Expand Down
Loading
Loading