From 9b5bb8d8af491f89502166d6d6c26207f918e53a Mon Sep 17 00:00:00 2001 From: pk5ls20 Date: Thu, 20 Feb 2025 10:02:35 +0800 Subject: [PATCH] feat: video upload --- mania/src/core/crypto.rs | 2 +- mania/src/core/event/message/mod.rs | 2 + .../core/event/message/video_c2c_upload.rs | 148 ++++++++ .../core/event/message/video_group_upload.rs | 147 ++++++++ mania/src/core/highway.rs | 2 + mania/src/core/highway/hw_frame_codec.rs | 5 + mania/src/core/operation/highway_op.rs | 356 +++++++++++++++++- mania/src/message/builder.rs | 55 +++ mania/src/message/entity.rs | 17 + mania/src/message/entity/record.rs | 21 +- mania/src/message/entity/video.rs | 64 +++- mania/src/utility/stream_helper.rs | 4 +- 12 files changed, 797 insertions(+), 26 deletions(-) create mode 100644 mania/src/core/event/message/video_c2c_upload.rs create mode 100644 mania/src/core/event/message/video_group_upload.rs diff --git a/mania/src/core/crypto.rs b/mania/src/core/crypto.rs index 8399bc3..3c9d080 100644 --- a/mania/src/core/crypto.rs +++ b/mania/src/core/crypto.rs @@ -2,7 +2,7 @@ use md5::{Digest, Md5}; use p256::{EncodedPoint, PublicKey, ecdh::EphemeralSecret}; pub mod consts; -mod stream_sha1; +pub mod stream_sha1; pub mod tea; /// The original macro that @wybxc originally wrote (b81f75b7) was perfect. diff --git a/mania/src/core/event/message/mod.rs b/mania/src/core/event/message/mod.rs index d856ce8..f47d4f7 100644 --- a/mania/src/core/event/message/mod.rs +++ b/mania/src/core/event/message/mod.rs @@ -10,4 +10,6 @@ pub mod record_c2c_download; pub mod record_group_download; pub mod send_message; pub mod video_c2c_download; +pub mod video_c2c_upload; pub mod video_group_download; +pub mod video_group_upload; diff --git a/mania/src/core/event/message/video_c2c_upload.rs b/mania/src/core/event/message/video_c2c_upload.rs new file mode 100644 index 0000000..633f3d5 --- /dev/null +++ b/mania/src/core/event/message/video_c2c_upload.rs @@ -0,0 +1,148 @@ +use crate::core::event::prelude::*; +use crate::core::protos::message::VideoFile; +use crate::core::protos::service::oidb::{ + C2cUserInfo, ClientMeta, CommonHead, ExtBizInfo, FileInfo, FileType, IPv4, MsgInfo, + MultiMediaReqHead, Ntv2RichMediaReq, Ntv2RichMediaResp, PicExtBizInfo, PttExtBizInfo, + SceneInfo, SubFileInfo, UploadInfo, UploadReq, VideoExtBizInfo, +}; +use crate::utility::random_gen::RandomGenerator; + +#[derive(Debug, Default)] +pub struct VideoC2CUploadArgs { + pub uid: String, + pub video_size: u32, + pub video_md5: Bytes, + pub video_sha1: Bytes, + pub video_name: String, + pub thumb_size: u32, + pub thumb_md5: Bytes, + pub thumb_sha1: Bytes, + pub thumb_name: String, + pub thumb_width: u32, + pub thumb_height: u32, + pub summary: String, +} + +#[derive(Debug, Default)] +pub struct VideoC2CUploadRes { + pub msg_info: MsgInfo, + pub video_file: VideoFile, + pub u_key: Option, + pub ipv4s: Vec, + pub sub_file_info: Vec, +} + +#[oidb_command(0x11e9, 100)] +#[derive(Debug, ServerEvent, Default)] +pub struct VideoC2CUploadEvent { + pub req: VideoC2CUploadArgs, + pub res: VideoC2CUploadRes, +} + +impl ClientEvent for VideoC2CUploadEvent { + fn build(&self, _: &Context) -> CEBuildResult { + let req = dda!(Ntv2RichMediaReq { + req_head: Some(MultiMediaReqHead { + common: Some(CommonHead { + request_id: 3, + command: 100, + }), + scene: Some(dda!(SceneInfo { + request_type: 2, + business_type: 2, + scene_type: 1, + c2c: Some(C2cUserInfo { + account_type: 2, + target_uid: self.req.uid.to_owned(), + }) + })), + client: Some(ClientMeta { agent_type: 2 }), + }), + upload: Some(UploadReq { + upload_info: vec![ + UploadInfo { + file_info: Some(FileInfo { + file_size: self.req.video_size, + file_hash: hex::encode(self.req.video_md5.clone()), + file_sha1: hex::encode(self.req.video_sha1.clone()), + file_name: self.req.video_name.to_owned(), + r#type: Some(FileType { + r#type: 2, + pic_format: 0, + video_format: 0, + voice_format: 0, + }), + width: 0, + height: 0, + time: 0, + original: 0, + }), + sub_file_type: 0, + }, + UploadInfo { + file_info: Some(FileInfo { + file_size: self.req.thumb_size, + file_hash: hex::encode(self.req.thumb_md5.clone()), + file_sha1: hex::encode(self.req.thumb_sha1.clone()), + file_name: self.req.thumb_name.to_owned(), + r#type: Some(FileType { + r#type: 1, + pic_format: 0, + video_format: 0, + voice_format: 0, + }), + width: self.req.thumb_width, + height: self.req.thumb_height, + time: 0, + original: 0, + }), + sub_file_type: 100, + } + ], + try_fast_upload_completed: true, + srv_send_msg: false, + client_random_id: RandomGenerator::rand_u64(), + compat_q_msg_scene_type: 2, + ext_biz_info: Some(dda!(ExtBizInfo { + pic: Some(dda!(PicExtBizInfo { + biz_type: 0, + text_summary: self.req.summary.to_owned(), + })), + video: Some(dda!(VideoExtBizInfo { + bytes_pb_reserve: vec![0x80, 0x01, 0x00], + })), + ptt: Some(dda!(PttExtBizInfo { + bytes_reserve: vec![], + bytes_pb_reserve: vec![], + bytes_general_flags: vec![], + })), + })), + client_seq: 0, + no_need_compat_msg: false, + }), + }); + Ok(OidbPacket::new(0x11e9, 100, req.encode_to_vec(), false, true).to_binary()) + } + + fn parse(packet: Bytes, _: &Context) -> CEParseResult { + let resp = OidbPacket::parse_into::(packet)?; + let upload = resp + .upload + .ok_or_else(|| EventError::OtherError("Missing UploadResp".to_string()))?; + let msg_info = upload + .msg_info + .ok_or_else(|| EventError::OtherError("Missing MsgInfo".to_string()))?; + let sub_file_info = upload.sub_file_infos; + let video_file = VideoFile::decode(Bytes::from(upload.compat_q_msg))?; + let ipv4s = upload.i_pv4s; + Ok(ClientResult::single(Box::new(dda!(Self { + res: VideoC2CUploadRes { + msg_info, + video_file, + ipv4s, + u_key: upload.u_key, + sub_file_info + } + })))) + } +} diff --git a/mania/src/core/event/message/video_group_upload.rs b/mania/src/core/event/message/video_group_upload.rs new file mode 100644 index 0000000..14f4809 --- /dev/null +++ b/mania/src/core/event/message/video_group_upload.rs @@ -0,0 +1,147 @@ +use crate::core::event::prelude::*; +use crate::core::protos::message::VideoFile; +use crate::core::protos::service::oidb::{ + ClientMeta, CommonHead, ExtBizInfo, FileInfo, FileType, IPv4, MsgInfo, MultiMediaReqHead, + NtGroupInfo, Ntv2RichMediaReq, Ntv2RichMediaResp, PicExtBizInfo, PttExtBizInfo, SceneInfo, + SubFileInfo, UploadInfo, UploadReq, VideoExtBizInfo, +}; +use crate::utility::random_gen::RandomGenerator; + +#[derive(Debug, Default)] +pub struct VideoGroupUploadArgs { + pub group_uin: u32, + pub video_size: u32, + pub video_md5: Bytes, + pub video_sha1: Bytes, + pub video_name: String, + pub thumb_size: u32, + pub thumb_md5: Bytes, + pub thumb_sha1: Bytes, + pub thumb_name: String, + pub thumb_width: u32, + pub thumb_height: u32, + pub summary: String, +} + +#[derive(Debug, Default)] +pub struct VideoGroupUploadRes { + pub msg_info: MsgInfo, + pub video_file: VideoFile, + pub u_key: Option, + pub ipv4s: Vec, + pub sub_file_info: Vec, +} + +#[oidb_command(0x11ea, 100)] +#[derive(Debug, ServerEvent, Default)] +pub struct VideoGroupUploadEvent { + pub req: VideoGroupUploadArgs, + pub res: VideoGroupUploadRes, +} + +impl ClientEvent for VideoGroupUploadEvent { + fn build(&self, _: &Context) -> CEBuildResult { + let req = dda!(Ntv2RichMediaReq { + req_head: Some(MultiMediaReqHead { + common: Some(CommonHead { + request_id: 3, + command: 100, + }), + scene: Some(dda!(SceneInfo { + request_type: 2, + business_type: 2, + scene_type: 2, + group: Some(NtGroupInfo { + group_uin: self.req.group_uin, + }), + })), + client: Some(ClientMeta { agent_type: 2 }), + }), + upload: Some(UploadReq { + upload_info: vec![ + UploadInfo { + file_info: Some(FileInfo { + file_size: self.req.video_size, + file_hash: hex::encode(self.req.video_md5.clone()), + file_sha1: hex::encode(self.req.video_sha1.clone()), + file_name: self.req.video_name.to_owned(), + r#type: Some(FileType { + r#type: 2, + pic_format: 0, + video_format: 0, + voice_format: 0, + }), + width: 0, + height: 0, + time: 0, + original: 0, + }), + sub_file_type: 0, + }, + UploadInfo { + file_info: Some(FileInfo { + file_size: self.req.thumb_size, + file_hash: hex::encode(self.req.thumb_md5.clone()), + file_sha1: hex::encode(self.req.thumb_sha1.clone()), + file_name: self.req.thumb_name.to_owned(), + r#type: Some(FileType { + r#type: 1, + pic_format: 0, + video_format: 0, + voice_format: 0, + }), + width: self.req.thumb_width, + height: self.req.thumb_height, + time: 0, + original: 0, + }), + sub_file_type: 100, + } + ], + try_fast_upload_completed: true, + srv_send_msg: false, + client_random_id: RandomGenerator::rand_u64(), + compat_q_msg_scene_type: 2, + ext_biz_info: Some(dda!(ExtBizInfo { + pic: Some(dda!(PicExtBizInfo { + biz_type: 0, + text_summary: self.req.summary.to_owned(), + })), + video: Some(dda!(VideoExtBizInfo { + bytes_pb_reserve: vec![0x80, 0x01, 0x00], + })), + ptt: Some(dda!(PttExtBizInfo { + bytes_reserve: vec![], + bytes_pb_reserve: vec![], + bytes_general_flags: vec![], + })), + })), + client_seq: 0, + no_need_compat_msg: false, + }), + }); + Ok(OidbPacket::new(0x11ea, 100, req.encode_to_vec(), false, true).to_binary()) + } + + fn parse(packet: Bytes, _: &Context) -> CEParseResult { + let resp = OidbPacket::parse_into::(packet)?; + let upload = resp + .upload + .ok_or_else(|| EventError::OtherError("Missing UploadResp".to_string()))?; + let msg_info = upload + .msg_info + .ok_or_else(|| EventError::OtherError("Missing MsgInfo".to_string()))?; + let sub_file_info = upload.sub_file_infos; + let video_file = VideoFile::decode(Bytes::from(upload.compat_q_msg))?; + let ipv4s = upload.i_pv4s; + Ok(ClientResult::single(Box::new(dda!(Self { + res: VideoGroupUploadRes { + msg_info, + video_file, + ipv4s, + u_key: upload.u_key, + sub_file_info + } + })))) + } +} diff --git a/mania/src/core/highway.rs b/mania/src/core/highway.rs index 10d3855..b1949b8 100644 --- a/mania/src/core/highway.rs +++ b/mania/src/core/highway.rs @@ -46,6 +46,8 @@ pub enum HighwayError { IoError(#[from] io::Error), #[error("Upload error! code={0}")] UploadError(u32), + #[error("Hex decode error!")] + HexError(#[from] hex::FromHexError), #[error("An error occurred in highway: {0}")] OtherError(Cow<'static, str>), } diff --git a/mania/src/core/highway/hw_frame_codec.rs b/mania/src/core/highway/hw_frame_codec.rs index 0461133..7b0284e 100644 --- a/mania/src/core/highway/hw_frame_codec.rs +++ b/mania/src/core/highway/hw_frame_codec.rs @@ -39,11 +39,16 @@ impl Decoder for HighwayFrameCodec { fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 1 + 4 + 4 + 1 { + tracing::trace!("Not enough data for frame (stage 1)!"); return Ok(None); } let start = src.get_u8(); let head_length = src.get_u32() as usize; let body_length = src.get_u32() as usize; + if src.len() < head_length + body_length + 1 { + tracing::trace!("Not enough data for frame (stage 2)!"); + return Ok(None); + } let head = src.split_to(head_length); let body = src.split_to(body_length); let end = src.get_u8(); diff --git a/mania/src/core/operation/highway_op.rs b/mania/src/core/operation/highway_op.rs index d7558ce..3d8d312 100644 --- a/mania/src/core/operation/highway_op.rs +++ b/mania/src/core/operation/highway_op.rs @@ -1,16 +1,22 @@ use crate::core::business::BusinessHandle; +use crate::core::crypto::stream_sha1::StreamSha1; use crate::core::event::downcast_major_event; use crate::core::event::message::image_c2c_upload::{ImageC2CUploadArgs, ImageC2CUploadEvent}; use crate::core::event::message::image_group_upload::{ ImageGroupUploadArgs, ImageGroupUploadEvent, }; +use crate::core::event::message::video_c2c_upload::{VideoC2CUploadArgs, VideoC2CUploadEvent}; +use crate::core::event::message::video_group_upload::{ + VideoGroupUploadArgs, VideoGroupUploadEvent, +}; use crate::core::event::system::fetch_highway_ticket::FetchHighwayTicketEvent; use crate::core::highway::hw_client::HighwayClient; -use crate::core::highway::{AsyncStream, oidb_ipv4s_to_highway_ipv4s}; +use crate::core::highway::{AsyncStream, HighwayError, oidb_ipv4s_to_highway_ipv4s}; use crate::core::protos::service::highway::{ NtHighwayHash, NtHighwayNetwork, Ntv2RichMediaHighwayExt, }; use crate::message::entity::image::ImageEntity; +use crate::message::entity::video::VideoEntity; use crate::utility::image_resolver::{ImageFormat, resolve_image_metadata}; use crate::utility::stream_helper::{mut_stream_ctx, stream_pipeline}; use crate::{ManiaError, ManiaResult, dda}; @@ -41,7 +47,7 @@ impl BusinessHandle { None => self.fetch_sig_session().await?, }; self.highway.client.store(Arc::new(HighwayClient::new( - "htdata3.qq.com:80", + "htdata3.qq.com:80", // TODO: Configurable & dynamic 60, sig, **self.context.key_store.uin.load(), @@ -130,12 +136,8 @@ impl BusinessHandle { .info .as_ref() .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; - let sha1 = hex::decode(&info.file_sha1).map_err(|e| { - ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) - })?; - let md5 = hex::decode(&info.file_hash).map_err(|e| { - ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) - })?; + let sha1 = hex::decode(&info.file_sha1).map_err(HighwayError::HexError)?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; let extend = Ntv2RichMediaHighwayExt { file_uuid: index_node.file_uuid.to_owned(), u_key: res.res.u_key.to_owned().unwrap(), @@ -222,12 +224,8 @@ impl BusinessHandle { .info .as_ref() .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; - let sha1 = hex::decode(&info.file_sha1).map_err(|e| { - ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) - })?; - let md5 = hex::decode(&info.file_hash).map_err(|e| { - ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) - })?; + let sha1 = hex::decode(&info.file_sha1).map_err(HighwayError::HexError)?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; let extend = Ntv2RichMediaHighwayExt { file_uuid: index_node.file_uuid.to_owned(), u_key: res.res.u_key.to_owned().unwrap(), @@ -261,4 +259,334 @@ impl BusinessHandle { image.not_online_image = res.res.not_online_image.to_owned(); Ok(()) } + + async fn resolve_video( + self: &Arc, + video_stream_ctx: AsyncStream, + video_thumb_stream_ctx: AsyncStream, + ) -> ManiaResult<(Bytes, Bytes, Vec>, Bytes, Bytes)> { + let (file_md5, file_sha1, file_stream_sha1) = mut_stream_ctx(&video_stream_ctx, |s| { + Box::pin(async move { + let mut md5_hasher = Md5::new(); + let mut sha1_hasher = Sha1::new(); + let mut stream_sha1_hasher = StreamSha1::new(); + stream_pipeline(s, |chunk| { + md5_hasher.update(chunk); + sha1_hasher.update(chunk); + stream_sha1_hasher.update(chunk); + }) + .await?; + let md5 = Bytes::from(md5_hasher.finalize().to_vec()); + let sha1 = Bytes::from(sha1_hasher.finalize().to_vec()); + let stream_sha1 = stream_sha1_hasher.finalize(); + let stream_sha1 = stream_sha1.into_iter().map(|arr| arr.to_vec()).collect(); + Ok::<(Bytes, Bytes, Vec>), ManiaError>((md5, sha1, stream_sha1)) + }) + }) + .await?; + let (thumb_md5, thumb_sha1) = mut_stream_ctx(&video_thumb_stream_ctx, |s| { + Box::pin(async move { + let mut md5_hasher = Md5::new(); + let mut sha1_hasher = Sha1::new(); + stream_pipeline(s, |chunk| { + md5_hasher.update(chunk); + sha1_hasher.update(chunk); + }) + .await?; + let md5 = Bytes::from(md5_hasher.finalize().to_vec()); + let sha1 = Bytes::from(sha1_hasher.finalize().to_vec()); + Ok::<(Bytes, Bytes), ManiaError>((md5, sha1)) + }) + }) + .await?; + Ok((file_md5, file_sha1, file_stream_sha1, thumb_md5, thumb_sha1)) + } + + pub async fn upload_group_video( + self: &Arc, + group_uin: u32, + video: &mut VideoEntity, + ) -> ManiaResult<()> { + self.prepare_highway().await?; + let (vs, is) = video.resolve_stream().await; + let vs = vs.ok_or(ManiaError::GenericError(Cow::from("No video stream found")))?; + let is = is.ok_or(ManiaError::GenericError(Cow::from("No image stream found")))?; + let (file_md5, file_sha1, file_stream_sha1, thumb_md5, thumb_sha1) = + self.resolve_video(vs.clone(), is.clone()).await?; + let mut req = dda!(VideoGroupUploadEvent { + req: VideoGroupUploadArgs { + group_uin, + video_size: video.video_size as u32, + video_name: video + .video_path + .clone() + .unwrap_or_else(|| { format!("{}.mp4", hex::encode(&file_sha1)) }), + video_md5: file_md5, + video_sha1: file_sha1, + thumb_size: video.video_thumb_size as u32, + thumb_name: video + .video_thumb_path + .clone() + .unwrap_or_else(|| { format!("{}.jpg", hex::encode(&thumb_sha1)) }), + thumb_md5, + thumb_sha1, + thumb_width: video.video_thumb_width as u32, + thumb_height: video.video_thumb_height as u32, + summary: "[视频]".to_string(), + } + }); + let res = self.send_event(&mut req).await?; + let res: &VideoGroupUploadEvent = + downcast_major_event(&res).ok_or(ManiaError::InternalEventDowncastError)?; + let chunk_size = self.context.config.highway_chuck_size; + if res.res.u_key.as_ref().is_some() { + tracing::debug!( + "uploadGroupVideoReq (Video) get upload u_key: {}, need upload!", + res.res.u_key.as_ref().unwrap() + ); + let size = video.video_size as u32; + let msg_info_body = res.res.msg_info.msg_info_body.clone(); + let index_node = msg_info_body + .first() + .ok_or(ManiaError::GenericError(Cow::from( + "No index node in response", + )))? + .index + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No index in response")))?; + let info = index_node + .info + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; + let extend = Ntv2RichMediaHighwayExt { + file_uuid: index_node.file_uuid.to_owned(), + u_key: res.res.u_key.to_owned().unwrap(), + network: Some(NtHighwayNetwork { + i_pv4s: oidb_ipv4s_to_highway_ipv4s(&res.res.ipv4s), + }), + msg_info_body: msg_info_body.to_owned(), + block_size: chunk_size as u32, + hash: Some({ + NtHighwayHash { + file_sha1: file_stream_sha1, + } + }), + } + .encode_to_vec(); + let client = self.highway.client.load(); + mut_stream_ctx(&vs, |s| { + Box::pin(async move { + client + .upload(1005, s, size, Bytes::from(md5), Bytes::from(extend)) + .await?; + Ok::<(), ManiaError>(()) + }) + }) + .await?; + } else { + tracing::debug!("No u_key in upload_group_video (Video) response, skip upload!"); + } + if let Some(sub_file) = res.res.sub_file_info.first() + && !sub_file.u_key.is_empty() + { + tracing::debug!( + "uploadGroupVideoReq (Thumb) get upload u_key: {}, need upload!", + sub_file.u_key + ); + let msg_info_body = res.res.msg_info.msg_info_body.to_owned(); + let index = res + .res + .msg_info + .msg_info_body + .get(1) + .ok_or(ManiaError::GenericError(Cow::from( + "No index node in response", + )))? + .index + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No index in response")))?; + let info = index + .info + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; + let sha1 = hex::decode(&info.file_sha1).map_err(HighwayError::HexError)?; + let size = video.video_thumb_size as u32; + let extend = Ntv2RichMediaHighwayExt { + file_uuid: index.file_uuid.to_owned(), + u_key: sub_file.u_key.to_owned(), + network: Some(NtHighwayNetwork { + i_pv4s: oidb_ipv4s_to_highway_ipv4s(&res.res.ipv4s), + }), + msg_info_body, + block_size: chunk_size as u32, + hash: Some({ + NtHighwayHash { + file_sha1: vec![sha1], + } + }), + } + .encode_to_vec(); + let client = self.highway.client.load(); + mut_stream_ctx(&is, |s| { + Box::pin(async move { + client + .upload(1006, s, size, Bytes::from(md5), Bytes::from(extend)) + .await?; + Ok::<(), ManiaError>(()) + }) + }) + .await?; + } else { + tracing::debug!("No u_key in upload_group_video (Thumb) response, skip upload!"); + } + video.msg_info = Some(res.res.msg_info.to_owned()); + video.compat = Some(res.res.video_file.to_owned()); + Ok(()) + } + + pub async fn upload_c2c_video( + self: &Arc, + target_uid: &str, + video: &mut VideoEntity, + ) -> ManiaResult<()> { + self.prepare_highway().await?; + let (vs, is) = video.resolve_stream().await; + let vs = vs.ok_or(ManiaError::GenericError(Cow::from("No video stream found")))?; + let is = is.ok_or(ManiaError::GenericError(Cow::from("No image stream found")))?; + let (file_md5, file_sha1, file_stream_sha1, thumb_md5, thumb_sha1) = + self.resolve_video(vs.clone(), is.clone()).await?; + let mut req = dda!(VideoC2CUploadEvent { + req: VideoC2CUploadArgs { + uid: target_uid.to_string(), + video_size: video.video_size as u32, + video_name: video + .video_path + .clone() + .unwrap_or_else(|| { format!("{}.mp4", hex::encode(&file_sha1)) }), + video_md5: file_md5, + video_sha1: file_sha1, + thumb_size: video.video_thumb_size as u32, + thumb_name: video + .video_thumb_path + .clone() + .unwrap_or_else(|| { format!("{}.jpg", hex::encode(&thumb_sha1)) }), + thumb_md5, + thumb_sha1, + thumb_width: video.video_thumb_width as u32, + thumb_height: video.video_thumb_height as u32, + summary: "[视频]".to_string(), + } + }); + let res = self.send_event(&mut req).await?; + let res: &VideoC2CUploadEvent = + downcast_major_event(&res).ok_or(ManiaError::InternalEventDowncastError)?; + let chunk_size = self.context.config.highway_chuck_size; + if res.res.u_key.as_ref().is_some() { + tracing::debug!( + "uploadC2CVideoReq (Video) get upload u_key: {}, need upload!", + res.res.u_key.as_ref().unwrap() + ); + let size = video.video_size as u32; + let msg_info_body = res.res.msg_info.msg_info_body.clone(); + let index_node = msg_info_body + .first() + .ok_or(ManiaError::GenericError(Cow::from( + "No index node in response", + )))? + .index + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No index in response")))?; + let info = index_node + .info + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; + let extend = Ntv2RichMediaHighwayExt { + file_uuid: index_node.file_uuid.to_owned(), + u_key: res.res.u_key.to_owned().unwrap(), + network: Some(NtHighwayNetwork { + i_pv4s: oidb_ipv4s_to_highway_ipv4s(&res.res.ipv4s), + }), + msg_info_body: msg_info_body.to_owned(), + block_size: chunk_size as u32, + hash: Some({ + NtHighwayHash { + file_sha1: file_stream_sha1, + } + }), + } + .encode_to_vec(); + let client = self.highway.client.load(); + mut_stream_ctx(&vs, |s| { + Box::pin(async move { + client + .upload(1001, s, size, Bytes::from(md5), Bytes::from(extend)) + .await?; + Ok::<(), ManiaError>(()) + }) + }) + .await?; + } else { + tracing::debug!("No u_key in upload_c2c_video (Video) response, skip upload!"); + } + if let Some(sub_file) = res.res.sub_file_info.first() + && !sub_file.u_key.is_empty() + { + tracing::debug!( + "uploadC2CVideoReq (Thumb) get upload u_key: {}, need upload!", + sub_file.u_key + ); + let msg_info_body = res.res.msg_info.msg_info_body.to_owned(); + let index = res + .res + .msg_info + .msg_info_body + .get(1) + .ok_or(ManiaError::GenericError(Cow::from( + "No index node in response", + )))? + .index + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No index in response")))?; + let info = index + .info + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; + let md5 = hex::decode(&info.file_hash).map_err(HighwayError::HexError)?; + let sha1 = hex::decode(&info.file_sha1).map_err(HighwayError::HexError)?; + let size = video.video_thumb_size as u32; + let extend = Ntv2RichMediaHighwayExt { + file_uuid: index.file_uuid.to_owned(), + u_key: sub_file.u_key.to_owned(), + network: Some(NtHighwayNetwork { + i_pv4s: oidb_ipv4s_to_highway_ipv4s(&res.res.ipv4s), + }), + msg_info_body, + block_size: chunk_size as u32, + hash: Some({ + NtHighwayHash { + file_sha1: vec![sha1], + } + }), + } + .encode_to_vec(); + let client = self.highway.client.load(); + mut_stream_ctx(&is, |s| { + Box::pin(async move { + client + .upload(1002, s, size, Bytes::from(md5), Bytes::from(extend)) + .await?; + Ok::<(), ManiaError>(()) + }) + }) + .await?; + } else { + tracing::debug!("No u_key in upload_c2c_video (Thumb) response, skip upload!"); + } + video.msg_info = Some(res.res.msg_info.to_owned()); + video.compat = Some(res.res.video_file.to_owned()); + Ok(()) + } } diff --git a/mania/src/message/builder.rs b/mania/src/message/builder.rs index 0e8d088..d76e89e 100644 --- a/mania/src/message/builder.rs +++ b/mania/src/message/builder.rs @@ -4,6 +4,7 @@ use crate::message::chain::MessageChain; use crate::message::entity::Entity; use crate::message::entity::image::ImageEntity; use crate::message::entity::text::TextEntity; +use crate::message::entity::video::VideoEntity; use std::sync::Arc; use tokio::sync::Mutex; @@ -53,6 +54,60 @@ impl MessageChainBuilder { self } + pub fn video(&mut self, video_path: &str, video_length: i32) -> &mut Self { + self.chains.entities.push(Entity::Video(dda!(VideoEntity { + video_path: Some(video_path.to_string()), + video_length, + }))); + self + } + + pub fn video_with_thumb( + &mut self, + video_path: &str, + video_length: i32, + thumb_path: &str, + ) -> &mut Self { + self.chains.entities.push(Entity::Video(dda!(VideoEntity { + video_path: Some(video_path.to_string()), + video_length, + video_thumb_path: Some(thumb_path.to_string()), + }))); + self + } + + pub fn video_stream( + &mut self, + video_stream: impl AsyncPureStreamTrait + 'static, + video_length: i32, + ) -> &mut Self { + self.chains.entities.push(Entity::Video(dda!(VideoEntity { + video_stream: Some(Arc::new(Mutex::new( + Box::new(video_stream) as AsyncPureStream + ))), + video_length + }))); + self + } + + pub fn video_stream_with_thumb( + &mut self, + video_stream: impl AsyncPureStreamTrait + 'static, + video_length: i32, + thumb_stream: impl AsyncPureStreamTrait + 'static, + ) -> &mut Self { + self.chains.entities.push(Entity::Video(dda!(VideoEntity { + video_stream: Some(Arc::new(Mutex::new( + Box::new(video_stream) as AsyncPureStream + ))), + video_thumb_stream: Some(Arc::new(Mutex::new( + Box::new(thumb_stream) as AsyncPureStream + ))), + video_length + }))); + self + } + pub fn build(&mut self) -> MessageChain { std::mem::take(&mut self.chains) } diff --git a/mania/src/message/entity.rs b/mania/src/message/entity.rs index b5f572c..ea6287e 100644 --- a/mania/src/message/entity.rs +++ b/mania/src/message/entity.rs @@ -33,9 +33,11 @@ pub use video::VideoEntity as Video; pub use xml::XmlEntity as Xml; use crate::Context; +use crate::core::highway::{AsyncPureStream, AsyncStream}; use crate::core::protos::message::Elem; use bytes::Bytes; use std::fmt::{Debug, Display}; +use std::sync::Arc; pub trait MessageContentImplChecker { fn need_pack(&self) -> bool; @@ -52,6 +54,21 @@ pub trait MessageEntity: Debug + Display + MessageContentImpl { Self: Sized; } +impl dyn MessageEntity { + async fn resolve_stream(file_path: &Option) -> Option<(AsyncStream, u32)> { + if let Some(file_path) = file_path { + let file = tokio::fs::File::open(file_path).await.ok()?; + let size = file.metadata().await.ok()?.len() as u32; + Some(( + Arc::new(tokio::sync::Mutex::new(Box::new(file) as AsyncPureStream)), + size, + )) + } else { + None + } + } +} + #[allow(clippy::large_enum_variant)] // FIXME: do we need refactoring? pub enum Entity { Text(text::TextEntity), diff --git a/mania/src/message/entity/record.rs b/mania/src/message/entity/record.rs index 3fe5a95..09c9b73 100644 --- a/mania/src/message/entity/record.rs +++ b/mania/src/message/entity/record.rs @@ -1,5 +1,7 @@ use super::prelude::*; +use crate::core::highway::AsyncPureStream; use crate::core::protos::service::oidb::MsgInfo; +use std::sync::Arc; #[pack_content(false)] #[derive(Default)] pub struct RecordEntity { @@ -7,13 +9,30 @@ pub struct RecordEntity { pub audio_md5: Bytes, pub audio_name: String, pub audio_url: String, - // TODO: stream + pub file_path: Option, + pub file_size: u32, + pub audio_stream: Option, pub(crate) audio_uuid: Option, pub(crate) file_sha1: Option, pub(crate) msg_info: Option, pub(crate) compat: Option, } +impl RecordEntity { + pub(crate) async fn resolve_stream(&mut self) -> Option { + if let Some(file_path) = &self.file_path { + let file = tokio::fs::File::open(file_path).await.ok()?; + let size = file.metadata().await.ok()?.len() as u32; + self.file_size = size; + Some(Arc::new(tokio::sync::Mutex::new( + Box::new(file) as AsyncPureStream + ))) + } else { + self.audio_stream.clone() + } + } +} + impl Debug for RecordEntity { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "[Record]: {}", self.audio_url) diff --git a/mania/src/message/entity/video.rs b/mania/src/message/entity/video.rs index 355c776..0c22bc6 100644 --- a/mania/src/message/entity/video.rs +++ b/mania/src/message/entity/video.rs @@ -1,5 +1,8 @@ use super::prelude::*; +use crate::core::highway::AsyncPureStream; use crate::core::protos::service::oidb::{IndexNode, MsgInfo}; +use std::io::Cursor; +use std::sync::Arc; const DEFAULT_THUMB: [u8; 2643] = [ 0x1F, 0x8B, 0x08, 0x08, 0x0B, 0x68, 0xA6, 0x67, 0x02, 0xFF, 0x6F, 0x75, 0x74, 0x2E, 0x62, 0x69, @@ -179,15 +182,65 @@ pub struct VideoEntity { pub width: i32, pub video_size: i32, pub video_length: i32, + pub video_thumb_size: i32, + pub video_thumb_height: i32, + pub video_thumb_width: i32, pub video_url: String, - // TODO: stream (video, thumb) - // TODO: maybe we can support video preview pic? + pub video_path: Option, + pub video_stream: Option, + pub video_thumb_path: Option, + pub video_thumb_stream: Option, pub(crate) node: Option, // for download, 2025/02/08 pub(crate) video_uuid: Option, pub(crate) msg_info: Option, pub(crate) compat: Option, } +impl VideoEntity { + pub(crate) async fn resolve_stream(&mut self) -> (Option, Option) { + let load_stream = |path: String| async move { + let file = tokio::fs::File::open(path).await.ok()?; + let metadata = file.metadata().await.ok()?; + let size = metadata.len() as i32; + let stream = Arc::new(tokio::sync::Mutex::new(Box::new(file) as AsyncPureStream)); + Some((stream, size)) + }; + + let video_stream = if let Some(video_path) = self.video_path.as_ref() { + if let Some((stream, size)) = load_stream(video_path.clone()).await { + self.video_size = size; + Some(stream) + } else { + None + } + } else { + self.video_stream.clone() + }; + + let video_thumb_stream = if let Some(thumb_path) = self.video_thumb_path.as_ref() { + if let Some((stream, size)) = load_stream(thumb_path.clone()).await { + self.video_thumb_size = size; + Some(stream) + } else { + None + } + } else { + match self.video_thumb_stream.as_ref() { + Some(stream) => Some(stream.clone()), + None => { + let cursor = Cursor::new(DEFAULT_THUMB.to_vec()); + let stream = + Arc::new(tokio::sync::Mutex::new(Box::new(cursor) as AsyncPureStream)); + self.video_thumb_size = DEFAULT_THUMB.len() as i32; + Some(stream) + } + } + }; + + (video_stream, video_thumb_stream) + } +} + impl Debug for VideoEntity { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!( @@ -207,12 +260,7 @@ impl Display for VideoEntity { impl MessageEntity for VideoEntity { fn pack_element(&self, _: &Context) -> Vec { let common = self.msg_info.as_ref().map_or_else( - || { - MsgInfo { - ..Default::default() - } - .encode_to_vec() - }, + || MsgInfo::default().encode_to_vec(), |msg_info| msg_info.encode_to_vec(), ); vec![dda!(Elem { diff --git a/mania/src/utility/stream_helper.rs b/mania/src/utility/stream_helper.rs index a5b5687..c00854e 100644 --- a/mania/src/utility/stream_helper.rs +++ b/mania/src/utility/stream_helper.rs @@ -5,8 +5,8 @@ use tokio::sync::Mutex; pub async fn mut_stream_ctx(lock: &Mutex, f: F) -> Result where - T: Send + 'static, - F: for<'a> FnOnce(&'a mut T) -> Pin> + Send + 'a>>, + T: Send + Sync + 'static, + F: for<'a> FnOnce(&'a mut T) -> Pin> + Send + Sync + 'a>>, { let mut guard = lock.lock().await; f(&mut *guard).await