-
Notifications
You must be signed in to change notification settings - Fork 124
/
Copy pathtesting.rs
366 lines (301 loc) · 10.2 KB
/
testing.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
//! A model that ensures stream data is correctly sent and received between peers
use crate::buffer::{reader, writer};
use bytes::Bytes;
#[cfg(any(test, feature = "generator"))]
use bolero_generator::prelude::*;
/// Shared, statically allocated byte pattern that every stream sends and verifies against.
///
/// The byte at index `i` is `i as u8`, so the pattern repeats every 256 bytes.
static DATA: Bytes = {
    /// Builds the repeating `0, 1, ..., 255, 0, 1, ...` pattern at compile time
    const fn pattern() -> [u8; DATA_LEN] {
        let mut bytes = [0; DATA_LEN];
        let mut pos = 0;
        // `for` loops are not available in const contexts, so walk the array manually
        while pos < DATA_LEN {
            bytes[pos] = pos as u8;
            pos += 1;
        }
        bytes
    }

    static BACKING: [u8; DATA_LEN] = pattern();

    Bytes::from_static(&BACKING)
};
/// Period of the pattern stored in `DATA`: only the first 256 offsets of `DATA` are unique
const DATA_MOD: usize = 256;

/// Length used by `Data::default()` and as the upper bound for generated buffered lengths
///
/// When running with miri the value is lowered to make execution less expensive.
const DEFAULT_STREAM_LEN: u64 = match cfg!(miri) {
    true => DATA_MOD as u64,
    false => 1024,
};

/// Total number of bytes held by `DATA`
///
/// When running with miri the value is lowered to make execution less expensive.
const DATA_LEN: usize = {
    let repeat = if cfg!(miri) { 1 } else { 128 };
    (DEFAULT_STREAM_LEN as usize) * repeat
};
/// A model of one side of a stream that can produce the bytes expected at any offset and
/// verify that received bytes match them.
///
/// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order, so reordering the
/// fields changes how values compare.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd, Eq, Ord)]
pub struct Data {
/// The current offset being received or sent
offset: u64,
/// The offset at which the stream ends, if known; the stream is finished once `offset`
/// reaches it
final_offset: Option<u64>,
/// The number of bytes that can currently be sent; decremented as the offset moves forward
buffered_len: u64,
}
#[cfg(any(feature = "generator", test))]
impl TypeGenerator for Data {
    /// Generates an arbitrary `Data` from the driver
    ///
    /// Values are drawn in a fixed order: `offset`, then `final_offset`, then `buffered_len`.
    /// `buffered_len` is drawn from the range below both `DEFAULT_STREAM_LEN` and the number
    /// of bytes left before `final_offset` (when one was generated).
    fn generate<D: bolero_generator::Driver>(driver: &mut D) -> Option<Self> {
        let start = produce::<u64>().generate(driver)?;

        // NOTE(review): `.with().value(start..)` presumably constrains a generated `Some` to be
        // at or after `start`; the subtraction below relies on that - confirm against bolero
        let end = produce::<Option<u64>>()
            .with()
            .value(start..)
            .generate(driver)?;

        let limit = end.map_or(DEFAULT_STREAM_LEN, |end| {
            (end - start).min(DEFAULT_STREAM_LEN)
        });

        let buffered = (..limit).generate(driver)?;

        Some(Data {
            offset: start,
            final_offset: end,
            buffered_len: buffered,
        })
    }
}
impl Default for Data {
fn default() -> Self {
Self::new(DEFAULT_STREAM_LEN)
}
}
impl Data {
    /// The maximum length of a chunk of data for the stream
    pub const MAX_CHUNK_LEN: usize = DATA_LEN;

    /// Creates a stream of `len` bytes starting at offset 0
    ///
    /// The whole stream is buffered up front and the final offset is known to be `len`.
    pub const fn new(len: u64) -> Self {
        Self {
            buffered_len: len,
            final_offset: Some(len),
            offset: 0,
        }
    }

