Skip to content

Commit 1490c9c

Browse files
committed
feat(s2n-quic-dc): accept linger parameter instead of always setting it
1 parent 00e3371 commit 1490c9c

File tree

6 files changed

+53
-9
lines changed

6 files changed

+53
-9
lines changed

dc/s2n-quic-dc/src/stream/client/tokio.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
socket::Protocol,
1212
},
1313
};
14-
use std::{io, net::SocketAddr};
14+
use std::{io, net::SocketAddr, time::Duration};
1515
use tokio::net::TcpStream;
1616

1717
/// Connects using the UDP transport layer
@@ -54,6 +54,7 @@ pub async fn connect_tcp<H, Sub>(
5454
acceptor_addr: SocketAddr,
5555
env: &Environment<Sub>,
5656
subscriber: Sub,
57+
linger: Option<Duration>,
5758
) -> io::Result<Stream<Sub>>
5859
where
5960
H: core::future::Future<Output = io::Result<secret::map::Peer>>,
@@ -64,7 +65,10 @@ where
6465

6566
// Make sure TCP_NODELAY is set
6667
let _ = socket.set_nodelay(true);
67-
let _ = socket.set_linger(Some(core::time::Duration::ZERO));
68+
69+
if linger.is_some() {
70+
let _ = socket.set_linger(linger);
71+
}
6872

6973
// if the acceptor_ip isn't known, then ask the socket to resolve it for us
7074
let peer_addr = if acceptor_addr.ip().is_unspecified() {

dc/s2n-quic-dc/src/stream/server/tokio/tcp.rs

+5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
};
1010
use core::{future::poll_fn, task::Poll};
1111
use s2n_quic_core::{inet::SocketAddress, time::Clock};
12+
use std::time::Duration;
1213
use tokio::net::TcpListener;
1314
use tracing::debug;
1415

@@ -26,6 +27,7 @@ where
2627
secrets: secret::Map,
2728
backlog: usize,
2829
accept_flavor: accept::Flavor,
30+
linger: Option<Duration>,
2931
subscriber: Sub,
3032
}
3133

