Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): import 10/17/24 #2351

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ bytes = "1"
crossbeam-channel = "0.5"
crossbeam-epoch = "0.9"
crossbeam-queue = { version = "0.3" }
event-listener-strategy = "0.5"
flurry = "0.5"
libc = "0.2"
num-rational = { version = "0.4", default-features = false }
22 changes: 10 additions & 12 deletions dc/s2n-quic-dc/src/credentials.rs
Original file line number Diff line number Diff line change
@@ -16,18 +16,7 @@ pub use s2n_quic_core::varint::VarInt as KeyId;
pub mod testing;

#[derive(
Clone,
Copy,
Default,
PartialEq,
Eq,
Hash,
AsBytes,
FromBytes,
FromZeroes,
Unaligned,
PartialOrd,
Ord,
Clone, Copy, Default, PartialEq, Eq, AsBytes, FromBytes, FromZeroes, Unaligned, PartialOrd, Ord,
)]
#[cfg_attr(
any(test, feature = "testing"),
@@ -36,6 +25,15 @@ pub mod testing;
#[repr(C)]
pub struct Id([u8; 16]);

impl std::hash::Hash for Id {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// The ID has very high quality entropy already, so write just one half of it to keep hash
// costs as low as possible. For the main use of the Hash impl in the fixed-size ID map
// this translates to just directly using these bytes for the indexing.
state.write_u64(u64::from_ne_bytes(self.0[..8].try_into().unwrap()));
}
}

