Skip to content

Commit 8fcf0e5

Browse files
committed
feat(s2n-quic-core): add buffer reader/writer traits
1 parent 577f2d4 commit 8fcf0e5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+3055
-211
lines changed

quic/s2n-quic-bench/src/buffer.rs

+12-30
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
// SPDX-License-Identifier: Apache-2.0
33

44
use criterion::{black_box, BenchmarkId, Criterion, Throughput};
5-
use s2n_quic_core::{buffer::ReceiveBuffer, varint::VarInt};
5+
use s2n_quic_core::{
6+
buffer::{reader::Storage as _, writer, Reassembler},
7+
varint::VarInt,
8+
};
69

710
pub fn benchmarks(c: &mut Criterion) {
811
let mut group = c.benchmark_group("buffer");
@@ -13,19 +16,21 @@ pub fn benchmarks(c: &mut Criterion) {
1316
group.throughput(Throughput::Bytes(input.len() as _));
1417

1518
group.bench_with_input(BenchmarkId::new("skip", size), &input, |b, _input| {
16-
let mut buffer = ReceiveBuffer::new();
19+
let mut buffer = Reassembler::new();
20+
let size = VarInt::try_from(size).unwrap();
1721
b.iter(move || {
1822
buffer.skip(black_box(size)).unwrap();
1923
});
2024
});
2125

2226
group.bench_with_input(BenchmarkId::new("write_at", size), &input, |b, input| {
23-
let mut buffer = ReceiveBuffer::new();
27+
let mut buffer = Reassembler::new();
2428
let mut offset = VarInt::from_u8(0);
2529
let len = VarInt::new(input.len() as _).unwrap();
2630
b.iter(move || {
2731
buffer.write_at(offset, input).unwrap();
28-
buffer.copy_into_buf(&mut NoOpBuf);
32+
// Avoid oversampling the `pop` implementation
33+
buffer.copy_into(&mut writer::storage::Discard).unwrap();
2934
offset += len;
3035
});
3136
});
@@ -36,42 +41,19 @@ pub fn benchmarks(c: &mut Criterion) {
3641
BenchmarkId::new("write_at_fragmented", size),
3742
&input,
3843
|b, input| {
39-
let mut buffer = ReceiveBuffer::new();
44+
let mut buffer = Reassembler::new();
4045
let mut offset = VarInt::from_u8(0);
4146
let len = VarInt::new(input.len() as _).unwrap();
4247
b.iter(move || {
4348
let first_offset = offset + len;
4449
buffer.write_at(first_offset, input).unwrap();
4550
let second_offset = offset;
4651
buffer.write_at(second_offset, input).unwrap();
47-
buffer.copy_into_buf(&mut NoOpBuf);
52+
// Avoid oversampling the `pop` implementation
53+
buffer.copy_into(&mut writer::storage::Discard).unwrap();
4854
offset = first_offset + len;
4955
});
5056
},
5157
);
5258
}
5359
}
54-
55-
/// A BufMut implementation that doesn't actually copy data into it
56-
///
57-
/// This is used to avoid oversampling the `pop` implementation for
58-
/// `write_at` benchmarks.
59-
struct NoOpBuf;
60-
61-
unsafe impl bytes::BufMut for NoOpBuf {
62-
#[inline]
63-
fn remaining_mut(&self) -> usize {
64-
usize::MAX
65-
}
66-
67-
#[inline]
68-
unsafe fn advance_mut(&mut self, _cnt: usize) {}
69-
70-
#[inline]
71-
fn put_slice(&mut self, _slice: &[u8]) {}
72-
73-
#[inline]
74-
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
75-
unimplemented!()
76-
}
77-
}