    /// Notifies the data that a set of chunks were received at the current offset
    ///
    /// The current offset is advanced past the received bytes.
    ///
    /// # Panics
    ///
    /// Panics if the bytes do not match the expected pattern or if they extend beyond the
    /// final offset.
    pub fn receive<Chunk: AsRef<[u8]>>(&mut self, chunks: &[Chunk]) {
        Self::receive_check(&mut self.offset, self.final_offset, chunks)
    }

    /// Checks a set of chunks as if they were received starting at offset `start`
    ///
    /// Unlike [`Data::receive`], the current offset is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the bytes do not match the expected pattern or if they extend beyond the
    /// final offset.
    pub fn receive_at<Chunk: AsRef<[u8]>>(&self, mut start: u64, chunks: &[Chunk]) {
        Self::receive_check(&mut start, self.final_offset, chunks)
    }

    /// Verifies `chunks` against the expected pattern beginning at `*start`
    ///
    /// `*start` is advanced by the number of bytes checked. Once all of the chunks have been
    /// verified, the resulting offset is asserted to not exceed `final_offset` (if any).
    fn receive_check<Chunk: AsRef<[u8]>>(
        start: &mut u64,
        final_offset: Option<u64>,
        chunks: &[Chunk],
    ) {
        for mut chunk in chunks.iter().map(AsRef::as_ref) {
            while !chunk.is_empty() {
                // the pattern repeats every `DATA_MOD` bytes so map the stream offset into `DATA`
                let offset = ((*start) % DATA_MOD as u64) as usize;
                // compare as much as fits before the end of `DATA`; the rest is handled by the
                // next iteration
                let len = chunk.len().min(DATA.len() - offset);

                assert_eq!(
                    &chunk[..len],
                    &DATA[offset..offset + len],
                    "receive stream data at offset {} has been corrupted",
                    *start,
                );

                *start += len as u64;
                chunk = &chunk[len..];
            }
        }

        if let Some(final_offset) = final_offset {
            // the message arguments are only evaluated on failure, where `start > final_offset`
            assert!(
                final_offset >= (*start),
                "receive stream data has exceeded beyond the expected amount by {}",
                (*start) - final_offset
            );
        }
    }

    /// Returns `true` if the stream is finished reading/writing
    ///
    /// A stream without a known final offset is never finished.
    pub fn is_finished(&self) -> bool {
        if let Some(fin) = self.final_offset {
            fin <= self.offset
        } else {
            false
        }
    }

    /// Asks the data what chunks should be sent next
    ///
    /// At most `amount` bytes are written across the provided `chunks`.
    ///
    /// Returns the number of chunks that were filled, or `None` if the stream is finished
    pub fn send(&mut self, mut amount: usize, chunks: &mut [Bytes]) -> Option<usize> {
        if self.is_finished() {
            return None;
        }

        let mut count = 0;

        for chunk in chunks.iter_mut() {
            // an empty chunk means either `amount` or the buffered data has been used up
            if let Some(data) = self.send_one(amount).filter(|data| !data.is_empty()) {
                amount -= data.len();
                count += 1;
                *chunk = data;
            } else {
                break;
            }
        }

        Some(count)
    }

    /// Sends a single chunk of data, up to the provided `amount`
    ///
    /// The chunk is also limited by the number of buffered bytes and may be empty. The current
    /// offset is moved past the returned chunk.
    ///
    /// Returns `None` if the stream is finished
    pub fn send_one(&mut self, amount: usize) -> Option<Bytes> {
        if self.is_finished() {
            return None;
        }

        // clamp in `u64` space: casting `buffered_len` to `usize` first would truncate on
        // targets where `usize` is narrower than 64 bits. The result is at most `amount`, so
        // the cast back to `usize` is lossless.
        let amount = self.buffered_len.min(amount as u64) as usize;
        let chunk = Self::send_one_at(self.offset, amount);
        self.seek_forward(chunk.len() as u64);
        Some(chunk)
    }

