Skip to content

Commit 621f250

Browse files
authored
Use MessageWriter downstairs (#1286)
While staring at flamegraphs, I noticed a `memcpy` in the flamegraph. The whole point of #1206 was to reorganize messages so that bulk data could be sent down the wire in-place, without an intermediate copy. However, it turns out that I forgot to actually use the new `MessageWriter` in the Crucible Downstairs! These changes switch to using the `MessageWriter` downstairs, removing the suspicious `memcpy`. It's hard to tell if there's a performance impact, but this still seems worthwhile – if only to remove another big chunk of the flamegraph.
1 parent 62cc2cf commit 621f250

File tree

1 file changed

+15
-15
lines changed

1 file changed

+15
-15
lines changed

downstairs/src/lib.rs

+15-15
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,20 @@ use crucible_common::{
1919
verbose_timeout, Block, CrucibleError, RegionDefinition, MAX_BLOCK_SIZE,
2020
};
2121
use crucible_protocol::{
22-
BlockContext, CrucibleDecoder, CrucibleEncoder, JobId, Message,
23-
ReadRequest, ReconciliationId, SnapshotDetails, CRUCIBLE_MESSAGE_VERSION,
22+
BlockContext, CrucibleDecoder, JobId, Message, MessageWriter, ReadRequest,
23+
ReconciliationId, SnapshotDetails, CRUCIBLE_MESSAGE_VERSION,
2424
};
2525
use repair_client::Client;
2626

2727
use anyhow::{bail, Result};
2828
use bytes::BytesMut;
29-
use futures::{SinkExt, StreamExt};
29+
use futures::StreamExt;
3030
use rand::prelude::*;
3131
use slog::{debug, error, info, o, warn, Logger};
3232
use tokio::net::TcpListener;
3333
use tokio::sync::{mpsc, oneshot};
3434
use tokio::time::{sleep_until, Instant};
35-
use tokio_util::codec::{FramedRead, FramedWrite};
35+
use tokio_util::codec::FramedRead;
3636
use uuid::Uuid;
3737

3838
pub mod admin;
@@ -594,15 +594,15 @@ async fn proc_stream(
594594
let (read, write) = sock.into_split();
595595

596596
let fr = FramedRead::new(read, CrucibleDecoder::new());
597-
let fw = FramedWrite::new(write, CrucibleEncoder::new());
597+
let fw = MessageWriter::new(write);
598598

599599
proc(ads, fr, fw).await
600600
}
601601
WrappedStream::Https(stream) => {
602602
let (read, write) = tokio::io::split(stream);
603603

604604
let fr = FramedRead::new(read, CrucibleDecoder::new());
605-
let fw = FramedWrite::new(write, CrucibleEncoder::new());
605+
let fw = MessageWriter::new(write);
606606

607607
proc(ads, fr, fw).await
608608
}
@@ -626,7 +626,7 @@ pub struct UpstairsConnection {
626626
async fn proc<RT, WT>(
627627
ads: &Arc<Mutex<Downstairs>>,
628628
mut fr: FramedRead<RT, CrucibleDecoder>,
629-
mut fw: FramedWrite<WT, CrucibleEncoder>,
629+
mut fw: MessageWriter<WT>,
630630
) -> Result<()>
631631
where
632632
RT: tokio::io::AsyncRead + std::marker::Unpin + std::marker::Send,
@@ -1074,7 +1074,7 @@ where
10741074

10751075
async fn reply_task<WT>(
10761076
mut resp_channel_rx: mpsc::UnboundedReceiver<Message>,
1077-
mut fw: FramedWrite<WT, CrucibleEncoder>,
1077+
mut fw: MessageWriter<WT>,
10781078
) -> Result<()>
10791079
where
10801080
WT: tokio::io::AsyncWrite
@@ -1096,7 +1096,7 @@ where
10961096
async fn resp_loop<RT, WT>(
10971097
ads: &Arc<Mutex<Downstairs>>,
10981098
mut fr: FramedRead<RT, CrucibleDecoder>,
1099-
fw: FramedWrite<WT, CrucibleEncoder>,
1099+
fw: MessageWriter<WT>,
11001100
mut another_upstairs_active_rx: oneshot::Receiver<UpstairsConnection>,
11011101
upstairs_connection: UpstairsConnection,
11021102
) -> Result<()>
@@ -1120,7 +1120,7 @@ where
11201120
* Create tasks for:
11211121
* Doing the work then sending the ACK
11221122
* Pulling work off the socket and putting on the work queue.
1123-
* Sending messages back on the FramedWrite
1123+
* Sending messages back on the MessageWriter
11241124
*
11251125
* These tasks and this function must be able to handle the
11261126
* Upstairs connection going away at any time, as well as a forced
@@ -6174,7 +6174,7 @@ mod test {
61746174
let tcp = start_ds_and_connect(5555, 5556).await.unwrap();
61756175
let (read, write) = tcp.into_split();
61766176
let mut fr = FramedRead::new(read, CrucibleDecoder::new());
6177-
let mut fw = FramedWrite::new(write, CrucibleEncoder::new());
6177+
let mut fw = MessageWriter::new(write);
61786178

61796179
// Our downstairs version is CRUCIBLE_MESSAGE_VERSION
61806180
let m = Message::HereIAm {
@@ -6212,7 +6212,7 @@ mod test {
62126212
let tcp = start_ds_and_connect(5557, 5558).await.unwrap();
62136213
let (read, write) = tcp.into_split();
62146214
let mut fr = FramedRead::new(read, CrucibleDecoder::new());
6215-
let mut fw = FramedWrite::new(write, CrucibleEncoder::new());
6215+
let mut fw = MessageWriter::new(write);
62166216

62176217
// Our downstairs version is CRUCIBLE_MESSAGE_VERSION
62186218
let m = Message::HereIAm {
@@ -6246,7 +6246,7 @@ mod test {
62466246
let tcp = start_ds_and_connect(5579, 5560).await.unwrap();
62476247
let (read, write) = tcp.into_split();
62486248
let mut fr = FramedRead::new(read, CrucibleDecoder::new());
6249-
let mut fw = FramedWrite::new(write, CrucibleEncoder::new());
6249+
let mut fw = MessageWriter::new(write);
62506250

62516251
// Our downstairs version is CRUCIBLE_MESSAGE_VERSION
62526252
let m = Message::HereIAm {
@@ -6280,7 +6280,7 @@ mod test {
62806280
let tcp = start_ds_and_connect(5561, 5562).await.unwrap();
62816281
let (read, write) = tcp.into_split();
62826282
let mut fr = FramedRead::new(read, CrucibleDecoder::new());
6283-
let mut fw = FramedWrite::new(write, CrucibleEncoder::new());
6283+
let mut fw = MessageWriter::new(write);
62846284

62856285
// Our downstairs version is CRUCIBLE_MESSAGE_VERSION
62866286
let m = Message::HereIAm {
@@ -6321,7 +6321,7 @@ mod test {
63216321
let tcp = start_ds_and_connect(5563, 5564).await.unwrap();
63226322
let (read, write) = tcp.into_split();
63236323
let mut fr = FramedRead::new(read, CrucibleDecoder::new());
6324-
let mut fw = FramedWrite::new(write, CrucibleEncoder::new());
6324+
let mut fw = MessageWriter::new(write);
63256325

63266326
// Our downstairs version is CRUCIBLE_MESSAGE_VERSION
63276327
let m = Message::HereIAm {

0 commit comments

Comments
 (0)