Skip to content

Commit

Permalink
feat: initial implementation for mania's external event dispatch mech…
Browse files Browse the repository at this point in the history
…anism
  • Loading branch information
pk5ls20 committed Feb 8, 2025
1 parent 0b7274a commit d28f0b8
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 233 deletions.
3 changes: 2 additions & 1 deletion README.MD
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<div align="center">

![Mania](https://socialify.git.ci/LagrangeDev/mania/image?description=1&descriptionEditable=An%20Implementation%20of%20NTQQ%20Protocol,%20with%20Pure%20Rust%F0%9F%A6%80,%20Derived%20from%20Lagrange.Core&font=Jost&forks=1&issues=1&logo=https%3A%2F%2Fstatic.live.moe%2Flagrange.jpg&name=1&pattern=Diagonal%20Stripes&pulls=1&stargazers=1&theme=Auto)
[![GitHub Workflow Status (with event)](https://img.shields.io/github/actions/workflow/status/LagrangeDev/mania/check.yml?logo=github)](https://github.com/LagrangeDev/mania/actions)
![nightly](https://img.shields.io/badge/toolchain-nightly-important)
![wip](https://img.shields.io/badge/develop-wip-blue)

Expand All @@ -24,7 +25,7 @@
| Windows | 🔴 | QrCode | 🟢 | BounceFace | 🔴 | Poke | 🔴 | Captcha | 🔴 |
| macOS | 🔴 | ~~Password~~ | 🔴 | Face | 🟡 [^1] | Recall | 🔴 | BotOnline | 🔴 |
| Linux | 🟢 | EasyLogin | 🟡 | File | 🟡[^1] | Leave Group | 🔴 | BotOffline | 🔴 |
| | | UnusualDevice<br/>Password | 🔴 | Forward | 🟡[^1] | Set Special Title | 🔴 | Message | 🔴 |
| | | UnusualDevice<br/>Password | 🔴 | Forward | 🟡[^1] | Set Special Title | 🔴 | Message | 🟡 |
| | | UnusualDevice<br/>Easy | 🔴 | ~~GreyTip~~ | 🔴 | Kick Member | 🔴 | Poke | 🔴 |
| | | ~~NewDeviceVerify~~ | 🔴 | GroupReaction | 🔴 | Mute Member | 🔴 | MessageRecall | 🔴 |
| | | | | Image | 🟡[^1] | Set Admin | 🔴 | GroupMemberDecrease | 🔴 |
Expand Down
23 changes: 22 additions & 1 deletion examples/multi_login.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ async fn main() {
let need_login = key_store.is_expired();
let mut client = Client::new(config, device, key_store).await.unwrap();
let operator = client.handle().operator();
let mut event_listener = operator.event_listener.clone();
tokio::spawn(async move {
client.spawn().await;
});
Expand Down Expand Up @@ -72,10 +73,30 @@ async fn main() {
panic!("Failed to set online status: {:?}", e);
}
};
tracing::info!("Login successfully!");
operator
.update_key_store()
.save("keystore.json")
.unwrap_or_else(|e| tracing::error!("Failed to save key store: {:?}", e));
tokio::spawn(async move {
loop {
tokio::select! {
_ = event_listener.bot.changed() => {
if let Some(ref be) = *event_listener.bot.borrow() {
tracing::info!("[BotEvent] {:?}", be);
}
}
_ = event_listener.friend.changed() => {
if let Some(ref fe) = *event_listener.friend.borrow() {
tracing::info!("[FriendEvent] {:?}", fe);
}
}
_ = event_listener.group.changed() => {
if let Some(ref ge) = *event_listener.group.borrow() {
tracing::info!("[GroupEvent] {:?}", ge);
}
}
}
}
});
tokio::signal::ctrl_c().await.unwrap();
}
11 changes: 11 additions & 0 deletions mania-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,14 @@ pub fn handle_event(attr: TokenStream, item: TokenStream) -> TokenStream {
};
expanded.into()
}

// TODO: auto parse & auto impl debug
#[proc_macro_derive(ManiaEvent)]
pub fn mania_event_derive(input: TokenStream) -> TokenStream {
let ast = parse_macro_input!(input as DeriveInput);
let struct_name = &ast.ident;
let stream = quote! {
impl crate::event::ManiaEvent for #struct_name {}
};
stream.into()
}
18 changes: 16 additions & 2 deletions mania/src/core/business.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::core::event::prelude::*;
use crate::core::event::resolve_event;
use crate::core::packet::SsoPacket;
use crate::core::socket::{self, PacketReceiver, PacketSender};
use crate::event::{EventDispatcher, EventListener};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -111,12 +112,16 @@ pub struct Business {
impl Business {
pub async fn new(addr: SocketAddr, context: Arc<Context>) -> BusinessResult<Self> {
let (sender, receiver) = socket::connect(addr).await?;
let event_dispatcher = EventDispatcher::new();
let event_listener = EventListener::new(&event_dispatcher);
let handle = Arc::new(BusinessHandle {
sender: ArcSwap::new(Arc::new(sender)),
reconnecting: Mutex::new(()),
pending_requests: DashMap::new(),
context,
cache: Arc::new(Cache::new()),
event_dispatcher,
event_listener,
});

Ok(Self {
Expand Down Expand Up @@ -202,6 +207,8 @@ pub struct BusinessHandle {
pending_requests: DashMap<u32, oneshot::Sender<BusinessResult<Box<dyn ServerEvent>>>>,
pub(crate) context: Arc<Context>,
pub(crate) cache: Arc<Cache>,
pub(crate) event_dispatcher: EventDispatcher,
pub event_listener: EventListener,
// TODO: (outer) event dispatcher, highway
}

Expand Down Expand Up @@ -236,8 +243,15 @@ impl BusinessHandle {
// TODO: timeout auto remove
if let Some((_, tx)) = self.pending_requests.remove(&sequence) {
tx.send(result).expect("receiver dropped");
} else {
tracing::warn!("unhandled packet: {:?}", result);
} else if let Err(e) = &result {
match e {
BusinessError::InternalEventError(inner_err @ EventError::UnsupportedEvent(_)) => {
tracing::warn!("{}", inner_err);
}
_ => {
tracing::error!("Unhandled error occurred: {}", e);
}
}
}
Ok(())
}
Expand Down
Loading

0 comments on commit d28f0b8

Please sign in to comment.