    /// Returns the chunk of expected data starting at stream `offset`, up to `amount` bytes
    ///
    /// The chunk is shorter than `amount` when it would otherwise run past the end of `DATA`.
    pub fn send_one_at(offset: u64, amount: usize) -> Bytes {
        // the pattern repeats every `DATA_MOD` bytes so map the stream offset into `DATA`
        let offset = (offset % DATA_MOD as u64) as usize;
        let to_send = amount.min(DATA.len() - offset);

        if to_send == 0 {
            return Bytes::new();
        }

        DATA.slice(offset..offset + to_send)
    }

    /// Returns the current offset being received or sent
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Moves the current offset forward by the provided `len`
    ///
    /// `len` is capped at the number of buffered bytes, which is reduced by the same amount.
    pub fn seek_forward(&mut self, len: u64) {
        let len = self.buffered_len.min(len);
        self.buffered_len -= len;

        if let Some(new_offset) = self.offset.checked_add(len) {
            self.offset = new_offset;
        } else {
            // the offset cannot be represented: pin the end of the stream at `u64::MAX` and
            // drop whatever was still buffered
            self.final_offset = Some(u64::MAX);
            self.buffered_len = 0;
        }
    }
}
impl reader::Storage for Data {
    type Error = core::convert::Infallible;

    /// Returns the number of bytes that can currently be read
    #[inline]
    fn buffered_len(&self) -> usize {
        // saturate instead of truncating on targets where `usize` is narrower than 64 bits
        self.buffered_len.min(usize::MAX as u64) as usize
    }

    /// Reads a single chunk of at most `watermark` bytes
    ///
    /// Returns an empty chunk once the stream is finished
    #[inline]
    fn read_chunk(&mut self, watermark: usize) -> Result<reader::storage::Chunk, Self::Error> {
        if let Some(chunk) = self.send_one(watermark) {
            return Ok(chunk.into());
        }

        Ok(Default::default())
    }

    /// Copies chunks into `dest`, returning the last chunk instead of copying it
    ///
    /// Stops once `dest` is full, the stream is finished, or the buffered data runs out.
    #[inline]
    fn partial_copy_into<Dest: writer::Storage + ?Sized>(
        &mut self,
        dest: &mut Dest,
    ) -> Result<reader::storage::Chunk, Self::Error> {
        while let Some(chunk) = self.send_one(dest.remaining_capacity()) {
            // if the chunk matches the destination then return it instead of copying
            if chunk.len() == dest.remaining_capacity() || self.is_finished() {
                return Ok(chunk.into());
            }

            // an empty chunk at this point means the buffered data ran out before the stream
            // finished; `send_one` would keep returning empty chunks, so stop instead of
            // looping forever
            if chunk.is_empty() {
                break;
            }

            // otherwise copy the chunk into the destination
            dest.put_bytes(chunk);
        }

        Ok(Default::default())
    }
}
impl reader::Reader for Data {
    /// Returns the current offset, saturating at `VarInt::MAX` when it cannot be represented
    #[inline]
    fn current_offset(&self) -> crate::varint::VarInt {
        match self.offset().try_into() {
            Ok(offset) => offset,
            Err(_) => crate::varint::VarInt::MAX,
        }
    }

    /// Returns the final offset, if it is known and can be represented as a `VarInt`
    #[inline]
    fn final_offset(&self) -> Option<crate::varint::VarInt> {
        let end = self.final_offset?;
        end.try_into().ok()
    }
}
impl writer::Storage for Data {
    /// Verifies `slice` against the expected data at the current offset and advances past it
    #[inline]
    fn put_slice(&mut self, slice: &[u8]) {
        let chunks = [slice];
        Self::receive_check(&mut self.offset, self.final_offset, &chunks);
    }

