Add per-client backpressure #1181

Closed · wants to merge 4 commits
Changes from all commits
59 changes: 57 additions & 2 deletions upstairs/src/client.rs
@@ -10,15 +10,22 @@ use crate::{
use crucible_common::x509::TLSContext;
use crucible_protocol::{ReconciliationId, CRUCIBLE_MESSAGE_VERSION};

use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use std::{
collections::BTreeSet,
net::SocketAddr,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
};

use futures::StreamExt;
use slog::{debug, error, info, o, warn, Logger};
use tokio::{
io::AsyncWriteExt,
net::{TcpSocket, TcpStream},
sync::{mpsc, oneshot},
time::sleep_until,
time::{sleep_until, Duration},
};
use tokio_util::codec::{Encoder, FramedRead};
use uuid::Uuid;
@@ -216,6 +223,9 @@ pub(crate) struct DownstairsClient {

/// Session ID for a client's connection to a downstairs.
connection_id: ConnectionId,

/// Per-client delay, shared with the [`ClientIoTask`]
client_delay_us: Arc<AtomicU64>,
}

impl DownstairsClient {
@@ -226,6 +236,7 @@ impl DownstairsClient {
log: Logger,
tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
) -> Self {
let client_delay_us = Arc::new(AtomicU64::new(0));
Self {
cfg,
client_task: Self::new_io_task(
@@ -234,6 +245,7 @@ impl DownstairsClient {
false, // do not start the task until GoActive
client_id,
tls_context.clone(),
client_delay_us.clone(),
&log,
),
client_id,
Expand All @@ -253,6 +265,7 @@ impl DownstairsClient {
repair_info: None,
io_state_count: ClientIOStateCount::new(),
connection_id: ConnectionId(0),
client_delay_us,
}
}

@@ -262,6 +275,7 @@ impl DownstairsClient {
/// client will disappear into the void.
#[cfg(test)]
fn test_default() -> Self {
let client_delay_us = Arc::new(AtomicU64::new(0));
let cfg = Arc::new(UpstairsConfig {
encryption_context: None,
upstairs_id: Uuid::new_v4(),
@@ -290,6 +304,7 @@ impl DownstairsClient {
repair_info: None,
io_state_count: ClientIOStateCount::new(),
connection_id: ConnectionId(0),
client_delay_us,
}
}

@@ -628,6 +643,7 @@ impl DownstairsClient {
}

self.connection_id.update();

// Restart with a short delay
self.start_task(true, auto_promote);
}
@@ -652,6 +668,7 @@ impl DownstairsClient {
connect,
self.client_id,
self.tls_context.clone(),
self.client_delay_us.clone(),
&self.log,
);
}
@@ -662,6 +679,7 @@ impl DownstairsClient {
connect: bool,
client_id: ClientId,
tls_context: Option<Arc<TLSContext>>,
client_delay_us: Arc<AtomicU64>,
log: &Logger,
) -> ClientTaskHandle {
#[cfg(test)]
@@ -672,6 +690,7 @@ impl DownstairsClient {
connect,
client_id,
tls_context,
client_delay_us,
log,
)
} else {
@@ -685,6 +704,7 @@ impl DownstairsClient {
connect,
client_id,
tls_context,
client_delay_us,
log,
)
}
@@ -695,6 +715,7 @@ impl DownstairsClient {
connect: bool,
client_id: ClientId,
tls_context: Option<Arc<TLSContext>>,
client_delay_us: Arc<AtomicU64>,
log: &Logger,
) -> ClientTaskHandle {
// These channels must support at least MAX_ACTIVE_COUNT messages;
@@ -730,6 +751,7 @@ impl DownstairsClient {
log: log.clone(),
},
delay,
client_delay_us,
log,
};
c.run().await
@@ -1242,6 +1264,8 @@ impl DownstairsClient {
return false;
}

job.reply_time[self.client_id] = Some(std::time::Instant::now());

let mut jobs_completed_ok = job.state_count().completed_ok();
let mut ackable = false;

@@ -2220,6 +2244,16 @@ impl DownstairsClient {
None
}
}

/// Sets the per-client delay
pub(crate) fn set_delay_us(&self, delay: u64) {
self.client_delay_us.store(delay, Ordering::Relaxed);
}

/// Looks up the per-client delay
pub(crate) fn get_delay_us(&self) -> u64 {
self.client_delay_us.load(Ordering::Relaxed)
}
}

/// How to handle "promote to active" requests
@@ -2420,6 +2454,9 @@ struct ClientIoTask {
/// Handle for the rx task
recv_task: ClientRxTask,

/// Shared handle to receive per-client backpressure delay
client_delay_us: Arc<AtomicU64>,

log: Logger,
}

@@ -2692,6 +2729,24 @@ impl ClientIoTask {
+ std::marker::Send
+ 'static,
{
// Delay communication with this client based on backpressure, to keep
// the three clients relatively in sync with each other.
//
// We don't need to delay writes, because they're already constrained by
// the global backpressure system and cannot build up an unbounded
// queue. This is admittedly quite subtle; see crucible#1167 for
// discussions and graphs.
if !matches!(
m,
ClientRequest::Message(Message::Write { .. })
| ClientRequest::RawMessage(RawMessage::Write { .. }, ..)
) {
let d = self.client_delay_us.load(Ordering::Relaxed);
if d > 0 {
tokio::time::sleep(Duration::from_micros(d)).await;
}
}

// There's some duplication between this function and `cmd_loop` above,
// but it's not obvious whether there's a cleaner way to organize stuff.
tokio::select! {
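As an aside on the sharing pattern in this file: the client half and the I/O task half both hold the same `Arc<AtomicU64>`, so the client can adjust the delay while the task applies it before each send. Below is a minimal standalone sketch of that pattern (not Crucible code; `DelayHandle` and `io_task` are hypothetical names, and only tokio is assumed):

```rust
use std::sync::{
    atomic::{AtomicU64, Ordering},
    Arc,
};
use tokio::{sync::mpsc, time::Duration};

/// Hypothetical handle wrapping the shared per-client delay (microseconds).
#[derive(Clone)]
struct DelayHandle(Arc<AtomicU64>);

impl DelayHandle {
    fn set_delay_us(&self, delay: u64) {
        self.0.store(delay, Ordering::Relaxed);
    }
    fn get_delay_us(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical I/O task: sleeps for the current delay before each send,
/// mirroring what the client I/O task above does for non-write messages.
async fn io_task(delay: DelayHandle, mut rx: mpsc::Receiver<String>) {
    while let Some(msg) = rx.recv().await {
        let d = delay.get_delay_us();
        if d > 0 {
            tokio::time::sleep(Duration::from_micros(d)).await;
        }
        println!("sending: {msg}");
    }
}

#[tokio::main]
async fn main() {
    let handle = DelayHandle(Arc::new(AtomicU64::new(0)));
    let (tx, rx) = mpsc::channel(8);
    let task = tokio::spawn(io_task(handle.clone(), rx));

    handle.set_delay_us(500); // apply 500 µs of per-client backpressure
    tx.send("job 1".to_string()).await.unwrap();
    drop(tx); // close the channel so the task exits
    task.await.unwrap();
}
```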
37 changes: 37 additions & 0 deletions upstairs/src/downstairs.rs
@@ -659,6 +659,11 @@ impl Downstairs {
// Restart the IO task for that specific client
self.clients[client_id].reinitialize(auto_promote);

for i in ClientId::iter() {
// Clear per-client delay, because we're starting a new session
self.clients[i].set_delay_us(0);
}

// Special-case: if a Downstairs goes away midway through initial
// reconciliation, then we have to manually abort reconciliation.
if self.clients.iter().any(|c| c.state() == DsState::Reconcile) {
@@ -1326,6 +1331,7 @@ impl Downstairs {
guest_id: gw_id,
work: noop_ioop,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -1449,6 +1455,7 @@ impl Downstairs {
guest_id: gw_id,
work: repair_ioop,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -1589,6 +1596,7 @@ impl Downstairs {
guest_id: gw_id,
work: reopen_ioop,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -1662,6 +1670,7 @@ impl Downstairs {
guest_id: gw_id,
work: aread,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -1732,6 +1741,7 @@ impl Downstairs {
guest_id: gw_id,
work: awrite,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -1762,6 +1772,7 @@ impl Downstairs {
guest_id: gw_id,
work: close_ioop,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -2087,6 +2098,7 @@ impl Downstairs {
guest_id: gw_id,
work: flush,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -2211,6 +2223,7 @@ impl Downstairs {
guest_id,
work: aread,
state: ClientData::new(IOState::New),
reply_time: ClientData::new(None),
acked: false,
replay: false,
data: None,
@@ -3166,6 +3179,30 @@ impl Downstairs {
self.ackable_work.insert(ds_id);
}

if job.reply_time.iter().all(Option::is_some) && !job.replay {
let fastest_id = ClientId::iter()
.min_by_key(|i| job.reply_time[*i].unwrap())
.unwrap();
let slowest_time = *job.reply_time.iter().flatten().max().unwrap();

// Apply a delay to the fastest client, and clear the delay for the
// other two clients.
for i in ClientId::iter() {
let delay_time_us: u64 = if i == fastest_id {
// 0 delay below 10ms of lead time, then linearly increasing
// to a maximum of 10 ms. These are all roughly-eyeballed
// numbers!
let dt = slowest_time - job.reply_time[i].unwrap();
let lead_time_us: u64 = dt.as_micros().try_into().unwrap();
(lead_time_us.saturating_sub(10_000) / 100).min(10_000)
} else {
0
};

self.clients[i].set_delay_us(delay_time_us);
}
Contributor: Is there any way we can see this from the outside? It would be interesting to be able to track this and watch how it changes, but I'm not sure how to bubble it out to where we could grab it with the rest of the collect_stats().

Contributor (Author): I added this to the existing up-status DTrace probe in 597b950. (The string is now 768 bytes long with all zeros, so still well below the strsize=1k limit.)

}
Contributor: If we're not setting the per-client time, should we clear it? What if one Downstairs is Faulted, or in replay - the previously set delays will still apply.

Contributor (Author):

> If we're not setting the per-client time, should we clear it?

I don't think so. If we did that, then the first two replies to a job would clear the delay, then the third reply would set it (before it's cleared again by the first reply to the next job).

> What if one Downstairs is Faulted, or in replay - the previously set delays will still apply.

I added logic to clear the per-client delay in Client::reinitialize (1190fa3).

Maybe we should clear it if any of the clients restart? Otherwise, I could imagine a situation where:

  • DS1 is temporarily slower than DS2, so we delay DS2
  • Then DS3 returns an error, so we restart it
  • We're still delaying DS2, and that won't change until DS3 comes back online

Contributor:

> I don't think so. If we did that, then the first two replies to a job would clear the delay, then the third reply would set it (before it's cleared again by the first reply to the next job).

👍

> Maybe we should clear it if any of the clients restart?

That sounds like a good idea, yeah.

Contributor (Author): Okay, done in 80b6f73.


/*
* If all 3 jobs are done, we can check here to see if we can
* remove this job from the DS list. If we have completed the ack
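For a concrete sense of the delay curve in the hunk above, here is a standalone sketch of the same arithmetic (the free function `delay_for_lead_time` is hypothetical; the 10_000 and 100 constants come straight from the diff). The fastest client sees no delay until it leads the slowest by more than 10 ms, after which the delay grows by 1 µs per additional 100 µs of lead, capping at 10 ms:

```rust
/// Hypothetical standalone version of the per-client delay calculation.
/// `lead_time_us` is how far (in microseconds) the fastest client's reply
/// preceded the slowest client's reply for the same job.
fn delay_for_lead_time(lead_time_us: u64) -> u64 {
    (lead_time_us.saturating_sub(10_000) / 100).min(10_000)
}

fn main() {
    assert_eq!(delay_for_lead_time(5_000), 0); // 5 ms lead: no delay
    assert_eq!(delay_for_lead_time(10_000), 0); // exactly 10 ms: still none
    assert_eq!(delay_for_lead_time(12_000), 20); // 12 ms lead: 20 µs delay
    assert_eq!(delay_for_lead_time(110_000), 1_000); // 110 ms lead: 1 ms delay
    assert_eq!(delay_for_lead_time(2_000_000), 10_000); // capped at 10 ms
}
```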
5 changes: 5 additions & 0 deletions upstairs/src/lib.rs
@@ -889,6 +889,9 @@ struct DownstairsIO {
/// Map of work status, tracked on a per-client basis
state: ClientData<IOState>,

/// At what time did we hear a reply?
reply_time: ClientData<Option<std::time::Instant>>,

/*
* Has this been acked to the guest yet?
*/
@@ -2024,6 +2027,8 @@ pub struct Arg {
pub ds_extents_repaired: [usize; 3],
/// Times we have live confirmed an extent on this downstairs.
pub ds_extents_confirmed: [usize; 3],
/// Per-client delay to keep them roughly in sync
pub ds_delay_us: [usize; 3],
/// Times we skipped repairing a downstairs because we are read_only.
pub ds_ro_lr_skipped: [usize; 3],
}
3 changes: 3 additions & 0 deletions upstairs/src/upstairs.rs
@@ -738,6 +738,8 @@ impl Upstairs {
self.downstairs.collect_stats(|c| c.stats.extents_repaired);
let ds_extents_confirmed =
self.downstairs.collect_stats(|c| c.stats.extents_confirmed);
let ds_delay_us =
self.downstairs.collect_stats(|c| c.get_delay_us() as usize);
let ds_ro_lr_skipped =
self.downstairs.collect_stats(|c| c.stats.ro_lr_skipped);

@@ -762,6 +764,7 @@ impl Upstairs {
ds_flow_control,
ds_extents_repaired,
ds_extents_confirmed,
ds_delay_us,
ds_ro_lr_skipped,
};
("stats", arg)