Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d5f1a74

Browse files
author
iHsin
committedMar 23, 2024·
perf: replace Mutex with RwLock
1 parent e2beca8 commit d5f1a74

File tree

5 files changed

+43
-37
lines changed

5 files changed

+43
-37
lines changed
 

‎Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[workspace]
22
members = ["tuic", "tuic-quinn", "tuic-server", "tuic-client"]
3+
resolver = "2"
34

45
[profile.release]
56
opt-level = 3

‎tuic-client/src/connection/mod.rs

+12-13
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::{
55
};
66
use crossbeam_utils::atomic::AtomicCell;
77
use once_cell::sync::OnceCell;
8-
use parking_lot::Mutex;
98
use quinn::{
109
congestion::{BbrConfig, CubicConfig, NewRenoConfig},
1110
ClientConfig, Connection as QuinnConnection, Endpoint as QuinnEndpoint, EndpointConfig,
@@ -18,18 +17,17 @@ use std::{
1817
sync::{atomic::AtomicU32, Arc},
1918
time::Duration,
2019
};
21-
use tokio::{
22-
sync::{Mutex as AsyncMutex, OnceCell as AsyncOnceCell},
23-
time,
24-
};
20+
use tokio::sync::RwLock as AsyncRwLock;
21+
use tokio::{sync::OnceCell as AsyncOnceCell, time};
22+
2523
use tuic_quinn::{side, Connection as Model};
2624
use uuid::Uuid;
2725

2826
mod handle_stream;
2927
mod handle_task;
3028

31-
static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new();
32-
static CONNECTION: AsyncOnceCell<AsyncMutex<Connection>> = AsyncOnceCell::const_new();
29+
static ENDPOINT: OnceCell<AsyncRwLock<Endpoint>> = OnceCell::new();
30+
static CONNECTION: AsyncOnceCell<AsyncRwLock<Connection>> = AsyncOnceCell::const_new();
3331
static TIMEOUT: AtomicCell<Duration> = AtomicCell::new(Duration::from_secs(0));
3432

3533
pub const ERROR_CODE: VarInt = VarInt::from_u32(0);
@@ -117,7 +115,7 @@ impl Connection {
117115
};
118116

119117
ENDPOINT
120-
.set(Mutex::new(ep))
118+
.set(AsyncRwLock::new(ep))
121119
.map_err(|_| "endpoint already initialized")
122120
.unwrap();
123121

@@ -131,21 +129,22 @@ impl Connection {
131129
ENDPOINT
132130
.get()
133131
.unwrap()
134-
.lock()
132+
.read()
133+
.await
135134
.connect()
136135
.await
137-
.map(AsyncMutex::new)
136+
.map(AsyncRwLock::new)
138137
};
139138

140139
let try_get_conn = async {
141140
let mut conn = CONNECTION
142141
.get_or_try_init(|| try_init_conn)
143142
.await?
144-
.lock()
143+
.write()
145144
.await;
146145

147146
if conn.is_closed() {
148-
let new_conn = ENDPOINT.get().unwrap().lock().connect().await?;
147+
let new_conn = ENDPOINT.get().unwrap().read().await.connect().await?;
149148
*conn = new_conn;
150149
}
151150

@@ -254,7 +253,7 @@ struct Endpoint {
254253
}
255254

256255
impl Endpoint {
257-
async fn connect(&mut self) -> Result<Connection, Error> {
256+
async fn connect(&self) -> Result<Connection, Error> {
258257
let mut last_err = None;
259258

260259
for addr in self.server.resolve().await? {

‎tuic-server/src/connection/handle_task.rs

+20-14
Original file line numberDiff line numberDiff line change
@@ -119,18 +119,24 @@ impl Connection {
119119
src_addr = addr,
120120
);
121121

122-
let session = match self.udp_sessions.lock().entry(assoc_id) {
123-
Entry::Occupied(entry) => entry.get().clone(),
124-
Entry::Vacant(entry) => {
125-
let session = UdpSession::new(
126-
self.clone(),
127-
assoc_id,
128-
self.udp_relay_ipv6,
129-
self.max_external_pkt_size,
130-
)?;
131-
entry.insert(session.clone());
132-
session
133-
}
122+
let guard = self.udp_sessions.read().await;
123+
let session = guard.get(&assoc_id).map(|v| v.to_owned());
124+
drop(guard);
125+
let session = match session {
126+
Some(v) => v,
127+
None => match self.udp_sessions.write().await.entry(assoc_id) {
128+
Entry::Occupied(entry) => entry.get().clone(),
129+
Entry::Vacant(entry) => {
130+
let session = UdpSession::new(
131+
self.clone(),
132+
assoc_id,
133+
self.udp_relay_ipv6,
134+
self.max_external_pkt_size,
135+
)?;
136+
entry.insert(session.clone());
137+
session
138+
}
139+
},
134140
};
135141

136142
let Some(socket_addr) = resolve_dns(&addr).await?.next() else {
@@ -162,8 +168,8 @@ impl Connection {
162168
user = self.auth,
163169
);
164170

165-
if let Some(session) = self.udp_sessions.lock().remove(&assoc_id) {
166-
session.close();
171+
if let Some(session) = self.udp_sessions.write().await.remove(&assoc_id) {
172+
session.close().await;
167173
}
168174
}
169175

‎tuic-server/src/connection/mod.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use self::{authenticated::Authenticated, udp_session::UdpSession};
22
use crate::{error::Error, utils::UdpRelayMode};
33
use crossbeam_utils::atomic::AtomicCell;
4-
use parking_lot::Mutex;
54
use quinn::{Connecting, Connection as QuinnConnection, VarInt};
65
use register_count::Counter;
76
use std::{
87
collections::HashMap,
98
sync::{atomic::AtomicU32, Arc},
109
time::Duration,
1110
};
11+
use tokio::sync::RwLock as AsyncRwLock;
1212
use tokio::time;
1313
use tuic_quinn::{side, Authenticate, Connection as Model};
1414
use uuid::Uuid;
@@ -29,7 +29,7 @@ pub struct Connection {
2929
udp_relay_ipv6: bool,
3030
auth: Authenticated,
3131
task_negotiation_timeout: Duration,
32-
udp_sessions: Arc<Mutex<HashMap<u16, UdpSession>>>,
32+
udp_sessions: Arc<AsyncRwLock<HashMap<u16, UdpSession>>>,
3333
udp_relay_mode: Arc<AtomicCell<Option<UdpRelayMode>>>,
3434
max_external_pkt_size: usize,
3535
remote_uni_stream_cnt: Counter,
@@ -147,7 +147,7 @@ impl Connection {
147147
udp_relay_ipv6,
148148
auth: Authenticated::new(),
149149
task_negotiation_timeout,
150-
udp_sessions: Arc::new(Mutex::new(HashMap::new())),
150+
udp_sessions: Arc::new(AsyncRwLock::new(HashMap::new())),
151151
udp_relay_mode: Arc::new(AtomicCell::new(None)),
152152
max_external_pkt_size,
153153
remote_uni_stream_cnt: Counter::new(),
@@ -157,15 +157,15 @@ impl Connection {
157157
}
158158
}
159159

160-
fn authenticate(&self, auth: &Authenticate) -> Result<(), Error> {
160+
async fn authenticate(&self, auth: &Authenticate) -> Result<(), Error> {
161161
if self.auth.get().is_some() {
162162
Err(Error::DuplicatedAuth)
163163
} else if self
164164
.users
165165
.get(&auth.uuid())
166166
.map_or(false, |password| auth.validate(password))
167167
{
168-
self.auth.set(auth.uuid());
168+
self.auth.set(auth.uuid()).await;
169169
Ok(())
170170
} else {
171171
Err(Error::AuthFailed(auth.uuid()))

‎tuic-server/src/connection/udp_session.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use super::Connection;
22
use crate::error::Error;
33
use bytes::Bytes;
4-
use parking_lot::Mutex;
54
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
65
use std::{
76
io::Error as IoError,
87
net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket as StdUdpSocket},
98
sync::Arc,
109
};
10+
use tokio::sync::RwLock as AsyncRwLock;
1111
use tokio::{
1212
net::UdpSocket,
1313
sync::oneshot::{self, Sender},
@@ -23,7 +23,7 @@ struct UdpSessionInner {
2323
socket_v4: UdpSocket,
2424
socket_v6: Option<UdpSocket>,
2525
max_pkt_size: usize,
26-
close: Mutex<Option<Sender<()>>>,
26+
close: AsyncRwLock<Option<Sender<()>>>,
2727
}
2828

2929
impl UdpSession {
@@ -89,7 +89,7 @@ impl UdpSession {
8989
socket_v4,
9090
socket_v6,
9191
max_pkt_size,
92-
close: Mutex::new(Some(tx)),
92+
close: AsyncRwLock::new(Some(tx)),
9393
}));
9494

9595
let session_listening = session.clone();
@@ -161,7 +161,7 @@ impl UdpSession {
161161
}
162162
}
163163

164-
pub fn close(&self) {
165-
let _ = self.0.close.lock().take().unwrap().send(());
164+
pub async fn close(&self) {
165+
let _ = self.0.close.write().await.take().unwrap().send(());
166166
}
167167
}

0 commit comments

Comments
 (0)
Please sign in to comment.