    /// Always reports an unbounded capacity
    #[inline]
    fn remaining_capacity(&self) -> usize {
        // return max so readers don't know where the stream is expected to end and we can make an
        // assertion when they write
        usize::MAX
    }
}
impl writer::Writer for Data {
    /// Copies everything the `reader` provides into this writer, verifying it along the way
    #[inline]
    fn read_from<R>(&mut self, reader: &mut R) -> Result<(), crate::buffer::Error<R::Error>>
    where
        R: reader::Reader + ?Sized,
    {
        // no need to specialize on anything here
        match reader.copy_into(self) {
            Ok(_) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bolero::check;

    /// Receiving the expected bytes in order finishes the stream only after the last chunk
    #[test]
    fn receive_test() {
        let mut rx = Data::new(10);
        assert!(!rx.is_finished());

        let parts: [&[u8]; 3] = [&[0, 1, 2][..], &[3, 4, 5][..], &[6, 7, 8, 9][..]];

        for (idx, part) in parts.iter().enumerate() {
            rx.receive(&[*part]);
            let is_last = idx + 1 == parts.len();
            assert_eq!(rx.is_finished(), is_last);
        }
    }

    /// Bytes that do not match the expected pattern are rejected
    #[test]
    #[should_panic]
    fn receive_corruption_test() {
        let mut rx = Data::new(10);
        let corrupted: &[u8] = &[1, 2, 3];
        rx.receive(&[corrupted]);
    }

    /// Bytes beyond the final offset are rejected
    #[test]
    #[should_panic]
    fn receive_overflow_test() {
        let mut rx = Data::new(2);
        let too_long: &[u8] = &[0, 1, 2];
        rx.receive(&[too_long]);
    }

    /// Sending hands out consecutive slices of the pattern until the stream is finished
    #[test]
    fn send_test() {
        let mut tx = Data::new(10);
        assert!(!tx.is_finished());

        let mut slots = vec![Bytes::new(); 10];

        // (amount requested, expected first chunk, finished afterwards)
        let steps: [(usize, &[u8], bool); 3] = [
            (3, &[0, 1, 2][..], false),
            (4, &[3, 4, 5, 6][..], false),
            (3, &[7, 8, 9][..], true),
        ];

        for (amount, expected, finished) in steps.iter().copied() {
            assert_eq!(tx.send(amount, &mut slots), Some(1));
            assert_eq!(&slots[0][..], expected);
            assert_eq!(tx.is_finished(), finished);
        }

        assert_eq!(tx.send(1, &mut slots), None);
    }

    /// Everything a sender produces is accepted by a receiver of the same length
    #[test]
    #[cfg_attr(miri, ignore)] // This test breaks in CI but can't be reproduced locally - https://github.com/aws/s2n-quic/issues/867
    fn send_receive() {
        let stream_lens = 1..(DEFAULT_STREAM_LEN * 16);
        let send_amounts = 1..(DEFAULT_STREAM_LEN * 2);

        check!()
            .with_generator((stream_lens, send_amounts))
            .cloned()
            .for_each(|(stream_len, send_amount)| {
                let mut tx = Data::new(stream_len);
                let mut rx = Data::new(stream_len);
                let mut slots = vec![Bytes::new(); 5];

                loop {
                    let filled = match tx.send(send_amount as usize, &mut slots[..]) {
                        Some(filled) => filled,
                        None => break,
                    };

                    assert_ne!(filled, 0, "sender should return None if no chunks were sent");
                    rx.receive(&slots[..filled]);
                }

                assert!(tx.is_finished());
                assert!(rx.is_finished());
            })
    }

    /// A reader can be drained into a writer through the buffer traits
    #[test]
    fn buffer_trait_test() {
        use writer::Writer as _;

        let mut source = Data::new(10);
        // `Data` is `Copy`, so this is an independent model with the same state
        let mut sink = source;

        sink.read_from(&mut source).unwrap();

        assert!(source.is_finished());
        assert!(sink.is_finished());
    }
}