quic/s2n-quic-core/src/buffer.rs

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
pub mod duplex;
5+
mod error;
6+
pub mod reader;
7+
pub mod reassembler;
8+
pub mod writer;
9+
10+
pub use duplex::Duplex;
11+
pub use error::Error;
12+
pub use reader::Reader;
13+
pub use reassembler::Reassembler;
14+
pub use writer::Writer;
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use super::{Error, Reader, Writer};
5+
use crate::varint::VarInt;
6+
7+
mod split;
8+
9+
pub use split::Split;
10+
11+
pub trait Duplex: Reader + Writer {}
12+
13+
impl<T: Reader + Writer> Duplex for T {}
14+
15+
pub trait Skip: Duplex {
16+
fn skip(&mut self, len: VarInt, final_offset: Option<VarInt>) -> Result<(), Error>;
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{
5+
buffer::{
6+
duplex,
7+
reader::{self, storage::Infallible as _, Reader, Storage as _},
8+
writer::{self, Writer},
9+
Error,
10+
},
11+
varint::VarInt,
12+
};
13+
use core::convert::Infallible;
14+
15+
pub struct Split<'a, C, D>
16+
where
17+
C: writer::Storage + ?Sized,
18+
D: duplex::Skip<Error = Infallible> + ?Sized,
19+
{
20+
chunk: &'a mut C,
21+
duplex: &'a mut D,
22+
}
23+
24+
impl<'a, C, D> Split<'a, C, D>
25+
where
26+
C: writer::Storage + ?Sized,
27+
D: duplex::Skip<Error = Infallible> + ?Sized,
28+
{
29+
#[inline]
30+
pub fn new(chunk: &'a mut C, duplex: &'a mut D) -> Self {
31+
Self { chunk, duplex }
32+
}
33+
}
34+
35+
/// Delegates to the inner Duplex
36+
impl<'a, C, D> reader::Storage for Split<'a, C, D>
37+
where
38+
C: writer::Storage + ?Sized,
39+
D: duplex::Skip<Error = Infallible> + ?Sized,
40+
{
41+
type Error = D::Error;
42+
43+
#[inline]
44+
fn buffered_len(&self) -> usize {
45+
self.duplex.buffered_len()
46+
}
47+
48+
#[inline]
49+
fn buffer_is_empty(&self) -> bool {
50+
self.duplex.buffer_is_empty()
51+
}
52+
53+
#[inline]
54+
fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk<'_>, Self::Error> {
55+
self.duplex.read_chunk(watermark)
56+
}
57+
58+
#[inline]
59+
fn partial_copy_into<Dest>(
60+
&mut self,
61+
dest: &mut Dest,
62+
) -> Result<reader::storage::Chunk<'_>, Self::Error>
63+
where
64+
Dest: crate::buffer::writer::Storage + ?Sized,
65+
{
66+
self.duplex.partial_copy_into(dest)
67+
}
68+
69+
#[inline]
70+
fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
71+
where
72+
Dest: crate::buffer::writer::Storage + ?Sized,
73+
{
74+
self.duplex.copy_into(dest)
75+
}
76+
}
77+
78+
/// Delegates to the inner Duplex
79+
impl<'a, C, D> Reader for Split<'a, C, D>
80+
where
81+
C: writer::Storage + ?Sized,
82+
D: duplex::Skip<Error = Infallible> + ?Sized,
83+
{
84+
#[inline]
85+
fn current_offset(&self) -> VarInt {
86+
self.duplex.current_offset()
87+
}
88+
89+
#[inline]
90+
fn final_offset(&self) -> Option<VarInt> {
91+
self.duplex.final_offset()
92+
}
93+
94+
#[inline]
95+
fn has_buffered_fin(&self) -> bool {
96+
self.duplex.has_buffered_fin()
97+
}
98+
99+
#[inline]
100+
fn is_consumed(&self) -> bool {
101+
self.duplex.is_consumed()
102+
}
103+
}
104+
105+
impl<'a, C, D> Writer for Split<'a, C, D>
106+
where
107+
C: writer::Storage + ?Sized,
108+
D: duplex::Skip<Error = Infallible> + ?Sized,
109+
{
110+
#[inline]
111+
fn copy_from<R>(&mut self, reader: &mut R) -> Result<(), Error<R::Error>>
112+
where
113+
R: Reader + ?Sized,
114+
{
115+
// enable reader checks
116+
let mut reader = reader.with_checks();
117+
let reader = &mut reader;
118+
119+
let final_offset = reader.final_offset();
120+
121+
{
122+
// if the chunk specializes writing zero-copy Bytes/BytesMut, then just write to the
123+
// receive buffer, since that's what it stores
124+
let mut should_delegate = C::SPECIALIZES_BYTES || C::SPECIALIZES_BYTES_MUT;
125+
126+
// if this packet is non-contiguous, then delegate to the wrapped writer
127+
should_delegate |= reader.current_offset() != self.duplex.current_offset();
128+
129+
// if the chunk doesn't have any remaining capacity, then delegate
130+
should_delegate |= !self.chunk.has_remaining_capacity();
131+
132+
if should_delegate {
133+
self.duplex.copy_from(reader)?;
134+
135+
// try pulling bytes out of the duplex if we can
136+
if !self.duplex.buffer_is_empty() && self.chunk.has_remaining_capacity() {
137+
self.duplex.infallible_copy_into(self.chunk);
138+
}
139+
140+
return Ok(());
141+
}
142+
}
143+
144+
debug_assert!(
145+
self.chunk.has_remaining_capacity(),
146+
"this code should only be executed if the chunk is empty"
147+
);
148+
149+
{
150+
// track the number of consumed bytes
151+
let mut reader = reader.tracked();
152+
153+
reader.copy_into(self.chunk)?;
154+
155+
let write_len = reader.consumed_len();
156+
let write_len = VarInt::try_from(write_len).map_err(|_| Error::OutOfRange)?;
157+
158+
// notify the duplex that we bypassed it and should skip
159+
self.duplex
160+
.skip(write_len, final_offset)
161+
.map_err(Error::mapped)?;
162+
}
163+
164+
// if we still have some remaining bytes consume the rest in the duplex
165+
if !reader.buffer_is_empty() {
166+
self.duplex.copy_from(reader)?;
167+
}
168+
169+
Ok(())
170+
}
171+
}
+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
5+
pub enum Error<Reader = core::convert::Infallible> {
6+
/// An invalid data range was provided
7+
OutOfRange,
8+
/// The provided final size was invalid for the buffer's state
9+
InvalidFin,
10+
/// The provided reader failed
11+
ReaderError(Reader),
12+
}
13+
14+
impl<Reader> From<Reader> for Error<Reader> {
15+
#[inline]
16+
fn from(reader: Reader) -> Self {
17+
Self::ReaderError(reader)
18+
}
19+
}
20+
21+
impl Error {
22+
#[inline]
23+
pub fn mapped<Reader>(error: Error) -> Error<Reader> {
24+
match error {
25+
Error::OutOfRange => Error::OutOfRange,
26+
Error::InvalidFin => Error::InvalidFin,
27+
Error::ReaderError(_) => unreachable!(),
28+
}
29+
}
30+
}
31+
32+
#[cfg(feature = "std")]
33+
impl<Reader: std::error::Error> std::error::Error for Error<Reader> {}
34+
35+
impl<Reader: core::fmt::Display> core::fmt::Display for Error<Reader> {
36+
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
37+
match self {
38+
Self::OutOfRange => write!(f, "write extends out of the maximum possible offset"),
39+
Self::InvalidFin => write!(
40+
f,
41+
"write modifies the final offset in a non-compliant manner"
42+
),
43+
Self::ReaderError(reader) => write!(f, "the provided reader failed with: {reader}"),
44+
}
45+
}
46+
}
47+
48+
#[cfg(feature = "std")]
49+
impl<Reader: 'static + std::error::Error + Send + Sync> From<Error<Reader>> for std::io::Error {
50+
#[inline]
51+
fn from(error: Error<Reader>) -> Self {
52+
let kind = match &error {
53+
Error::OutOfRange => std::io::ErrorKind::InvalidData,
54+
Error::InvalidFin => std::io::ErrorKind::InvalidData,
55+
Error::ReaderError(_) => std::io::ErrorKind::Other,
56+
};
57+
Self::new(kind, error)
58+
}
59+
}

quic/s2n-quic-core/src/buffer/mod.rs

-6
This file was deleted.

0 commit comments

Comments
 (0)