Skip to content

Commit 7b807be

Browse files
committed
test(kvsd_protocol): fix connection test
1 parent 98c8865 commit 7b807be

File tree

4 files changed

+33
-11
lines changed

4 files changed

+33
-11
lines changed

crates/synd_kvsd_protocol/src/message/frame.rs

+25-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use thiserror::Error;
55
use crate::message::{cursor::Cursor, ioext::MessageWriteExt, spec, MessageError, MessageType};
66

77
mod prefix {
8-
// pub(super) const MESSAGE_FRAMES: u8 = b'*';
8+
pub(super) const MESSAGE_START: u8 = b'*';
9+
pub(super) const FRAME_LENGTH: u8 = b'@';
910
pub(super) const MESSAGE_TYPE: u8 = b'#';
1011
pub(super) const STRING: u8 = b'+';
1112
pub(super) const BYTES: u8 = b'$';
@@ -22,6 +23,8 @@ pub enum FrameError {
2223
InvalidMessageType(#[from] MessageError),
2324
#[error("invalid frame: {0}")]
2425
Invalid(String),
26+
#[error("invalid u64: {0}")]
27+
InvalidU64(String),
2528
// Other(common::Error),
2629
}
2730

@@ -30,6 +33,8 @@ pub(crate) type Time = chrono::DateTime<chrono::Utc>;
3033

3134
#[derive(Debug, Clone, PartialEq)]
3235
pub(crate) enum Frame {
36+
MessageStart,
37+
Length(u64),
3338
MessageType(MessageType),
3439
String(String),
3540
Bytes(Vec<u8>),
@@ -68,6 +73,11 @@ impl Frame {
6873

6974
fn read(src: &mut Cursor) -> Result<Frame, FrameError> {
7075
match src.u8()? {
76+
prefix::MESSAGE_START => Ok(Frame::MessageStart),
77+
prefix::FRAME_LENGTH => {
78+
let len = src.u64()?;
79+
Ok(Frame::Length(len))
80+
}
7181
prefix::MESSAGE_TYPE => MessageType::try_from(src.u8()?)
7282
.map_err(FrameError::InvalidMessageType)
7383
.map(Frame::MessageType),
@@ -120,6 +130,11 @@ impl Frame {
120130
W: MessageWriteExt,
121131
{
122132
match self {
133+
Frame::MessageStart => writer.write_u8(prefix::MESSAGE_START).await,
134+
Frame::Length(len) => {
135+
writer.write_u8(prefix::FRAME_LENGTH).await?;
136+
writer.write_u64m(len).await
137+
}
123138
Frame::MessageType(mt) => {
124139
writer.write_u8(prefix::MESSAGE_TYPE).await?;
125140
writer.write_u8(mt.into()).await
@@ -160,15 +175,15 @@ impl IntoIterator for MessageFrames {
160175

161176
impl MessageFrames {
162177
pub(super) fn new(mt: MessageType, capasity: usize) -> Self {
163-
let mut v = Vec::with_capacity(capasity + 1);
178+
let mut v = Vec::with_capacity(capasity + 3);
179+
let message_len = capasity + 1;
180+
181+
v.push(Frame::MessageStart);
182+
v.push(Frame::Length(message_len as u64));
164183
v.push(Frame::MessageType(mt));
165184
MessageFrames(v)
166185
}
167186

168-
pub(super) fn len(&self) -> usize {
169-
self.0.len()
170-
}
171-
172187
pub(crate) fn check_parse(src: &mut Cursor) -> Result<(), FrameError> {
173188
let frames_len = MessageFrames::frames_len(src)?;
174189

@@ -199,9 +214,12 @@ impl MessageFrames {
199214
}
200215

201216
fn frames_len(src: &mut Cursor) -> Result<u64, FrameError> {
202-
if src.u8()? != spec::MESSAGE_START {
217+
if src.u8()? != prefix::MESSAGE_START {
203218
return Err(FrameError::Invalid("message frames prefix expected".into()));
204219
}
220+
if src.u8()? != prefix::FRAME_LENGTH {
221+
return Err(FrameError::Invalid("message frame length expected".into()));
222+
}
205223
src.u64()
206224
}
207225

crates/synd_kvsd_protocol/src/message/mod.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ impl From<Message> for MessageFrames {
7878
impl Message {
7979
pub(crate) fn parse(frames: MessageFrames) -> Result<Message, MessageError> {
8080
let mut parse = Parse::new(frames);
81+
// skip message_start and frame_length
82+
parse.skip(2);
8183
let message_type = parse.message_type().ok_or(MessageError::ParseFrame {
8284
message: "message type not found",
8385
})?;
@@ -100,9 +102,6 @@ impl Message {
100102
{
101103
let frames: MessageFrames = self.into();
102104

103-
writer.write_u8(spec::MESSAGE_START).await?;
104-
writer.write_u64m(frames.len() as u64).await?;
105-
106105
for frame in frames {
107106
frame.write(&mut writer).await?;
108107
}

crates/synd_kvsd_protocol/src/message/parse.rs

+6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ impl Parse {
2222
}
2323
}
2424

25+
pub(super) fn skip(&mut self, n: usize) {
26+
for _ in 0..n {
27+
self.frames.next();
28+
}
29+
}
30+
2531
pub(super) fn message_type(&mut self) -> Option<MessageType> {
2632
self.next().ok().and_then(|frame| match frame {
2733
Frame::MessageType(mt) => Some(mt),
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
11
pub(super) const DELIMITER: &[u8] = b"\r\n";
2-
pub(super) const MESSAGE_START: u8 = b'*';

0 commit comments

Comments
 (0)