From 25ff9d994f380d180c0c579f9e2a8407671f091c Mon Sep 17 00:00:00 2001 From: pk5ls20 Date: Thu, 20 Feb 2025 01:51:29 +0800 Subject: [PATCH] feat: upload c2c image & stream sha1 --- Cargo.lock | 1 + mania/Cargo.toml | 1 + mania/src/core/crypto.rs | 1 + mania/src/core/crypto/stream_sha1.rs | 129 ++++++++++++++++++ .../core/event/message/image_c2c_upload.rs | 126 +++++++++++++++++ mania/src/core/event/message/mod.rs | 1 + mania/src/core/operation/highway_op.rs | 118 +++++++++++++--- mania/src/message/entity/image.rs | 13 ++ 8 files changed, 373 insertions(+), 17 deletions(-) create mode 100644 mania/src/core/crypto/stream_sha1.rs create mode 100644 mania/src/core/event/message/image_c2c_upload.rs diff --git a/Cargo.lock b/Cargo.lock index 112338b..932ef55 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,6 +1223,7 @@ dependencies = [ "bytes", "chrono", "dashmap", + "digest 0.11.0-pre.9", "elliptic-curve", "flate2", "futures", diff --git a/mania/Cargo.toml b/mania/Cargo.toml index 789115a..67599ea 100644 --- a/mania/Cargo.toml +++ b/mania/Cargo.toml @@ -44,6 +44,7 @@ quick-xml = { version = "0.37.2", features = ["serialize"] } regex = "1.11.1" tokio-util = "0.7.13" sha1 = "0.11.0-pre.4" +digest = "0.11.0-pre.9" [build-dependencies] prost-build = "0.13.4" diff --git a/mania/src/core/crypto.rs b/mania/src/core/crypto.rs index 94b2041..8399bc3 100644 --- a/mania/src/core/crypto.rs +++ b/mania/src/core/crypto.rs @@ -2,6 +2,7 @@ use md5::{Digest, Md5}; use p256::{EncodedPoint, PublicKey, ecdh::EphemeralSecret}; pub mod consts; +mod stream_sha1; pub mod tea; /// The original macro that @wybxc originally wrote (b81f75b7) was perfect. diff --git a/mania/src/core/crypto/stream_sha1.rs b/mania/src/core/crypto/stream_sha1.rs new file mode 100644 index 0000000..05721d5 --- /dev/null +++ b/mania/src/core/crypto/stream_sha1.rs @@ -0,0 +1,129 @@ +use core::{mem, slice::from_ref}; +use digest::{ + array::Array, + block_buffer::{BlockBuffer, Eager}, + typenum::{U64, Unsigned}, +}; +use sha1::compress; + +#[derive(Default)] +struct StreamSha1Core { + h: [u32; 5] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476, 0xC3D2E1F0], + block_len: u64 = 0, + buffer: BlockBuffer, +} + +impl StreamSha1Core { + #[inline] + fn hash(&self) -> [u8; 20] { + let mut digest = [0u8; 20]; + for (i, &value) in self.h.iter().enumerate() { + digest[i * 4..(i + 1) * 4].copy_from_slice(&value.to_le_bytes()); + } + digest + } + + #[inline] + pub fn update(&mut self, input: &[u8]) { + self.buffer.digest_blocks(input, |blocks| { + self.block_len += blocks.len() as u64; + let blocks_core = Array::cast_slice_to_core(blocks); + compress(&mut self.h, blocks_core); + }); + } + + #[inline] + pub fn finalize(mut self) -> [u8; 20] { + let bs = U64::U64; + let bit_len = 8 * (self.buffer.get_pos() as u64 + bs * self.block_len); + let mut h = self.h; + self.buffer + .len64_padding_be(bit_len, |b| compress(&mut h, from_ref(&b.0))); + let mut out = [0u8; 20]; + for (chunk, v) in out.chunks_exact_mut(4).zip(h.iter()) { + chunk.copy_from_slice(&v.to_be_bytes()); + } + out + } +} + +#[derive(Default)] +pub struct StreamSha1 { + hasher: StreamSha1Core, + block_size: usize = 1024 * 1024, + offset: usize = 0, + digests_stream: Vec<[u8; 20]>, +} + +impl StreamSha1 { + pub fn new() -> Self { + Self::default() + } + + pub fn new_with_size(block_size: usize) -> Self { + Self { + hasher: StreamSha1Core::default(), + block_size, + offset: 0, + digests_stream: Vec::new(), + } + } + + pub fn update(&mut self, data: &[u8]) { + if data.len() + self.offset < self.block_size { + self.hasher.update(data); + self.offset += data.len(); + } else { + let range = self.block_size - self.offset; + let block = &data[..range]; + self.hasher.update(block); + let hash = self.hasher.hash(); + self.digests_stream.push(hash); + self.offset = 0; + self.update(&data[range..]); + } + } + + pub fn finalize(mut self) -> Vec<[u8; 20]> { + let final_digest = self.hasher.finalize(); + self.digests_stream.push(final_digest); + mem::take(&mut self.digests_stream) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const EXPECTED_DIGESTS: [u8; 300] = [ + 0x15, 0x97, 0xb2, 0x42, 0x22, 0x2f, 0x46, 0x3d, 0xe7, 0xbd, 0xf4, 0xdf, 0x75, 0xd9, 0xdb, + 0xca, 0xd4, 0xc9, 0x52, 0x33, 0x41, 0x5f, 0xbb, 0x1f, 0x1f, 0x65, 0xaf, 0xee, 0x93, 0x53, + 0x31, 0x2a, 0xdb, 0x43, 0x53, 0xdd, 0x2b, 0x69, 0x8c, 0x02, 0x9d, 0x9d, 0x05, 0x86, 0x70, + 0xc3, 0x5d, 0x68, 0x00, 0xe9, 0x2d, 0xbf, 0xce, 0x6d, 0xd8, 0xb1, 0x1e, 0xfe, 0x80, 0x0f, + 0x73, 0x71, 0x8b, 0x0b, 0x61, 0x4d, 0xbb, 0x1a, 0xef, 0xcb, 0x47, 0x82, 0x8e, 0x78, 0x73, + 0x2b, 0xe0, 0x61, 0xb0, 0xca, 0x0f, 0x77, 0xd1, 0x0a, 0x70, 0xa9, 0xc0, 0x4e, 0xd8, 0x35, + 0xff, 0x45, 0x93, 0x98, 0x13, 0xfd, 0x39, 0xce, 0xee, 0x02, 0x4f, 0x9e, 0x2e, 0xc2, 0xc5, + 0x78, 0x2b, 0x5c, 0x79, 0x5c, 0xd2, 0xd8, 0xd3, 0xe3, 0xc3, 0xe4, 0x1e, 0x99, 0xf3, 0xce, + 0x81, 0xfd, 0xb4, 0x00, 0x3d, 0x89, 0x8a, 0x23, 0x01, 0xac, 0xee, 0xce, 0x13, 0x24, 0xe7, + 0x02, 0x1d, 0x53, 0x0e, 0x0a, 0x82, 0xb9, 0xaf, 0xb8, 0xcf, 0x40, 0xa1, 0xc6, 0x48, 0x43, + 0xb0, 0x01, 0xcb, 0xc9, 0x66, 0xd6, 0x18, 0x90, 0x39, 0x1a, 0xc5, 0xa0, 0xe2, 0xf3, 0xca, + 0x9d, 0x90, 0xfc, 0xa9, 0xe3, 0xd8, 0xc5, 0x2b, 0x64, 0xe1, 0xac, 0xc0, 0x19, 0x64, 0x0f, + 0x98, 0xeb, 0x7a, 0xbc, 0x36, 0x50, 0xa1, 0xab, 0x09, 0xb7, 0xc9, 0x2d, 0x9f, 0x82, 0xb9, + 0x8c, 0x6a, 0xe5, 0x89, 0x6d, 0x35, 0x1d, 0xe4, 0x38, 0xe0, 0x38, 0xdc, 0x7a, 0x73, 0x46, + 0xd1, 0xf9, 0xbf, 0x20, 0x76, 0x01, 0x77, 0xe4, 0x8b, 0x1d, 0x0b, 0x39, 0xed, 0x63, 0x76, + 0x1c, 0x8b, 0x81, 0x1d, 0x02, 0xd0, 0x02, 0x23, 0x71, 0x51, 0x8b, 0x4a, 0x11, 0xe1, 0x5e, + 0x73, 0xdb, 0xa7, 0x5f, 0xee, 0x54, 0x74, 0xb5, 0x59, 0x8d, 0x9f, 0xfe, 0x41, 0x79, 0xb1, + 0xbf, 0xe3, 0x1f, 0xaf, 0x85, 0x03, 0xde, 0x3b, 0x16, 0x29, 0xee, 0xe7, 0x2e, 0x35, 0xa2, + 0x46, 0xb7, 0x15, 0x58, 0xf9, 0x8f, 0x87, 0xa0, 0xfa, 0x1b, 0x59, 0x23, 0x63, 0x94, 0xfc, + 0xd0, 0x64, 0x11, 0x38, 0x68, 0xd6, 0x84, 0x09, 0x16, 0x8a, 0x66, 0xad, 0xb1, 0x1d, 0x19, + ]; + + #[test] + fn test_stream_sha1() { + let data = b"qwq".repeat(2233 * 2233); + let mut hasher = StreamSha1::new(); + hasher.update(&data); + let digest = hasher.finalize().concat(); + assert_eq!(digest, EXPECTED_DIGESTS); + } +} diff --git a/mania/src/core/event/message/image_c2c_upload.rs b/mania/src/core/event/message/image_c2c_upload.rs new file mode 100644 index 0000000..dc66048 --- /dev/null +++ b/mania/src/core/event/message/image_c2c_upload.rs @@ -0,0 +1,126 @@ +use crate::core::event::prelude::*; +use crate::core::protos::message::NotOnlineImage; +use crate::core::protos::service::oidb::{ + BytesPbReserveC2c, C2cUserInfo, ClientMeta, CommonHead, ExtBizInfo, FileInfo, FileType, IPv4, + MsgInfo, MultiMediaReqHead, Ntv2RichMediaReq, Ntv2RichMediaResp, PicExtBizInfo, PttExtBizInfo, + SceneInfo, UploadInfo, UploadReq, VideoExtBizInfo, +}; +use crate::utility::random_gen::RandomGenerator; + +#[derive(Debug, Default)] +pub struct ImageC2CUploadArgs { + pub uid: String, + pub size: u32, + pub md5: Bytes, + pub sha1: Bytes, + pub name: String, + pub pic_type: u32, + pub sub_type: u32, + pub height: u32, + pub width: u32, + pub summary: String, +} + +#[derive(Debug, Default)] +pub struct ImageC2CUploadRes { + pub msg_info: MsgInfo, + pub not_online_image: NotOnlineImage, + pub u_key: Option, + pub ipv4s: Vec, +} + +#[oidb_command(0x11c5, 100)] +#[derive(Debug, ServerEvent, Default)] +pub struct ImageC2CUploadEvent { + pub req: ImageC2CUploadArgs, + pub res: ImageC2CUploadRes, +} + +impl ClientEvent for ImageC2CUploadEvent { + fn build(&self, _: &Context) -> CEBuildResult { + let req = dda!(Ntv2RichMediaReq { + req_head: Some(MultiMediaReqHead { + common: Some(CommonHead { + request_id: 1, + command: 100, + }), + scene: Some(dda!(SceneInfo { + request_type: 2, + business_type: 1, + scene_type: 1, + c2c: Some(C2cUserInfo { + account_type: 2, + target_uid: self.req.uid.clone(), + }) + })), + client: Some(ClientMeta { agent_type: 2 }), + }), + upload: Some(UploadReq { + upload_info: vec![UploadInfo { + file_info: Some(FileInfo { + file_size: self.req.size, + file_hash: hex::encode(self.req.md5.clone()), + file_sha1: hex::encode(self.req.sha1.clone()), + file_name: self.req.name.to_owned(), + r#type: Some(FileType { + r#type: 1, + pic_format: self.req.pic_type, + video_format: 0, + voice_format: 0, + }), + width: self.req.width, + height: self.req.height, + time: 0, + original: 1, + }), + sub_file_type: 0, + }], + try_fast_upload_completed: true, + srv_send_msg: false, + client_random_id: RandomGenerator::rand_u64(), + compat_q_msg_scene_type: 2, + ext_biz_info: Some(dda!(ExtBizInfo { + pic: Some(dda!(PicExtBizInfo { + biz_type: self.req.sub_type, + text_summary: self.req.summary.clone(), + bytes_pb_reserve_c2c: Some(dda!(BytesPbReserveC2c { + sub_type: self.req.sub_type, + field8: self.req.summary.clone(), + })), + })), + video: Some(dda!(VideoExtBizInfo { + bytes_pb_reserve: vec![], + })), + ptt: Some(dda!(PttExtBizInfo { + bytes_reserve: vec![], + bytes_pb_reserve: vec![], + bytes_general_flags: vec![], + })), + })), + client_seq: 0, + no_need_compat_msg: false, + }), + }); + Ok(OidbPacket::new(0x11c5, 100, req.encode_to_vec(), false, true).to_binary()) + } + + fn parse(packet: Bytes, _: &Context) -> CEParseResult { + let resp = OidbPacket::parse_into::(packet)?; + let upload = resp + .upload + .ok_or_else(|| EventError::OtherError("Missing UploadResp".to_string()))?; + let msg_info = upload + .msg_info + .ok_or_else(|| EventError::OtherError("Missing MsgInfo".to_string()))?; + let not_online_image = NotOnlineImage::decode(Bytes::from(upload.compat_q_msg))?; + let ipv4s = upload.i_pv4s; + Ok(ClientResult::single(Box::new(dda!(Self { + res: ImageC2CUploadRes { + msg_info, + not_online_image, + ipv4s, + u_key: upload.u_key, + } + })))) + } +} diff --git a/mania/src/core/event/message/mod.rs b/mania/src/core/event/message/mod.rs index 3b14bd1..d856ce8 100644 --- a/mania/src/core/event/message/mod.rs +++ b/mania/src/core/event/message/mod.rs @@ -1,6 +1,7 @@ pub mod file_c2c_download; pub mod file_group_download; pub mod image_c2c_download; +pub mod image_c2c_upload; pub mod image_group_download; pub mod image_group_upload; pub mod multi_msg_download; diff --git a/mania/src/core/operation/highway_op.rs b/mania/src/core/operation/highway_op.rs index 4b8956f..d7558ce 100644 --- a/mania/src/core/operation/highway_op.rs +++ b/mania/src/core/operation/highway_op.rs @@ -1,11 +1,12 @@ use crate::core::business::BusinessHandle; use crate::core::event::downcast_major_event; +use crate::core::event::message::image_c2c_upload::{ImageC2CUploadArgs, ImageC2CUploadEvent}; use crate::core::event::message::image_group_upload::{ ImageGroupUploadArgs, ImageGroupUploadEvent, }; use crate::core::event::system::fetch_highway_ticket::FetchHighwayTicketEvent; use crate::core::highway::hw_client::HighwayClient; -use crate::core::highway::{AsyncPureStream, AsyncStream, oidb_ipv4s_to_highway_ipv4s}; +use crate::core::highway::{AsyncStream, oidb_ipv4s_to_highway_ipv4s}; use crate::core::protos::service::highway::{ NtHighwayHash, NtHighwayNetwork, Ntv2RichMediaHighwayExt, }; @@ -51,9 +52,9 @@ impl BusinessHandle { async fn resolve_image( self: &Arc, - stream_locker: AsyncStream, + stream_ctx: AsyncStream, ) -> ManiaResult<((ImageFormat, u32, u32), Bytes, Bytes)> { - let (iv, sha1_bytes, md5_bytes) = mut_stream_ctx(&stream_locker, |s| { + let (iv, sha1_bytes, md5_bytes) = mut_stream_ctx(&stream_ctx, |s| { Box::pin(async move { let mut sha1_hasher = Sha1::new(); let mut md5_hasher = Md5::new(); @@ -82,19 +83,10 @@ impl BusinessHandle { image: &mut ImageEntity, ) -> ManiaResult<()> { self.prepare_highway().await?; - let stream = match (&image.file_path, &image.image_stream, &mut image.size) { - (Some(file_path), _, size) => { - let file = tokio::fs::File::open(file_path).await?; - *size = file.metadata().await?.len() as u32; - Arc::new(tokio::sync::Mutex::new(Box::new(file) as AsyncPureStream)) - } - (_, Some(stream), _) => stream.clone(), - _ => { - return Err(ManiaError::GenericError(Cow::from( - "No image stream or file path", - ))); - } - }; + let stream = image + .resolve_stream() + .await + .ok_or(ManiaError::GenericError(Cow::from("No image stream found")))?; let (iv, sha1, md5) = self.resolve_image(stream.clone()).await?; let mut req = dda!(ImageGroupUploadEvent { @@ -171,10 +163,102 @@ impl BusinessHandle { .await?; tracing::debug!("Successfully uploaded group image!"); } else { - tracing::debug!("No u_key in response, skip upload!"); + tracing::debug!("No u_key in upload_group_image response, skip upload!"); } image.msg_info = Some(res.res.msg_info.to_owned()); image.custom_face = res.res.custom_face.to_owned(); Ok(()) } + + pub async fn upload_c2c_image( + self: &Arc, + target_uid: &str, + image: &mut ImageEntity, + ) -> ManiaResult<()> { + self.prepare_highway().await?; + let stream = image + .resolve_stream() + .await + .ok_or(ManiaError::GenericError(Cow::from("No image stream found")))?; + let (iv, sha1, md5) = self.resolve_image(stream.clone()).await?; + let mut req = dda!(ImageC2CUploadEvent { + req: ImageC2CUploadArgs { + uid: target_uid.to_string(), + size: image.size, + name: image.file_path.clone().unwrap_or_else(|| format!( + "{}.{}", + hex::encode(&sha1), + iv.0 + )), + md5, + sha1, + pic_type: iv.0 as u32, + sub_type: image.sub_type, + summary: image.summary.clone().unwrap_or("[图片]".to_string()), + width: iv.1, + height: iv.2, + }, + }); + let res = self.send_event(&mut req).await?; + let res: &ImageC2CUploadEvent = + downcast_major_event(&res).ok_or(ManiaError::InternalEventDowncastError)?; + if res.res.u_key.as_ref().is_some() { + tracing::debug!( + "uploadC2CImageReq get upload u_key: {}, need upload!", + res.res.u_key.as_ref().unwrap() + ); + let size = image.size; + let chunk_size = self.context.config.highway_chuck_size; + let msg_info_body = res.res.msg_info.msg_info_body.to_owned(); + let index_node = msg_info_body + .first() + .ok_or(ManiaError::GenericError(Cow::from( + "No index node in response", + )))? + .index + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No index in response")))?; + let info = index_node + .info + .as_ref() + .ok_or(ManiaError::GenericError(Cow::from("No info in response")))?; + let sha1 = hex::decode(&info.file_sha1).map_err(|e| { + ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) + })?; + let md5 = hex::decode(&info.file_hash).map_err(|e| { + ManiaError::GenericError(Cow::from(format!("Hex decode error: {:?}", e))) + })?; + let extend = Ntv2RichMediaHighwayExt { + file_uuid: index_node.file_uuid.to_owned(), + u_key: res.res.u_key.to_owned().unwrap(), + network: Some(NtHighwayNetwork { + i_pv4s: oidb_ipv4s_to_highway_ipv4s(&res.res.ipv4s), + }), + msg_info_body: msg_info_body.to_owned(), + block_size: chunk_size as u32, + hash: Some({ + NtHighwayHash { + file_sha1: vec![sha1], + } + }), + } + .encode_to_vec(); + let client = self.highway.client.load(); + mut_stream_ctx(&stream, |s| { + Box::pin(async move { + client + .upload(1003, s, size, Bytes::from(md5), Bytes::from(extend)) + .await?; + Ok::<(), ManiaError>(()) + }) + }) + .await?; + tracing::debug!("Successfully uploaded c2c image!"); + } else { + tracing::debug!("No u_key in upload_c2c_image response, skip upload!"); + } + image.msg_info = Some(res.res.msg_info.to_owned()); + image.not_online_image = res.res.not_online_image.to_owned(); + Ok(()) + } } diff --git a/mania/src/message/entity/image.rs b/mania/src/message/entity/image.rs index c519ec1..86aba5c 100644 --- a/mania/src/message/entity/image.rs +++ b/mania/src/message/entity/image.rs @@ -1,5 +1,7 @@ use super::prelude::*; +use crate::core::highway::AsyncPureStream; use crate::core::protos::service::oidb::MsgInfo; +use std::sync::Arc; #[pack_content(false)] #[derive(Default)] @@ -29,6 +31,17 @@ impl ImageEntity { }, } } + + pub(crate) async fn resolve_stream(&self) -> Option { + if let Some(file_path) = &self.file_path { + let file = tokio::fs::File::open(file_path).await.ok()?; + Some(Arc::new(tokio::sync::Mutex::new( + Box::new(file) as AsyncPureStream + ))) + } else { + self.image_stream.clone() + } + } } impl Debug for ImageEntity {