Skip to content

Commit

Permalink
feat & refactor: Internal & External Events
Browse files Browse the repository at this point in the history
- feat: Further enhance the external event dispatch skeleton
- feat: Add more group events
- feat: Refine the `derive_mania_event` macro (it now prefers `display` over `debug`, except for `String` and `&str`)
- refactor: Internal events (add support for transmitting extra (dummy) internal events and the `derive_dummy_event` macro)
- chore: Rename `multi_login` to `mania_multi_login`
  • Loading branch information
pk5ls20 committed Feb 9, 2025
1 parent c0fafa0 commit f8aa6a2
Show file tree
Hide file tree
Showing 50 changed files with 462 additions and 139 deletions.
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ default = []
tokio-tracing = ["console-subscriber"]

[[example]]
name = "multi_login"
name = "mania_multi_login"
path = "multi_login.rs"
74 changes: 60 additions & 14 deletions mania-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ pub fn command(attr: TokenStream, item: TokenStream) -> TokenStream {
let ident = &input_struct.ident;
let expanded = quote! {
#input_struct
impl CECommandMarker for #ident {
impl crate::core::event::CECommandMarker for #ident {
const COMMAND: &'static str = #command_value;
}
inventory::submit! {
ClientEventRegistry {
command: <#ident as CECommandMarker>::COMMAND,
parse_fn: <#ident as ClientEvent>::parse,
crate::core::event::ClientEventRegistry {
command: <#ident as crate::core::event::CECommandMarker>::COMMAND,
parse_fn: <#ident as crate::core::event::ClientEvent>::parse,
}
}
};
Expand Down Expand Up @@ -64,13 +64,13 @@ pub fn oidb_command(attr: TokenStream, item: TokenStream) -> TokenStream {
let ident = &input_struct.ident;
let expanded = quote! {
#input_struct
impl CECommandMarker for #ident {
impl crate::core::event::CECommandMarker for #ident {
const COMMAND: &'static str = #command_value;
}
inventory::submit! {
ClientEventRegistry {
command: <#ident as CECommandMarker>::COMMAND,
parse_fn: <#ident as ClientEvent>::parse,
crate::core::event::ClientEventRegistry {
command: <#ident as crate::core::event::CECommandMarker>::COMMAND,
parse_fn: <#ident as crate::core::event::ClientEvent>::parse,
}
}
};
Expand All @@ -82,7 +82,7 @@ pub fn derive_server_event(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let struct_name = input.ident;
let expanded = quote! {
impl ServerEvent for #struct_name {
impl crate::core::event::ServerEvent for #struct_name {
fn as_any(&self) -> &dyn std::any::Any {
self
}
Expand All @@ -94,6 +94,39 @@ pub fn derive_server_event(input: TokenStream) -> TokenStream {
TokenStream::from(expanded)
}

#[proc_macro_derive(DummyEvent)]
pub fn derive_dummy_event(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let struct_name = &input.ident;

let expanded = quote! {
impl crate::core::event::CECommandMarker for #struct_name {
const COMMAND: &'static str = "";
}

impl crate::core::event::ServerEvent for #struct_name {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}

impl crate::core::event::ClientEvent for #struct_name {
fn build(&self, _: &crate::core::context::Context) -> crate::core::event::CEBuildResult {
unreachable!("DummyEvent should not be built");
}

fn parse(_: bytes::Bytes, _: &crate::core::context::Context) -> crate::core::event::CEParseResult {
unreachable!("DummyEvent should not be parsed");
}
}
};

TokenStream::from(expanded)
}

#[proc_macro_attribute]
pub fn handle_event(attr: TokenStream, item: TokenStream) -> TokenStream {
let input_fn = parse_macro_input!(item as ItemFn);
Expand Down Expand Up @@ -166,14 +199,14 @@ pub fn handle_event(attr: TokenStream, item: TokenStream) -> TokenStream {

#[derive(Debug)]
struct ManiaEventPreferOptions {
debug: bool,
display: bool,
}

impl Parse for ManiaEventPreferOptions {
fn parse(input: ParseStream) -> Result<Self, syn::Error> {
let ident: Ident = input.parse()?;
Ok(ManiaEventPreferOptions {
debug: ident == "debug",
display: ident == "display",
})
}
}
Expand All @@ -187,20 +220,33 @@ pub fn derive_mania_event(input: proc_macro::TokenStream) -> proc_macro::TokenSt
impl crate::event::ManiaEvent for #struct_name {}
};

let debug_impl = match &input.data {
let display_impl = match &input.data {
Data::Struct(data_struct) => match &data_struct.fields {
Fields::Named(fields_named) => {
let field_entries: Vec<String> = fields_named
.named
.iter()
.map(|field| {
let field_name = field.ident.as_ref().unwrap().to_string();
let field_type = &field.ty;
let placeholder = field
.attrs
.iter()
.find(|attr| attr.path().is_ident("prefer"))
.and_then(|attr| attr.parse_args::<ManiaEventPreferOptions>().ok())
.map_or("{}", |opts| if opts.debug { "{:?}" } else { "{}" });
.map_or_else(
|| {
if let syn::Type::Path(path) = field_type {
if let Some(segment) = path.path.segments.first() {
if segment.ident == "String" || segment.ident == "str" {
return "{}";
}
}
}
"{:?}"
},
|opts| if opts.display { "{}" } else { "{:?}" },
);
format!("{}: {}", field_name, placeholder)
})
.collect();
Expand All @@ -224,7 +270,7 @@ pub fn derive_mania_event(input: proc_macro::TokenStream) -> proc_macro::TokenSt

let expanded = quote! {
#mania_event_impl
#debug_impl
#display_impl
};
expanded.into()
}
28 changes: 16 additions & 12 deletions mania/src/core/business.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Duration;
use crate::core::cache::Cache;
use crate::core::context::Context;
use crate::core::event::prelude::*;
use crate::core::event::resolve_event;
use crate::core::event::{CEParse, resolve_event};
use crate::core::packet::SsoPacket;
use crate::core::socket::{self, PacketReceiver, PacketSender};
use crate::event::{EventDispatcher, EventListener};
Expand Down Expand Up @@ -157,8 +157,6 @@ impl Business {
tokio::spawn(async move {
if let Err(e) = handle.dispatch_sso_packet(packet).await {
tracing::error!("Unhandled error occurred when handling packet: {}", e);
// FIXME: Non-required reconnections, except for serious errors
// self.reconnect().await;
}
});
}
Expand Down Expand Up @@ -204,12 +202,12 @@ impl Business {
pub struct BusinessHandle {
sender: ArcSwap<PacketSender>,
reconnecting: Mutex<()>,
pending_requests: DashMap<u32, oneshot::Sender<BusinessResult<Box<dyn ServerEvent>>>>,
pending_requests: DashMap<u32, oneshot::Sender<BusinessResult<CEParse>>>,
pub(crate) context: Arc<Context>,
pub(crate) cache: Arc<Cache>,
pub(crate) event_dispatcher: EventDispatcher,
pub event_listener: EventListener,
// TODO: (outer) event dispatcher, highway
// TODO: highway
}

impl BusinessHandle {
Expand All @@ -233,10 +231,16 @@ impl BusinessHandle {

async fn dispatch_sso_packet(self: &Arc<Self>, packet: SsoPacket) -> BusinessResult<()> {
let sequence = packet.sequence();
let result: BusinessResult<Box<dyn ServerEvent>> = async {
let mut event = resolve_event(packet, &self.context).await?;
dispatch_logic(&mut *event, self.clone(), LogicFlow::InComing).await;
Ok(event)
let result: BusinessResult<CEParse> = async {
let (mut major_event, mut extra_events) = resolve_event(packet, &self.context).await?;
let svc = self.clone();
dispatch_logic(major_event.as_mut(), svc.clone(), LogicFlow::InComing).await;
if let Some(ref mut events) = extra_events {
for event in events.iter_mut() {
dispatch_logic(event.as_mut(), svc.clone(), LogicFlow::InComing).await;
}
}
Ok((major_event, extra_events))
}
.await;
// Lagrange.Core.Internal.Context.BusinessContext.HandleIncomingEvent
Expand Down Expand Up @@ -266,7 +270,7 @@ impl BusinessHandle {
pub async fn send_event(
self: &Arc<Self>,
event: &mut (impl ClientEvent + ServerEvent),
) -> BusinessResult<Box<dyn ServerEvent>> {
) -> BusinessResult<CEParse> {
// Lagrange.Core.Internal.Context.BusinessContext.HandleOutgoingEvent
dispatch_logic(
event as &mut dyn ServerEvent,
Expand All @@ -285,10 +289,10 @@ impl BusinessHandle {
Ok(self.sender.load().send(packet).await?)
}

async fn send_packet(&self, packet: SsoPacket) -> BusinessResult<Box<dyn ServerEvent>> {
async fn send_packet(&self, packet: SsoPacket) -> BusinessResult<CEParse> {
tracing::debug!("sending packet: {:?}", packet);
let sequence = packet.sequence();
let (tx, rx) = oneshot::channel::<BusinessResult<Box<dyn ServerEvent>>>();
let (tx, rx) = oneshot::channel::<BusinessResult<CEParse>>();
self.pending_requests.insert(sequence, tx);
self.post_packet(packet).await?;
rx.await.expect("response not received")
Expand Down
13 changes: 9 additions & 4 deletions mania/src/core/business/messaging_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::core::business::LogicRegistry;
use crate::core::business::{BusinessHandle, LogicFlow};
use crate::core::event::message::push_msg::PushMessageEvent;
use crate::core::event::notify::group_sys_request_join::GroupSysRequestJoinEvent;
use crate::core::event::prelude::*;
use crate::event::group::{GroupEvent, group_message};
use crate::message::chain::{MessageChain, MessageType};
Expand All @@ -9,7 +10,7 @@ use crate::message::entity::file::FileUnique;
use mania_macros::handle_event;
use std::sync::Arc;

#[handle_event(PushMessageEvent)]
#[handle_event(PushMessageEvent, GroupSysRequestJoinEvent)]
async fn messaging_logic(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
Expand All @@ -22,7 +23,6 @@ async fn messaging_logic(
}
}

#[allow(clippy::single_match)] // TODO: remove when finally implemented
async fn messaging_logic_incoming(
event: &mut dyn ServerEvent,
handle: Arc<BusinessHandle>,
Expand All @@ -39,7 +39,7 @@ async fn messaging_logic_incoming(
handle
.event_dispatcher
.group
.send(Some(GroupEvent::GroupMessageEvent(
.send(Some(GroupEvent::GroupMessage(
group_message::GroupMessageEvent { chain },
)))
.expect("Failed to send group event");
Expand All @@ -51,6 +51,12 @@ async fn messaging_logic_incoming(
tracing::warn!("Empty message chain in PushMessageEvent");
}
}
_ if let Some(event) = event
.as_any_mut()
.downcast_mut::<GroupSysRequestJoinEvent>() =>
{
tracing::debug!("Handling GroupSysRequestJoinEvent: {:?}", event); // TODO: dispatch
}
_ => {}
}
event
Expand Down Expand Up @@ -189,7 +195,6 @@ async fn resolve_incoming_chain(chain: &mut MessageChain, handle: Arc<BusinessHa
let node = video.node.clone();
let download_result = match &chain.typ {
MessageType::Group(grp) => {
// TODO: old impl (0x11e9_200?)
let uid = handle
.resolve_uid(Some(grp.group_uin), chain.friend_uin)
.await;
Expand Down
48 changes: 37 additions & 11 deletions mania/src/core/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,42 @@ pub trait CECommandMarker: Send + Sync {
}
}

pub type CEBuildResult = Result<BinaryPacket, EventError>;
pub type CEParse = (Box<dyn ServerEvent>, Option<Vec<Box<dyn ServerEvent>>>);
pub type CEParseResult = Result<CEParse, EventError>;
pub type ParseEventFn = fn(Bytes, &Context) -> CEParseResult;

pub trait ClientEvent: CECommandMarker {
fn packet_type(&self) -> PacketType {
PacketType::T12 // most common packet type
}
fn build(&self, _: &Context) -> Result<BinaryPacket, EventError>;
fn parse(packet: Bytes, context: &Context) -> Result<Box<dyn ServerEvent>, EventError>;
fn build(&self, _: &Context) -> CEBuildResult;
fn parse(packet: Bytes, context: &Context) -> CEParseResult;
}

type ParseEvent = fn(Bytes, &Context) -> Result<Box<dyn ServerEvent>, EventError>;
pub struct ClientResult;

impl ClientResult {
pub fn single(event: Box<dyn ServerEvent>) -> CEParse {
(event, None)
}

pub fn with_extra(
event: Box<dyn ServerEvent>,
extra: Option<Vec<Box<dyn ServerEvent>>>,
) -> CEParse {
(event, extra)
}
}

pub struct ClientEventRegistry {
pub command: &'static str,
pub parse_fn: ParseEvent,
pub parse_fn: ParseEventFn,
}

inventory::collect!(ClientEventRegistry);

type EventMap = HashMap<&'static str, ParseEvent>;
type EventMap = HashMap<&'static str, ParseEventFn>;
static EVENT_MAP: Lazy<EventMap> = Lazy::new(|| {
let mut map = HashMap::new();
for item in inventory::iter::<ClientEventRegistry> {
Expand All @@ -52,10 +70,7 @@ static EVENT_MAP: Lazy<EventMap> = Lazy::new(|| {
map
});

pub async fn resolve_event(
packet: SsoPacket,
context: &Arc<Context>,
) -> Result<Box<dyn ServerEvent>, EventError> {
pub async fn resolve_event(packet: SsoPacket, context: &Arc<Context>) -> CEParseResult {
// Lagrange.Core.Internal.Context.ServiceContext.ResolveEventByPacket
let payload = PacketReader::new(packet.payload()).section(|p| p.bytes());
tracing::debug!(
Expand All @@ -74,10 +89,20 @@ pub fn downcast_event<T: ServerEvent + 'static>(event: &impl AsRef<dyn ServerEve
event.as_ref().as_any().downcast_ref::<T>()
}

pub fn downcast_major_event<T: ServerEvent + 'static>(event: &CEParse) -> Option<&T> {
let (se, _) = event;
se.as_any().downcast_ref::<T>()
}

pub fn downcast_mut_event<T: ServerEvent + 'static>(event: &mut dyn ServerEvent) -> Option<&mut T> {
event.as_any_mut().downcast_mut::<T>()
}

pub fn downcast_mut_major_event<T: ServerEvent + 'static>(event: &mut CEParse) -> Option<&mut T> {
let (se, _) = event;
se.as_any_mut().downcast_mut::<T>()
}

#[derive(Debug, Error)]
pub enum EventError {
#[error("unsupported event, commend: {0}")]
Expand Down Expand Up @@ -105,7 +130,8 @@ pub enum EventError {
pub(crate) mod prelude {
pub use crate::core::context::Context;
pub use crate::core::event::{
CECommandMarker, ClientEvent, ClientEventRegistry, EventError, ServerEvent,
CEBuildResult, CECommandMarker, CEParseResult, ClientEvent, ClientResult, EventError,
ServerEvent,
};
pub use crate::core::packet::{
BinaryPacket, OidbPacket, PREFIX_LENGTH_ONLY, PREFIX_U8, PREFIX_U16, PREFIX_WITH,
Expand All @@ -114,7 +140,7 @@ pub(crate) mod prelude {
pub use crate::dda;
pub use bytes::Bytes;
pub use inventory;
pub use mania_macros::{ServerEvent, command, oidb_command};
pub use mania_macros::{DummyEvent, ServerEvent, command, oidb_command};
pub use num_enum::TryFromPrimitive;
pub use prost::Message;
pub use std::convert::TryFrom;
Expand Down
Loading

0 comments on commit f8aa6a2

Please sign in to comment.