@@ -42,6 +44,7 @@ where
4244
secrets: &secret::Map,
4345
backlog: usize,
4446
accept_flavor: accept::Flavor,
47+
linger: Option<Duration>,
4548
subscriber: Sub,
4649
) -> Self {
4750
let acceptor = Self {
@@ -51,6 +54,7 @@ where
5154
secrets: secrets.clone(),
5255
backlog,
5356
accept_flavor,
57+
linger,
5458
subscriber,
5559
};
5660

@@ -98,6 +102,7 @@ where
98102
workers.insert(
99103
remote_address,
100104
socket,
105+
self.linger,
101106
&mut context,
102107
subscriber_ctx,
103108
&publisher,

dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager.rs

+3
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ where
153153
&mut self,
154154
remote_address: SocketAddress,
155155
stream: W::Stream,
156+
linger: Option<Duration>,
156157
cx: &mut W::Context,
157158
connection_context: W::ConnectionContext,
158159
publisher: &Pub,
@@ -179,6 +180,7 @@ where
179180
self.inner.workers[idx].worker.replace(
180181
remote_address,
181182
stream,
183+
linger,
182184
connection_context,
183185
publisher,
184186
clock,
@@ -377,6 +379,7 @@ pub trait Worker {
377379
&mut self,
378380
remote_address: SocketAddress,
379381
stream: Self::Stream,
382+
linger: Option<Duration>,
380383
connection_context: Self::ConnectionContext,
381384
publisher: &Pub,
382385
clock: &C,

dc/s2n-quic-dc/src/stream/server/tokio/tcp/manager/tests.rs

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ impl super::Worker for Worker {
6464
&mut self,
6565
_remote_address: SocketAddress,
6666
_stream: Self::Stream,
67+
_linger: Option<Duration>,
6768
_connection_context: Self::ConnectionContext,
6869
_publisher: &Pub,
6970
clock: &C,
@@ -160,6 +161,7 @@ impl Harness {
160161
self.manager.insert(
161162
SocketAddress::default(),
162163
(),
164+
None,
163165
&mut (),
164166
(),
165167
&publisher(&self.subscriber, &self.clock),

dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs

+18-4
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ where
9898
&mut self,
9999
remote_address: SocketAddress,
100100
stream: TcpStream,
101+
linger: Option<Duration>,
101102
subscriber_ctx: Self::ConnectionContext,
102103
publisher: &Pub,
103104
clock: &C,
@@ -107,7 +108,10 @@ where
107108
{
108109
// Make sure TCP_NODELAY is set
109110
let _ = stream.set_nodelay(true);
110-
let _ = stream.set_linger(Some(Duration::ZERO));
111+
112+
if linger.is_some() {
113+
let _ = stream.set_linger(linger);
114+
}
111115

112116
let now = clock.get_time();
113117

@@ -331,6 +335,10 @@ impl WorkerState {
331335
error: error.error,
332336
};
333337
continue;
338+
} else {
339+
// close the stream immediately and send a reset to the client
340+
let _ = socket.set_linger(Some(Duration::ZERO));
341+
drop(socket);
334342
}
335343
}
336344
return Err(Some(error.error)).into();
@@ -381,16 +389,15 @@ impl WorkerState {
381389
}
382390

383391
#[inline]
384-
fn poll_initial_packet<S, Pub>(
392+
fn poll_initial_packet<Pub>(
385393
cx: &mut task::Context,
386-
stream: &mut S,
394+
stream: &mut TcpStream,
387395
remote_address: &SocketAddress,
388396
recv_buffer: &mut msg::recv::Message,
389397
sojourn_time: Duration,
390398
publisher: &Pub,
391399
) -> Poll<Result<server::InitialPacket, Option<io::Error>>>
392400
where
393-
S: Socket,
394401
Pub: EndpointPublisher,
395402
{
396403
loop {
@@ -403,6 +410,10 @@ impl WorkerState {
403410
sojourn_time,
404411
},
405412
);
413+
414+
// close the stream immediately and send a reset to the client
415+
let _ = stream.set_linger(Some(Duration::ZERO));
416+
406417
return Err(None).into();
407418
}
408419

@@ -437,6 +448,9 @@ impl WorkerState {
437448
},
438449
);
439450

451+
// close the stream immediately and send a reset to the client
452+
let _ = stream.set_linger(Some(Duration::ZERO));
453+
440454
return Err(None).into();
441455
}
442456
}

dc/s2n-quic-dc/src/stream/testing.rs

+19-3
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,14 @@ impl Client {
6868

6969
match server.protocol {
7070
Protocol::Tcp => {
71-
stream_client::connect_tcp(handshake, server.local_addr, &self.env, subscriber)
72-
.await
71+
stream_client::connect_tcp(
72+
handshake,
73+
server.local_addr,
74+
&self.env,
75+
subscriber,
76+
None,
77+
)
78+
.await
7379
}
7480
Protocol::Udp => {
7581
stream_client::connect_udp(handshake, server.local_addr, &self.env, subscriber)
@@ -181,6 +187,8 @@ mod drop_handle {
181187
}
182188

183189
pub mod server {
190+
use std::time::Duration;
191+
184192
use super::*;
185193

186194
#[derive(Clone)]
@@ -201,6 +209,7 @@ pub mod server {
201209
flavor: accept::Flavor,
202210
protocol: Protocol,
203211
map_capacity: usize,
212+
linger: Option<Duration>,
204213
subscriber: event::testing::Subscriber,
205214
}
206215

@@ -211,6 +220,7 @@ pub mod server {
211220
flavor: accept::Flavor::default(),
212221
protocol: Protocol::Tcp,
213222
map_capacity: 16,
223+
linger: None,
214224
subscriber: event::testing::Subscriber::no_snapshot(),
215225
}
216226
}
@@ -255,6 +265,11 @@ pub mod server {
255265
self
256266
}
257267

268+
pub fn linger(mut self, linger: Duration) -> Self {
269+
self.linger = Some(linger);
270+
self
271+
}
272+
258273
pub fn subscriber(mut self, subscriber: event::testing::Subscriber) -> Self {
259274
self.subscriber = subscriber;
260275
self
@@ -266,6 +281,7 @@ pub mod server {
266281
flavor,
267282
protocol,
268283
map_capacity,
284+
linger,
269285
subscriber,
270286
} = self;
271287

@@ -291,7 +307,7 @@ pub mod server {
291307
let socket = tokio::net::TcpListener::from_std(socket).unwrap();
292308

293309
let acceptor = stream_server::tcp::Acceptor::new(
294-
0, socket, &sender, &env, &map, backlog, flavor, subscriber,
310+
0, socket, &sender, &env, &map, backlog, flavor, linger, subscriber,
295311
);
296312
let acceptor = drop_handle_receiver.wrap(acceptor.run());
297313
let acceptor = acceptor.instrument(tracing::info_span!("tcp"));

0 commit comments

Comments
 (0)