impl fmt::Debug for Id {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
format_args!("{:#01x}", u128::from_be_bytes(self.0)).fmt(f)
9 changes: 7 additions & 2 deletions dc/s2n-quic-dc/src/fixed_map.rs
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
//! extent possible) reducing the likelihood.

use core::{
fmt::Debug,
hash::Hash,
sync::atomic::{AtomicU8, Ordering},
};
@@ -21,7 +22,7 @@ pub struct Map<K, V, S = RandomState> {

impl<K, V, S> Map<K, V, S>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
S: BuildHasher,
{
pub fn with_capacity(entries: usize, hasher: S) -> Self {
@@ -108,7 +109,7 @@ struct Slot<K, V> {

impl<K, V> Slot<K, V>
where
K: Hash + Eq,
K: Hash + Eq + Debug,
{
fn new() -> Self {
Slot {
@@ -139,6 +140,10 @@ where
// If `new_key` isn't already in this slot, replace one of the existing entries with the
// new key. For now we rotate through based on `next_write`.
let replacement = self.next_write.fetch_add(1, Ordering::Relaxed) as usize % SLOT_CAPACITY;
tracing::trace!(
"evicting {:?} - bucket overflow",
values[replacement].as_mut().unwrap().0
);
values[replacement] = Some((new_key, new_value));
None
}
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ pub mod random;
pub mod recovery;
pub mod socket;
pub mod stream;
pub mod sync;
pub mod task;

#[cfg(any(test, feature = "testing"))]
10 changes: 10 additions & 0 deletions dc/s2n-quic-dc/src/path/secret.rs
Original file line number Diff line number Diff line change
@@ -12,3 +12,13 @@ pub mod stateless_reset;

pub use key::{open, seal};
pub use map::Map;

/// The handshake operation may return immediately if state for the target is already cached,
/// or perform an actual handshake if not.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum HandshakeKind {
/// Handshake was skipped because a secret was already present in the cache
Cached,
/// Handshake was performed to generate a new secret
Fresh,
}
44 changes: 38 additions & 6 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ use s2n_quic_core::{
};
use std::{
fmt,
hash::{BuildHasherDefault, Hasher},
net::{Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
@@ -51,6 +52,24 @@ pub struct Map {
pub(super) state: Arc<State>,
}

#[derive(Default)]
pub(super) struct NoopIdHasher(Option<u64>);

impl Hasher for NoopIdHasher {
fn finish(&self) -> u64 {
self.0.unwrap()
}

fn write(&mut self, _bytes: &[u8]) {
unimplemented!()
}

fn write_u64(&mut self, x: u64) {
debug_assert!(self.0.is_none());
self.0 = Some(x);
}
}

// # Managing memory consumption
//
// For regular rotation with live peers, we retain at most two secrets: one derived from the most
@@ -93,7 +112,7 @@ pub(super) struct State {
pub(super) requested_handshakes: flurry::HashSet<SocketAddr>,

// All known entries.
pub(super) ids: fixed_map::Map<Id, Arc<Entry>>,
pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,

pub(super) signer: stateless_reset::Signer,

@@ -232,7 +251,7 @@ impl State {
}

impl Map {
pub fn new(signer: stateless_reset::Signer) -> Self {
pub fn new(signer: stateless_reset::Signer, capacity: usize) -> Self {
// FIXME: Avoid unwrap and the whole socket.
//
// We only ever send on this socket - but we really should be sending on the same
@@ -244,11 +263,11 @@ impl Map {
control_socket.set_nonblocking(true).unwrap();
let state = State {
// This is around 500MB with current entry size.
max_capacity: 500_000,
max_capacity: capacity,
// FIXME: Allow configuring the rehandshake_period.
rehandshake_period: Duration::from_secs(3600 * 24),
peers: fixed_map::Map::with_capacity(500_000, Default::default()),
ids: fixed_map::Map::with_capacity(500_000, Default::default()),
peers: fixed_map::Map::with_capacity(capacity, Default::default()),
ids: fixed_map::Map::with_capacity(capacity, Default::default()),
requested_handshakes: Default::default(),
cleaner: Cleaner::new(),
signer,
@@ -301,6 +320,19 @@ impl Map {
Some((sealer, credentials, state.parameters.clone()))
}

/// Retrieve a sealer by path secret ID.
///
/// Generally callers should prefer to use one of the `pair` APIs; this is primarily useful for
/// "response" datagrams which want to be bound to the exact same shared secret.
///
/// Note that unlike by-IP lookup this should typically not be done significantly after the
/// original secret was used for decryption.
pub fn seal_once_id(&self, id: Id) -> Option<(seal::Once, Credentials, ApplicationParams)> {
let state = self.state.ids.get_by_key(&id)?;
let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
&self,
credentials: &Credentials,
@@ -485,7 +517,7 @@ impl Map {
pub fn for_test_with_peers(
peers: Vec<(schedule::Ciphersuite, dc::Version, SocketAddr)>,
) -> (Self, Vec<Id>) {
let provider = Self::new(stateless_reset::Signer::random());
let provider = Self::new(stateless_reset::Signer::random(), peers.len() * 3);
let mut secret = [0; 32];
aws_lc_rs::rand::fill(&mut secret).unwrap();
let mut stateless_reset = [0; control::TAG_LEN];
19 changes: 12 additions & 7 deletions dc/s2n-quic-dc/src/path/secret/map/test.rs
Original file line number Diff line number Diff line change
@@ -32,7 +32,7 @@ fn fake_entry(peer: u16) -> Arc<Entry> {
#[test]
fn cleans_after_delay() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 50);

// Stop background processing. We expect to manually invoke clean, and a background worker
// might interfere with our state.
@@ -60,7 +60,7 @@ fn cleans_after_delay() {
#[test]
fn thread_shutdown() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10);
let state = Arc::downgrade(&map.state);
drop(map);

@@ -263,7 +263,7 @@ fn check_invariants() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let mut map = Map::new(signer);
let mut map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
@@ -293,7 +293,7 @@ fn check_invariants_no_overflow() {

let mut model = Model::default();
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 10_000);

// Avoid background work interfering with testing.
map.state.cleaner.stop();
@@ -316,7 +316,7 @@ fn check_invariants_no_overflow() {
#[ignore = "memory growth takes a long time to run"]
fn no_memory_growth() {
let signer = stateless_reset::Signer::new(b"secret");
let map = Map::new(signer);
let map = Map::new(signer, 100_000);
map.state.cleaner.stop();
for idx in 0..500_000 {
// FIXME: this ends up 2**16 peers in the `peers` map
@@ -325,10 +325,15 @@ fn no_memory_growth() {
}

#[test]
#[cfg(all(target_pointer_width = "64", target_os = "linux"))]
fn entry_size() {
let mut should_check = true;

should_check &= cfg!(target_pointer_width = "64");
should_check &= cfg!(target_os = "linux");
should_check &= std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok();

// This gates to running only on specific GHA to reduce false positives.
if std::env::var("S2N_QUIC_RUN_VERSION_SPECIFIC_TESTS").is_ok() {
if should_check {
assert_eq!(fake_entry(0).size(), 238);
}
}
1 change: 1 addition & 0 deletions dc/s2n-quic-dc/src/stream.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ pub const DEFAULT_INFLIGHT_TIMEOUT: Duration = Duration::from_secs(5);
pub const MAX_DATAGRAM_SIZE: usize = 1 << 15; // 32k

pub mod application;
pub mod client;
pub mod crypto;
pub mod endpoint;
pub mod environment;
4 changes: 4 additions & 0 deletions dc/s2n-quic-dc/src/stream/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

pub mod tokio;
121 changes: 121 additions & 0 deletions dc/s2n-quic-dc/src/stream/client/tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
path::secret,
stream::{
application::Stream,
endpoint,
environment::tokio::{self as env, Environment},
socket::Protocol,
},
};
use std::{io, net::SocketAddr};
use tokio::net::TcpStream;

/// Connects using the UDP transport layer
#[inline]
pub async fn connect_udp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// ensure we have a secret for the peer
handshake.await?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::UdpUnbound(acceptor_addr.into()),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Udp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects using the TCP transport layer
#[inline]
pub async fn connect_tcp<H>(
handshake_addr: SocketAddr,
handshake: H,
acceptor_addr: SocketAddr,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream>
where
H: core::future::Future<Output = io::Result<secret::HandshakeKind>>,
{
// Race TCP handshake with the TLS handshake
let (socket, _) = tokio::try_join!(TcpStream::connect(acceptor_addr), handshake,)?;

let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(socket),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

/// Connects with a pre-existing TCP stream
///
/// # Note
///
/// The provided `map` must contain a shared secret for the `handshake_addr`
#[inline]
pub async fn connect_tcp_with(
handshake_addr: SocketAddr,
stream: TcpStream,
env: &Environment,
map: &secret::Map,
) -> io::Result<Stream> {
let stream = endpoint::open_stream(
env,
handshake_addr.into(),
env::TcpRegistered(stream),
map,
None,
)?;

// build the stream inside the application context
let mut stream = stream.build()?;

debug_assert_eq!(stream.protocol(), Protocol::Tcp);

write_prelude(&mut stream).await?;

Ok(stream)
}

#[inline]
async fn write_prelude(stream: &mut Stream) -> io::Result<()> {
// TODO should we actually write the prelude here or should we do late sealer binding on
// the first packet to reduce secret reordering on the peer

stream
.write_from(&mut s2n_quic_core::buffer::reader::storage::Empty)
.await
.map(|_| ())
}
Loading

Unchanged files with check annotations Beta

rm -rf /var/cache/yum; \
curl https://sh.rustup.rs -sSf | bash -s -- -y;
WORKDIR app

Check warning on line 13 in quic/s2n-quic-qns/etc/Dockerfile

GitHub Actions / qns

Relative workdir without an absolute workdir declared within the build can have unexpected results if the base image changes

WorkdirRelativePath: Relative workdir "app" can have unexpected results if the base image changes More info: https://docs.docker.com/go/dockerfile/rule/workdir-relative-path/
RUN set -eux; \
source /root/.cargo/env; \