From 8ce58cd7a9a2f10503f128af3912ab96dfe36df6 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Fri, 26 Jan 2024 17:16:02 -0500 Subject: [PATCH 1/4] Move guest to a module (broken) --- upstairs/src/guest.rs | 918 +++++++++++++++++++++++++++++++++++++++ upstairs/src/lib.rs | 896 +------------------------------------- upstairs/src/upstairs.rs | 4 +- 3 files changed, 925 insertions(+), 893 deletions(-) create mode 100644 upstairs/src/guest.rs diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs new file mode 100644 index 000000000..bbbb853bf --- /dev/null +++ b/upstairs/src/guest.rs @@ -0,0 +1,918 @@ +// Copyright 2023 Oxide Computer Company +use std::{ + collections::HashMap, + net::SocketAddr, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; + +use crate::{ + BlockIO, BlockOp, BlockReq, BlockReqReply, BlockReqWaiter, BlockRes, + Buffer, JobId, ReplaceResult, UpstairsAction, +}; +use crucible_common::{build_logger, crucible_bail, Block, CrucibleError}; +use crucible_protocol::{ReadResponse, SnapshotDetails}; + +use async_trait::async_trait; +use bytes::Bytes; +use ringbuffer::{AllocRingBuffer, RingBuffer}; +use slog::{info, warn, Logger}; +use tokio::sync::{mpsc, Mutex}; +use tracing::{instrument, span, Level}; +use uuid::Uuid; + +/* + * This structure is for tracking the underlying storage side operations + * that map to a single Guest IO request. G to S stands for Guest + * to Storage. + * + * The submitted hashmap is indexed by the request number (ds_id) for the + * downstairs requests issued on behalf of this request. + */ +#[derive(Debug)] +struct GtoS { + /* + * Job we sent on to the downstairs. + */ + ds_id: JobId, + + /* + * Buffer provided by the guest request. If this is a read, + * data will be written here. + */ + guest_buffer: Option, + + /* + * Notify the caller waiting on the job to finish. + * This is an Option for the case where we want to send an IO on behalf + * of the Upstairs (not guest driven). Right now the only case where we + * need that is to flush data to downstairs when the guest has not sent + * us a flush in some time. This allows us to free internal buffers. + * If the sender is None, we know it's a request from the Upstairs and + * we don't have to ACK it to anyone. + */ + res: Option, +} + +impl GtoS { + /// Create a new GtoS object where one Guest IO request maps to one + /// downstairs operation. + pub fn new( + ds_id: JobId, + guest_buffer: Option, + res: Option, + ) -> GtoS { + GtoS { + ds_id, + guest_buffer, + res, + } + } + + /* + * When all downstairs jobs have completed, and all buffers have been + * attached to the GtoS struct, we can do the final copy of the data + * from upstairs memory back to the guest's memory. Notify corresponding + * BlockReqWaiter if required + */ + #[instrument] + fn transfer_and_notify( + self, + downstairs_responses: Option>, + result: Result<(), CrucibleError>, + ) { + let guest_buffer = if let Some(mut guest_buffer) = self.guest_buffer { + if let Some(downstairs_responses) = downstairs_responses { + let mut offset = 0; + + // XXX don't do if result.is_err()? + for response in &downstairs_responses { + // Copy over into guest memory. + { + let _ignored = + span!(Level::TRACE, "copy to guest buffer") + .entered(); + + guest_buffer.write_read_response(offset, response); + offset += response.data.len(); + } + } + } else { + /* + * Should this panic? If the caller is requesting a transfer, + * the guest_buffer should exist. 
If it does not exist, then + * either there is a real problem, or the operation was a write + * or flush and why are we requesting a transfer for those. + * + * However, dropping a Guest before receiving a downstairs + * response will trigger this, so eat it for now. + */ + } + + Some(guest_buffer) + } else { + None + }; + + /* + * If present, send the result to the guest. If this is a flush + * issued on behalf of crucible, then there is no place to send + * a result to. + * + * XXX: If the guest is no longer listening and this returns an + * error, do we care? This could happen if the guest has + * given up because an IO took too long, or other possible + * guest side reasons. + */ + if let Some(res) = self.res { + match result { + Ok(_) => match guest_buffer { + Some(guest_buffer) => res.send_ok_with_buffer(guest_buffer), + None => res.send_ok(), + }, + + Err(e) => match guest_buffer { + Some(guest_buffer) => { + res.send_err_with_buffer(guest_buffer, e) + } + None => res.send_err(e), + }, + } + } + } +} + +/// Strongly-typed ID for guest work (stored in the [`GuestWork`] map) +#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) struct GuestWorkId(pub u64); + +impl std::fmt::Display for GuestWorkId { + fn fmt( + &self, + f: &mut std::fmt::Formatter<'_>, + ) -> Result<(), std::fmt::Error> { + std::fmt::Display::fmt(&self.0, f) + } +} + +/** + * This structure keeps track of work that Crucible has accepted from the + * "Guest", aka, Propolis. + * + * The active is a hashmap of GtoS structures for all I/Os that are + * outstanding. Either just created or in progress operations. The key + * for a new job comes from next_gw_id and should always increment. + * + * Once we have decided enough downstairs requests are finished, we remove + * the entry from the active and add the gw_id to the completed vec. + * + * TODO: The completed needs to implement some notify back to the Guest, and + * it should probably be a ring buffer. + */ +#[derive(Debug)] +struct GuestWork { + active: HashMap, + next_gw_id: u64, + completed: AllocRingBuffer, +} + +impl GuestWork { + fn is_empty(&self) -> bool { + self.active.is_empty() + } + + fn next_gw_id(&mut self) -> GuestWorkId { + let id = self.next_gw_id; + self.next_gw_id += 1; + GuestWorkId(id) + } + + /* + * When the required number of completions for a downstairs + * ds_id have arrived, we call this method on the parent GuestWork + * that requested them and include the Option from the IO. + * + * If this operation was a read, then we attach the Bytes read to the + * GtoS struct for later transfer. + * + * A single GtoS job may have multiple downstairs jobs it created, so + * we may not be done yet. When the required number of completions have + * arrived from all the downstairs jobs we created, then we + * can move forward with finishing up the guest work operation. + * This may include moving data buffers from completed reads. + */ + #[instrument] + async fn gw_ds_complete( + &mut self, + gw_id: GuestWorkId, + ds_id: JobId, + data: Option>, + result: Result<(), CrucibleError>, + log: &Logger, + ) { + if let Some(gtos_job) = self.active.remove(&gw_id) { + assert_eq!(gtos_job.ds_id, ds_id); + + /* + * Copy (if present) read data back to the guest buffer they + * provided to us, and notify any waiters. + */ + gtos_job.transfer_and_notify(data, result); + + self.completed.push(gw_id); + } else { + /* + * XXX This is just so I can see if ever does happen. 
+ */ + panic!("gw_id {} for job {} not on active list", gw_id, ds_id); + } + } +} + +impl Default for GuestWork { + fn default() -> Self { + Self { + active: HashMap::new(), // GtoS + next_gw_id: 1, + completed: AllocRingBuffer::new(2048), + } + } +} + +/// IO handles used by the guest uses to pass work into Crucible proper +/// +/// This data structure is the counterpart to the [`GuestIoHandle`], which +/// receives work from the guest and is exclusively owned by the +/// [`upstairs::Upstairs`] +/// +/// Requests from the guest are put into the `req_tx` queue by the guest, and +/// received by the [`GuestIoHandle::req_rx`] side. +#[derive(Debug)] +pub struct Guest { + /// New requests from outside go into this queue + req_tx: mpsc::Sender, + + /// Local cache for block size + /// + /// This is 0 when unpopulated, and non-zero otherwise; storing it locally + /// saves a round-trip through the `reqs` queue, and using an atomic means + /// it can be read from a `&self` reference. + block_size: AtomicU64, + + /// Backpressure is implemented as a delay on host write operations + /// + /// It is stored in an `Arc` so that the `GuestIoHandle` can update it from + /// the IO task. + backpressure_us: Arc, + + /// Lock held during backpressure delay + /// + /// Without this lock, multiple tasks could submit jobs to the upstairs and + /// wait in parallel, which defeats the purpose of backpressure (since you + /// could send arbitrarily many jobs at high speed by sending them from + /// different tasks). + backpressure_lock: Mutex<()>, + + /// Logger for the guest + log: Logger, +} + +/// Configuration for host-side backpressure +/// +/// Backpressure adds an artificial delay to host write messages (which are +/// otherwise acked immediately, before actually being complete). The delay is +/// varied based on two metrics: +/// +/// - number of write bytes outstanding +/// - queue length as a fraction (where 1.0 is full) +/// +/// These two metrics are used for quadratic backpressure, picking the larger of +/// the two delays. +#[derive(Copy, Clone, Debug)] +struct BackpressureConfig { + /// When should backpressure start (in bytes)? + bytes_start: u64, + /// Scale for byte-based quadratic backpressure + bytes_scale: f64, + + /// When should queue-based backpressure start? + queue_start: f64, + /// Maximum queue-based delay + queue_max_delay: Duration, +} + +/* + * These methods are how to add or checking for new work on the Guest struct + */ +impl Guest { + pub fn new(log: Option) -> (Guest, GuestIoHandle) { + let log = log.unwrap_or_else(build_logger); + + // The channel size is chosen arbitrarily here. The `req_rx` side + // is running independently and will constantly be processing messages, + // so we don't expect the queue to become full. The `req_tx` side is + // only ever used in `Guest::send`, which waits for acknowledgement from + // the other side of the queue; there are no places where we put stuff + // into the queue without awaiting a response. + // + // Together, these facts mean that the queue should remain relatively + // small. The exception is if someone spawns a zillion tasks, all of + // which call `Guest` APIs simultaneously. In that case, having the + // queue be full will just look like another source of backpressure (and + // will in fact be invisible to the caller, since they can't distinguish + // time spent waiting for the queue versus time spent in Upstairs code). 
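+
+        // (For a numeric feel of the explicit backpressure configured below:
+        // the byte-based delay computed in `set_backpressure` is roughly
+        // ((bytes - bytes_start) * bytes_scale)^2 microseconds, so with
+        // bytes_start = 1 GiB and bytes_scale = 9.3e-8, having 2 GiB of
+        // writes in flight gives ((2^30) * 9.3e-8)^2 ~= 100^2 ~= 10_000 us,
+        // i.e. the ~10ms called out next to `bytes_scale`.)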
+ let (req_tx, req_rx) = mpsc::channel(500); + + let backpressure_us = Arc::new(AtomicU64::new(0)); + let limits = GuestLimits { + iop_limit: None, + bw_limit: None, + }; + let io = GuestIoHandle { + req_rx, + req_head: None, + req_limited: false, + limits, + + guest_work: GuestWork { + active: HashMap::new(), // GtoS + next_gw_id: 1, + completed: AllocRingBuffer::new(2048), + }, + + iop_tokens: 0, + bw_tokens: 0, + backpressure_us: backpressure_us.clone(), + backpressure_config: BackpressureConfig { + bytes_start: 1024u64.pow(3), // Start at 1 GiB + bytes_scale: 9.3e-8, // Delay of 10ms at 2 GiB in-flight + queue_start: 0.05, + queue_max_delay: Duration::from_millis(5), + }, + log: log.clone(), + }; + let guest = Guest { + req_tx, + + block_size: AtomicU64::new(0), + + backpressure_us, + backpressure_lock: Mutex::new(()), + log, + }; + (guest, io) + } + + /* + * This is used to submit a new BlockOp IO request to Crucible. + */ + async fn send(&self, op: BlockOp) -> BlockReqWaiter { + let (brw, res) = BlockReqWaiter::pair(); + if let Err(e) = self.req_tx.send(BlockReq { op, res }).await { + // This could happen during shutdown, if the up_main task is + // destroyed while the Guest is still trying to do work. + // + // If this happens, then the BlockReqWaiter will immediately return + // with CrucibleError::RecvDisconnected (since the oneshot::Sender + // will have been dropped into the void). + warn!(self.log, "failed to send op to guest: {e}"); + } + brw + } + + async fn send_and_wait(&self, op: BlockOp) -> BlockReqReply { + let brw = self.send(op).await; + brw.wait(&self.log).await + } + + pub async fn query_extent_size(&self) -> Result { + let data = Arc::new(Mutex::new(Block::new(0, 9))); + let extent_query = BlockOp::QueryExtentSize { data: data.clone() }; + + let reply = self.send_and_wait(extent_query).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let es = *data.lock().await; + Ok(es) + } + + pub async fn query_work_queue(&self) -> Result { + let wc = WQCounts { + up_count: 0, + ds_count: 0, + active_count: 0, + }; + + let data = Arc::new(Mutex::new(wc)); + let qwq = BlockOp::QueryWorkQueue { data: data.clone() }; + + let reply = self.send_and_wait(qwq).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let wc = data.lock().await; + Ok(*wc) + } + + // Maybe this can just be a guest specific thing, not a BlockIO + pub async fn activate_with_gen( + &self, + gen: u64, + ) -> Result<(), CrucibleError> { + let waiter = self.send(BlockOp::GoActiveWithGen { gen }).await; + info!( + self.log, + "The guest has requested activation with gen:{}", gen + ); + + let reply = waiter.wait(&self.log).await; + assert!(reply.buffer.is_none()); + reply.result?; + + info!( + self.log, + "The guest has finished waiting for activation with:{}", gen + ); + + Ok(()) + } + + async fn backpressure_sleep(&self) { + let bp = + Duration::from_micros(self.backpressure_us.load(Ordering::SeqCst)); + if bp > Duration::ZERO { + let _guard = self.backpressure_lock.lock().await; + tokio::time::sleep(bp).await; + drop(_guard); + } + } +} + +#[async_trait] +impl BlockIO for Guest { + async fn activate(&self) -> Result<(), CrucibleError> { + let waiter = self.send(BlockOp::GoActive).await; + info!(self.log, "The guest has requested activation"); + + let reply = waiter.wait(&self.log).await; + assert!(reply.buffer.is_none()); + reply.result?; + + info!(self.log, "The guest has finished waiting for activation"); + Ok(()) + } + + /// Disable any more IO from this guest and deactivate the 
downstairs. + async fn deactivate(&self) -> Result<(), CrucibleError> { + let reply = self.send_and_wait(BlockOp::Deactivate).await; + assert!(reply.buffer.is_none()); + reply.result + } + + async fn query_is_active(&self) -> Result { + let data = Arc::new(Mutex::new(false)); + let active_query = BlockOp::QueryGuestIOReady { data: data.clone() }; + + let reply = self.send_and_wait(active_query).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let is_active = *data.lock().await; + Ok(is_active) + } + + async fn total_size(&self) -> Result { + let data = Arc::new(Mutex::new(0)); + let size_query = BlockOp::QueryTotalSize { data: data.clone() }; + + let reply = self.send_and_wait(size_query).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let total_size = *data.lock().await; + Ok(total_size) + } + + async fn get_block_size(&self) -> Result { + let bs = self.block_size.load(std::sync::atomic::Ordering::Relaxed); + if bs == 0 { + let data = Arc::new(Mutex::new(0)); + let size_query = BlockOp::QueryBlockSize { data: data.clone() }; + + let reply = self.send_and_wait(size_query).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let bs = *data.lock().await; + self.block_size + .store(bs, std::sync::atomic::Ordering::Relaxed); + Ok(bs) + } else { + Ok(bs) + } + } + + async fn get_uuid(&self) -> Result { + let data = Arc::new(Mutex::new(Uuid::default())); + let uuid_query = BlockOp::QueryUpstairsUuid { data: data.clone() }; + + let reply = self.send_and_wait(uuid_query).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let uuid = *data.lock().await; + Ok(uuid) + } + + async fn read( + &self, + offset: Block, + data: &mut Buffer, + ) -> Result<(), CrucibleError> { + let bs = self.check_data_size(data.len()).await?; + + if offset.block_size_in_bytes() as u64 != bs { + crucible_bail!(BlockSizeMismatch); + } + + if data.is_empty() { + return Ok(()); + } + + let buffer = std::mem::take(data); + let rio = BlockOp::Read { + offset, + data: buffer, + }; + + // We've replaced `data` with a blank Buffer, and sent it over the + // channel. Replace it regardless of the outcome of the read so that the + // caller will not have to reallocate it. 
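+        // The unwrap below leans on the reply carrying the buffer back for
+        // reads: both the Ok and Err paths in `GtoS::transfer_and_notify`
+        // return it (`send_ok_with_buffer` / `send_err_with_buffer`), so
+        // `reply.buffer` is expected to be populated here.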
+ let reply = self.send_and_wait(rio).await; + *data = reply.buffer.unwrap(); + + reply.result + } + + async fn write( + &self, + offset: Block, + data: Bytes, + ) -> Result<(), CrucibleError> { + let bs = self.check_data_size(data.len()).await?; + + if offset.block_size_in_bytes() as u64 != bs { + crucible_bail!(BlockSizeMismatch); + } + + if data.is_empty() { + return Ok(()); + } + let wio = BlockOp::Write { offset, data }; + + self.backpressure_sleep().await; + + let reply = self.send_and_wait(wio).await; + assert!(reply.buffer.is_none()); + reply.result + } + + async fn write_unwritten( + &self, + offset: Block, + data: Bytes, + ) -> Result<(), CrucibleError> { + let bs = self.check_data_size(data.len()).await?; + + if offset.block_size_in_bytes() as u64 != bs { + crucible_bail!(BlockSizeMismatch); + } + + if data.is_empty() { + return Ok(()); + } + let wio = BlockOp::WriteUnwritten { offset, data }; + + self.backpressure_sleep().await; + let reply = self.send_and_wait(wio).await; + assert!(reply.buffer.is_none()); + reply.result + } + + async fn flush( + &self, + snapshot_details: Option, + ) -> Result<(), CrucibleError> { + let reply = self + .send_and_wait(BlockOp::Flush { snapshot_details }) + .await; + assert!(reply.buffer.is_none()); + reply.result + } + + async fn show_work(&self) -> Result { + // Note: for this implementation, BlockOp::ShowWork will be sent and + // processed by the Upstairs even if it isn't active. + let wc = WQCounts { + up_count: 0, + ds_count: 0, + active_count: 0, + }; + + let data = Arc::new(Mutex::new(wc)); + let sw = BlockOp::ShowWork { data: data.clone() }; + + let reply = self.send_and_wait(sw).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let wc = data.lock().await; + Ok(*wc) + } + + async fn replace_downstairs( + &self, + id: Uuid, + old: SocketAddr, + new: SocketAddr, + ) -> Result { + let data = Arc::new(Mutex::new(ReplaceResult::Missing)); + let sw = BlockOp::ReplaceDownstairs { + id, + old, + new, + result: data.clone(), + }; + + let reply = self.send_and_wait(sw).await; + assert!(reply.buffer.is_none()); + reply.result?; + + let result = data.lock().await; + Ok(*result) + } +} + +/// Configuration for iops-per-second limiting +#[derive(Copy, Clone, Debug)] +pub struct IopLimit { + bytes_per_iop: usize, + iop_limit: usize, +} + +/// Configuration for guest limits +#[derive(Copy, Clone, Debug)] +pub struct GuestLimits { + iop_limit: Option, + bw_limit: Option, +} + +/// Handle for receiving requests from the guest +/// +/// This is the counterpart to the [`Guest`], which sends requests. It includes +/// the receiving side of the request queue, along with infrastructure for +/// bandwidth and IOP limiting. +/// +/// In addition, it contains information about the mapping from guest to +/// downstairs data structures, in the form of the [`GuestWork`] map. +/// +/// The life-cycle of a request is roughly the following: +/// +/// * Pop the request off the reqs queue. +/// +/// * Copy (and optionally encrypt) any data buffers provided to us by the +/// Guest. +/// +/// * Create one or more downstairs DownstairsIO structures. +/// +/// * Create a GtoS tracking structure with the id's for each downstairs task +/// and the read result buffer if required. +/// +/// * Add the GtoS struct to the in GuestWork active work hashmap. 
+/// +/// * Put all the DownstairsIO structures on the downstairs work queue +/// +/// * Wait for them to complete, then notify the guest through oneshot channels +pub struct GuestIoHandle { + /// Queue to receive new blockreqs + req_rx: mpsc::Receiver, + + /// Guest IO and bandwidth limits + limits: GuestLimits, + + /// `BlockReq` that is at the head of the queue + /// + /// If a `BlockReq` was pulled from the queue but couldn't be used due to + /// IOP or bandwidth limiting, it's stored here instead (and we check this + /// before awaiting the queue). + req_head: Option, + + /// Are we currently IOP or bandwidth limited? + /// + /// If so, we don't return anything in `recv()` + req_limited: bool, + + /// Current number of IOP tokens + iop_tokens: usize, + + /// Current backpressure (shared with the `Guest`) + backpressure_us: Arc, + + /// Backpressure configuration, as a starting point and max delay + backpressure_config: BackpressureConfig, + + /// Bandwidth tokens (in bytes) + bw_tokens: usize, + + /// Active work from the guest + /// + /// When the crucible listening task has noticed a new IO request, it + /// will pull it from the reqs queue and create an GuestWork struct + /// as well as convert the new IO request into the matching + /// downstairs request(s). Each new GuestWork request will get a + /// unique gw_id, which is also the index for that operation into the + /// hashmap. + /// + /// It is during this process that data will encrypted. For a read, the + /// data is decrypted back to the guest provided buffer after all the + /// required downstairs operations are completed. + guest_work: GuestWork, + + /// Log handle, mainly to pass it into the [`Upstairs`] + pub log: Logger, +} + +impl GuestIoHandle { + /// Leak IOPs tokens + fn leak_iop_tokens(&mut self, tokens: usize) { + self.iop_tokens = self.iop_tokens.saturating_sub(tokens); + self.req_limited = false; + } + + /// Leak bytes from bandwidth tokens + fn leak_bw_tokens(&mut self, bytes: usize) { + self.bw_tokens = self.bw_tokens.saturating_sub(bytes); + self.req_limited = false; + } + + /// Listen for new work + /// + /// This will wait forever if we are currently IOP / BW limited; otherwise, + /// it will return the next value from the `BlockReq` queue. + /// + /// To avoid being stuck forever, this function should be called as **a + /// branch** of a `select!` statement that _also_ includes at least one + /// timeout; we should use that timeout to periodically service the IOP / BW + /// token counters, which will unblock the `GuestIoHandle` in future calls. + async fn recv(&mut self) -> UpstairsAction { + let req = if self.req_limited { + futures::future::pending().await + } else if let Some(req) = self.req_head.take() { + req + } else if let Some(req) = self.req_rx.recv().await { + // NOTE: once we take this req from the queue, we must be cancel + // safe! In other words, we cannot yield until either (1) returning + // the req or (2) storing it in self.req_head for safe-keeping. + req + } else { + warn!(self.log, "Guest handle has been dropped"); + return UpstairsAction::GuestDropped; + }; + + // Check if we can consume right away + let iop_limit_applies = + self.limits.iop_limit.is_some() && req.op.consumes_iops(); + let bw_limit_applies = + self.limits.bw_limit.is_some() && req.op.sz().is_some(); + + if !iop_limit_applies && !bw_limit_applies { + return UpstairsAction::Guest(req); + } + + // Check bandwidth limit before IOP limit, but make sure only to consume + // tokens if both checks pass! 
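+        // (Illustrative numbers, not taken from the code: with a bw_limit of
+        // 1024 bytes/sec and bw_tokens already at 1024, even a 10240-byte
+        // write only waits until tokens leak back below 1024. We ask whether
+        // the limit is already reached, not whether this request would fit
+        // under it, so an IO larger than the limit can still make progress.)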
+ + let mut bw_check_ok = true; + let mut iop_check_ok = true; + + // When checking tokens vs the limit, do not check by checking if adding + // the block request's values to the applicable limit: this would create + // a scenario where a large IO enough would stall the pipeline (see + // test_impossible_io). Instead, check if the limits are already + // reached. + + if let Some(bw_limit) = self.limits.bw_limit { + if req.op.sz().is_some() && self.bw_tokens >= bw_limit { + bw_check_ok = false; + } + } + + if let Some(iop_limit_cfg) = &self.limits.iop_limit { + let bytes_per_iops = iop_limit_cfg.bytes_per_iop; + if req.op.iops(bytes_per_iops).is_some() + && self.iop_tokens >= iop_limit_cfg.iop_limit + { + iop_check_ok = false; + } + } + + // If both checks pass, consume appropriate resources and return the + // block req + if bw_check_ok && iop_check_ok { + if self.limits.bw_limit.is_some() { + if let Some(sz) = req.op.sz() { + self.bw_tokens += sz; + } + } + + if let Some(cfg) = &self.limits.iop_limit { + if let Some(req_iops) = req.op.iops(cfg.bytes_per_iop) { + self.iop_tokens += req_iops; + } + } + + UpstairsAction::Guest(req) + } else { + assert!(self.req_head.is_none()); + self.req_head = Some(req); + futures::future::pending().await + } + } + + /// Set `self.backpressure_us` based on outstanding IO ratio + fn set_backpressure(&self, bytes: u64, ratio: f64) { + // Check to see if the number of outstanding write bytes (between + // the upstairs and downstairs) is particularly high. If so, + // apply some backpressure by delaying host operations, with a + // quadratically-increasing delay. + let d1 = (bytes.saturating_sub(self.backpressure_config.bytes_start) + as f64 + * self.backpressure_config.bytes_scale) + .powf(2.0) as u64; + + // Compute an alternate delay based on queue length + let d2 = self + .backpressure_config + .queue_max_delay + .mul_f64( + ((ratio - self.backpressure_config.queue_start).max(0.0) + / (1.0 - self.backpressure_config.queue_start)) + .powf(2.0), + ) + .as_micros() as u64; + self.backpressure_us.store(d1.max(d2), Ordering::SeqCst); + } + + pub fn set_iop_limit(&mut self, bytes_per_iop: usize, limit: usize) { + self.limits.iop_limit = Some(IopLimit { + bytes_per_iop, + iop_limit: limit, + }); + } + + pub fn set_bw_limit(&mut self, bytes_per_second: usize) { + self.limits.bw_limit = Some(bytes_per_second); + } + + /// Returns the number of active jobs + pub fn active_count(&self) -> usize { + self.guest_work.active.len() + } +} + +/* + * Work Queue Counts, for debug ShowWork IO type + */ +#[derive(Debug, Copy, Clone)] +pub struct WQCounts { + pub up_count: usize, + pub ds_count: usize, + pub active_count: usize, +} + +/* + * Debug function to dump the guest work structure. + * This does a bit while holding the mutex, so don't expect performance + * to get better when calling it. + * + * TODO: make this one big dump, where we include the up.work.active + * printing for each guest_work. It will be much more dense, but require + * holding both locks for the duration. 
+ */ +pub(crate) fn show_guest_work(guest: &GuestIoHandle) -> usize { + println!("Guest work: Active and Completed Jobs:"); + let gw = &guest.guest_work; + let mut kvec: Vec<_> = gw.active.keys().cloned().collect(); + kvec.sort_unstable(); + for id in kvec.iter() { + let job = gw.active.get(id).unwrap(); + println!("GW_JOB active:[{:04}] D:{:?} ", id, job.ds_id); + } + let done = gw.completed.to_vec(); + println!("GW_JOB completed count:{:?} ", done.len()); + kvec.len() +} diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index 63346a494..fe0562ea0 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -10,7 +10,6 @@ use std::fmt::{Debug, Formatter}; use std::io::{Read as _, Result as IOResult, Seek, SeekFrom, Write as _}; use std::net::SocketAddr; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -21,13 +20,13 @@ pub use crucible_protocol::*; use anyhow::{bail, Result}; pub use bytes::{Bytes, BytesMut}; use oximeter::types::ProducerRegistry; -use ringbuffer::{AllocRingBuffer, RingBuffer}; +use ringbuffer::AllocRingBuffer; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use slog::{error, info, o, warn, Logger}; use tokio::sync::{mpsc, oneshot, Mutex, RwLock}; use tokio::time::Instant; -use tracing::{instrument, span, Level}; +use tracing::instrument; use usdt::register_probes; use uuid::Uuid; @@ -54,6 +53,9 @@ mod mend; pub use mend::{DownstairsMend, ExtentFix, RegionMetadata}; pub use pseudo_file::CruciblePseudoFile; +pub(crate) mod guest; +use guest::{GuestIoHandle, GuestWorkId, WQCounts}; + mod stats; mod impacted_blocks; @@ -1987,871 +1989,6 @@ async fn test_return_iops() { assert_eq!(op.iops(IOP_SZ).unwrap(), 2); } -/* - * This structure is for tracking the underlying storage side operations - * that map to a single Guest IO request. G to S stands for Guest - * to Storage. - * - * The submitted hashmap is indexed by the request number (ds_id) for the - * downstairs requests issued on behalf of this request. - */ -#[derive(Debug)] -struct GtoS { - /* - * Job we sent on to the downstairs. - */ - ds_id: JobId, - - /* - * Buffer provided by the guest request. If this is a read, - * data will be written here. - */ - guest_buffer: Option, - - /* - * Notify the caller waiting on the job to finish. - * This is an Option for the case where we want to send an IO on behalf - * of the Upstairs (not guest driven). Right now the only case where we - * need that is to flush data to downstairs when the guest has not sent - * us a flush in some time. This allows us to free internal buffers. - * If the sender is None, we know it's a request from the Upstairs and - * we don't have to ACK it to anyone. - */ - res: Option, -} - -impl GtoS { - /// Create a new GtoS object where one Guest IO request maps to one - /// downstairs operation. - pub fn new( - ds_id: JobId, - guest_buffer: Option, - res: Option, - ) -> GtoS { - GtoS { - ds_id, - guest_buffer, - res, - } - } - - /* - * When all downstairs jobs have completed, and all buffers have been - * attached to the GtoS struct, we can do the final copy of the data - * from upstairs memory back to the guest's memory. 
Notify corresponding - * BlockReqWaiter if required - */ - #[instrument] - fn transfer_and_notify( - self, - downstairs_responses: Option>, - result: Result<(), CrucibleError>, - ) { - let guest_buffer = if let Some(mut guest_buffer) = self.guest_buffer { - if let Some(downstairs_responses) = downstairs_responses { - let mut offset = 0; - - // XXX don't do if result.is_err()? - for response in &downstairs_responses { - // Copy over into guest memory. - { - let _ignored = - span!(Level::TRACE, "copy to guest buffer") - .entered(); - - guest_buffer.write_read_response(offset, response); - offset += response.data.len(); - } - } - } else { - /* - * Should this panic? If the caller is requesting a transfer, - * the guest_buffer should exist. If it does not exist, then - * either there is a real problem, or the operation was a write - * or flush and why are we requesting a transfer for those. - * - * However, dropping a Guest before receiving a downstairs - * response will trigger this, so eat it for now. - */ - } - - Some(guest_buffer) - } else { - None - }; - - /* - * If present, send the result to the guest. If this is a flush - * issued on behalf of crucible, then there is no place to send - * a result to. - * - * XXX: If the guest is no longer listening and this returns an - * error, do we care? This could happen if the guest has - * given up because an IO took too long, or other possible - * guest side reasons. - */ - if let Some(res) = self.res { - match result { - Ok(_) => match guest_buffer { - Some(guest_buffer) => res.send_ok_with_buffer(guest_buffer), - None => res.send_ok(), - }, - - Err(e) => match guest_buffer { - Some(guest_buffer) => { - res.send_err_with_buffer(guest_buffer, e) - } - None => res.send_err(e), - }, - } - } - } -} - -/// Strongly-typed ID for guest work (stored in the [`GuestWork`] map) -#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] -pub(crate) struct GuestWorkId(pub u64); - -impl std::fmt::Display for GuestWorkId { - fn fmt( - &self, - f: &mut std::fmt::Formatter<'_>, - ) -> Result<(), std::fmt::Error> { - std::fmt::Display::fmt(&self.0, f) - } -} - -/** - * This structure keeps track of work that Crucible has accepted from the - * "Guest", aka, Propolis. - * - * The active is a hashmap of GtoS structures for all I/Os that are - * outstanding. Either just created or in progress operations. The key - * for a new job comes from next_gw_id and should always increment. - * - * Once we have decided enough downstairs requests are finished, we remove - * the entry from the active and add the gw_id to the completed vec. - * - * TODO: The completed needs to implement some notify back to the Guest, and - * it should probably be a ring buffer. - */ -#[derive(Debug)] -struct GuestWork { - active: HashMap, - next_gw_id: u64, - completed: AllocRingBuffer, -} - -impl GuestWork { - fn is_empty(&self) -> bool { - self.active.is_empty() - } - - fn next_gw_id(&mut self) -> GuestWorkId { - let id = self.next_gw_id; - self.next_gw_id += 1; - GuestWorkId(id) - } - - /* - * When the required number of completions for a downstairs - * ds_id have arrived, we call this method on the parent GuestWork - * that requested them and include the Option from the IO. - * - * If this operation was a read, then we attach the Bytes read to the - * GtoS struct for later transfer. - * - * A single GtoS job may have multiple downstairs jobs it created, so - * we may not be done yet. 
When the required number of completions have - * arrived from all the downstairs jobs we created, then we - * can move forward with finishing up the guest work operation. - * This may include moving data buffers from completed reads. - */ - #[instrument] - async fn gw_ds_complete( - &mut self, - gw_id: GuestWorkId, - ds_id: JobId, - data: Option>, - result: Result<(), CrucibleError>, - log: &Logger, - ) { - if let Some(gtos_job) = self.active.remove(&gw_id) { - assert_eq!(gtos_job.ds_id, ds_id); - - /* - * Copy (if present) read data back to the guest buffer they - * provided to us, and notify any waiters. - */ - gtos_job.transfer_and_notify(data, result); - - self.completed.push(gw_id); - } else { - /* - * XXX This is just so I can see if ever does happen. - */ - panic!("gw_id {} for job {} not on active list", gw_id, ds_id); - } - } -} - -impl Default for GuestWork { - fn default() -> Self { - Self { - active: HashMap::new(), // GtoS - next_gw_id: 1, - completed: AllocRingBuffer::new(2048), - } - } -} - -/// IO handles used by the guest uses to pass work into Crucible proper -/// -/// This data structure is the counterpart to the [`GuestIoHandle`], which -/// receives work from the guest and is exclusively owned by the -/// [`upstairs::Upstairs`] -/// -/// Requests from the guest are put into the `req_tx` queue by the guest, and -/// received by the [`GuestIoHandle::req_rx`] side. -#[derive(Debug)] -pub struct Guest { - /// New requests from outside go into this queue - req_tx: mpsc::Sender, - - /// Local cache for block size - /// - /// This is 0 when unpopulated, and non-zero otherwise; storing it locally - /// saves a round-trip through the `reqs` queue, and using an atomic means - /// it can be read from a `&self` reference. - block_size: AtomicU64, - - /// Backpressure is implemented as a delay on host write operations - /// - /// It is stored in an `Arc` so that the `GuestIoHandle` can update it from - /// the IO task. - backpressure_us: Arc, - - /// Lock held during backpressure delay - /// - /// Without this lock, multiple tasks could submit jobs to the upstairs and - /// wait in parallel, which defeats the purpose of backpressure (since you - /// could send arbitrarily many jobs at high speed by sending them from - /// different tasks). - backpressure_lock: Mutex<()>, - - /// Logger for the guest - log: Logger, -} - -/// Configuration for host-side backpressure -/// -/// Backpressure adds an artificial delay to host write messages (which are -/// otherwise acked immediately, before actually being complete). The delay is -/// varied based on two metrics: -/// -/// - number of write bytes outstanding -/// - queue length as a fraction (where 1.0 is full) -/// -/// These two metrics are used for quadratic backpressure, picking the larger of -/// the two delays. -#[derive(Copy, Clone, Debug)] -struct BackpressureConfig { - /// When should backpressure start (in bytes)? - bytes_start: u64, - /// Scale for byte-based quadratic backpressure - bytes_scale: f64, - - /// When should queue-based backpressure start? - queue_start: f64, - /// Maximum queue-based delay - queue_max_delay: Duration, -} - -/* - * These methods are how to add or checking for new work on the Guest struct - */ -impl Guest { - pub fn new(log: Option) -> (Guest, GuestIoHandle) { - let log = log.unwrap_or_else(build_logger); - - // The channel size is chosen arbitrarily here. The `req_rx` side - // is running independently and will constantly be processing messages, - // so we don't expect the queue to become full. 
The `req_tx` side is - // only ever used in `Guest::send`, which waits for acknowledgement from - // the other side of the queue; there are no places where we put stuff - // into the queue without awaiting a response. - // - // Together, these facts mean that the queue should remain relatively - // small. The exception is if someone spawns a zillion tasks, all of - // which call `Guest` APIs simultaneously. In that case, having the - // queue be full will just look like another source of backpressure (and - // will in fact be invisible to the caller, since they can't distinguish - // time spent waiting for the queue versus time spent in Upstairs code). - let (req_tx, req_rx) = mpsc::channel(500); - - let backpressure_us = Arc::new(AtomicU64::new(0)); - let limits = GuestLimits { - iop_limit: None, - bw_limit: None, - }; - let io = GuestIoHandle { - req_rx, - req_head: None, - req_limited: false, - limits, - - guest_work: GuestWork { - active: HashMap::new(), // GtoS - next_gw_id: 1, - completed: AllocRingBuffer::new(2048), - }, - - iop_tokens: 0, - bw_tokens: 0, - backpressure_us: backpressure_us.clone(), - backpressure_config: BackpressureConfig { - bytes_start: 1024u64.pow(3), // Start at 1 GiB - bytes_scale: 9.3e-8, // Delay of 10ms at 2 GiB in-flight - queue_start: 0.05, - queue_max_delay: Duration::from_millis(5), - }, - log: log.clone(), - }; - let guest = Guest { - req_tx, - - block_size: AtomicU64::new(0), - - backpressure_us, - backpressure_lock: Mutex::new(()), - log, - }; - (guest, io) - } - - /* - * This is used to submit a new BlockOp IO request to Crucible. - */ - async fn send(&self, op: BlockOp) -> BlockReqWaiter { - let (brw, res) = BlockReqWaiter::pair(); - if let Err(e) = self.req_tx.send(BlockReq { op, res }).await { - // This could happen during shutdown, if the up_main task is - // destroyed while the Guest is still trying to do work. - // - // If this happens, then the BlockReqWaiter will immediately return - // with CrucibleError::RecvDisconnected (since the oneshot::Sender - // will have been dropped into the void). 
- warn!(self.log, "failed to send op to guest: {e}"); - } - brw - } - - async fn send_and_wait(&self, op: BlockOp) -> BlockReqReply { - let brw = self.send(op).await; - brw.wait(&self.log).await - } - - pub async fn query_extent_size(&self) -> Result { - let data = Arc::new(Mutex::new(Block::new(0, 9))); - let extent_query = BlockOp::QueryExtentSize { data: data.clone() }; - - let reply = self.send_and_wait(extent_query).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let es = *data.lock().await; - Ok(es) - } - - pub async fn query_work_queue(&self) -> Result { - let wc = WQCounts { - up_count: 0, - ds_count: 0, - active_count: 0, - }; - - let data = Arc::new(Mutex::new(wc)); - let qwq = BlockOp::QueryWorkQueue { data: data.clone() }; - - let reply = self.send_and_wait(qwq).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let wc = data.lock().await; - Ok(*wc) - } - - // Maybe this can just be a guest specific thing, not a BlockIO - pub async fn activate_with_gen( - &self, - gen: u64, - ) -> Result<(), CrucibleError> { - let waiter = self.send(BlockOp::GoActiveWithGen { gen }).await; - info!( - self.log, - "The guest has requested activation with gen:{}", gen - ); - - let reply = waiter.wait(&self.log).await; - assert!(reply.buffer.is_none()); - reply.result?; - - info!( - self.log, - "The guest has finished waiting for activation with:{}", gen - ); - - Ok(()) - } - - async fn backpressure_sleep(&self) { - let bp = - Duration::from_micros(self.backpressure_us.load(Ordering::SeqCst)); - if bp > Duration::ZERO { - let _guard = self.backpressure_lock.lock().await; - tokio::time::sleep(bp).await; - drop(_guard); - } - } -} - -#[async_trait] -impl BlockIO for Guest { - async fn activate(&self) -> Result<(), CrucibleError> { - let waiter = self.send(BlockOp::GoActive).await; - info!(self.log, "The guest has requested activation"); - - let reply = waiter.wait(&self.log).await; - assert!(reply.buffer.is_none()); - reply.result?; - - info!(self.log, "The guest has finished waiting for activation"); - Ok(()) - } - - /// Disable any more IO from this guest and deactivate the downstairs. 
- async fn deactivate(&self) -> Result<(), CrucibleError> { - let reply = self.send_and_wait(BlockOp::Deactivate).await; - assert!(reply.buffer.is_none()); - reply.result - } - - async fn query_is_active(&self) -> Result { - let data = Arc::new(Mutex::new(false)); - let active_query = BlockOp::QueryGuestIOReady { data: data.clone() }; - - let reply = self.send_and_wait(active_query).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let is_active = *data.lock().await; - Ok(is_active) - } - - async fn total_size(&self) -> Result { - let data = Arc::new(Mutex::new(0)); - let size_query = BlockOp::QueryTotalSize { data: data.clone() }; - - let reply = self.send_and_wait(size_query).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let total_size = *data.lock().await; - Ok(total_size) - } - - async fn get_block_size(&self) -> Result { - let bs = self.block_size.load(std::sync::atomic::Ordering::Relaxed); - if bs == 0 { - let data = Arc::new(Mutex::new(0)); - let size_query = BlockOp::QueryBlockSize { data: data.clone() }; - - let reply = self.send_and_wait(size_query).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let bs = *data.lock().await; - self.block_size - .store(bs, std::sync::atomic::Ordering::Relaxed); - Ok(bs) - } else { - Ok(bs) - } - } - - async fn get_uuid(&self) -> Result { - let data = Arc::new(Mutex::new(Uuid::default())); - let uuid_query = BlockOp::QueryUpstairsUuid { data: data.clone() }; - - let reply = self.send_and_wait(uuid_query).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let uuid = *data.lock().await; - Ok(uuid) - } - - async fn read( - &self, - offset: Block, - data: &mut Buffer, - ) -> Result<(), CrucibleError> { - let bs = self.check_data_size(data.len()).await?; - - if offset.block_size_in_bytes() as u64 != bs { - crucible_bail!(BlockSizeMismatch); - } - - if data.is_empty() { - return Ok(()); - } - - let buffer = std::mem::take(data); - let rio = BlockOp::Read { - offset, - data: buffer, - }; - - // We've replaced `data` with a blank Buffer, and sent it over the - // channel. Replace it regardless of the outcome of the read so that the - // caller will not have to reallocate it. 
- let reply = self.send_and_wait(rio).await; - *data = reply.buffer.unwrap(); - - reply.result - } - - async fn write( - &self, - offset: Block, - data: Bytes, - ) -> Result<(), CrucibleError> { - let bs = self.check_data_size(data.len()).await?; - - if offset.block_size_in_bytes() as u64 != bs { - crucible_bail!(BlockSizeMismatch); - } - - if data.is_empty() { - return Ok(()); - } - let wio = BlockOp::Write { offset, data }; - - self.backpressure_sleep().await; - - let reply = self.send_and_wait(wio).await; - assert!(reply.buffer.is_none()); - reply.result - } - - async fn write_unwritten( - &self, - offset: Block, - data: Bytes, - ) -> Result<(), CrucibleError> { - let bs = self.check_data_size(data.len()).await?; - - if offset.block_size_in_bytes() as u64 != bs { - crucible_bail!(BlockSizeMismatch); - } - - if data.is_empty() { - return Ok(()); - } - let wio = BlockOp::WriteUnwritten { offset, data }; - - self.backpressure_sleep().await; - let reply = self.send_and_wait(wio).await; - assert!(reply.buffer.is_none()); - reply.result - } - - async fn flush( - &self, - snapshot_details: Option, - ) -> Result<(), CrucibleError> { - let reply = self - .send_and_wait(BlockOp::Flush { snapshot_details }) - .await; - assert!(reply.buffer.is_none()); - reply.result - } - - async fn show_work(&self) -> Result { - // Note: for this implementation, BlockOp::ShowWork will be sent and - // processed by the Upstairs even if it isn't active. - let wc = WQCounts { - up_count: 0, - ds_count: 0, - active_count: 0, - }; - - let data = Arc::new(Mutex::new(wc)); - let sw = BlockOp::ShowWork { data: data.clone() }; - - let reply = self.send_and_wait(sw).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let wc = data.lock().await; - Ok(*wc) - } - - async fn replace_downstairs( - &self, - id: Uuid, - old: SocketAddr, - new: SocketAddr, - ) -> Result { - let data = Arc::new(Mutex::new(ReplaceResult::Missing)); - let sw = BlockOp::ReplaceDownstairs { - id, - old, - new, - result: data.clone(), - }; - - let reply = self.send_and_wait(sw).await; - assert!(reply.buffer.is_none()); - reply.result?; - - let result = data.lock().await; - Ok(*result) - } -} - -/// Configuration for iops-per-second limiting -#[derive(Copy, Clone, Debug)] -pub struct IopLimit { - bytes_per_iop: usize, - iop_limit: usize, -} - -/// Configuration for guest limits -#[derive(Copy, Clone, Debug)] -pub struct GuestLimits { - iop_limit: Option, - bw_limit: Option, -} - -/// Handle for receiving requests from the guest -/// -/// This is the counterpart to the [`Guest`], which sends requests. It includes -/// the receiving side of the request queue, along with infrastructure for -/// bandwidth and IOP limiting. -/// -/// In addition, it contains information about the mapping from guest to -/// downstairs data structures, in the form of the [`GuestWork`] map. -/// -/// The life-cycle of a request is roughly the following: -/// -/// * Pop the request off the reqs queue. -/// -/// * Copy (and optionally encrypt) any data buffers provided to us by the -/// Guest. -/// -/// * Create one or more downstairs DownstairsIO structures. -/// -/// * Create a GtoS tracking structure with the id's for each downstairs task -/// and the read result buffer if required. -/// -/// * Add the GtoS struct to the in GuestWork active work hashmap. 
-/// -/// * Put all the DownstairsIO structures on the downstairs work queue -/// -/// * Wait for them to complete, then notify the guest through oneshot channels -pub struct GuestIoHandle { - /// Queue to receive new blockreqs - req_rx: mpsc::Receiver, - - /// Guest IO and bandwidth limits - limits: GuestLimits, - - /// `BlockReq` that is at the head of the queue - /// - /// If a `BlockReq` was pulled from the queue but couldn't be used due to - /// IOP or bandwidth limiting, it's stored here instead (and we check this - /// before awaiting the queue). - req_head: Option, - - /// Are we currently IOP or bandwidth limited? - /// - /// If so, we don't return anything in `recv()` - req_limited: bool, - - /// Current number of IOP tokens - iop_tokens: usize, - - /// Current backpressure (shared with the `Guest`) - backpressure_us: Arc, - - /// Backpressure configuration, as a starting point and max delay - backpressure_config: BackpressureConfig, - - /// Bandwidth tokens (in bytes) - bw_tokens: usize, - - /// Active work from the guest - /// - /// When the crucible listening task has noticed a new IO request, it - /// will pull it from the reqs queue and create an GuestWork struct - /// as well as convert the new IO request into the matching - /// downstairs request(s). Each new GuestWork request will get a - /// unique gw_id, which is also the index for that operation into the - /// hashmap. - /// - /// It is during this process that data will encrypted. For a read, the - /// data is decrypted back to the guest provided buffer after all the - /// required downstairs operations are completed. - guest_work: GuestWork, - - /// Log handle, mainly to pass it into the [`Upstairs`] - log: Logger, -} - -impl GuestIoHandle { - /// Leak IOPs tokens - fn leak_iop_tokens(&mut self, tokens: usize) { - self.iop_tokens = self.iop_tokens.saturating_sub(tokens); - self.req_limited = false; - } - - /// Leak bytes from bandwidth tokens - fn leak_bw_tokens(&mut self, bytes: usize) { - self.bw_tokens = self.bw_tokens.saturating_sub(bytes); - self.req_limited = false; - } - - /// Listen for new work - /// - /// This will wait forever if we are currently IOP / BW limited; otherwise, - /// it will return the next value from the `BlockReq` queue. - /// - /// To avoid being stuck forever, this function should be called as **a - /// branch** of a `select!` statement that _also_ includes at least one - /// timeout; we should use that timeout to periodically service the IOP / BW - /// token counters, which will unblock the `GuestIoHandle` in future calls. - async fn recv(&mut self) -> UpstairsAction { - let req = if self.req_limited { - futures::future::pending().await - } else if let Some(req) = self.req_head.take() { - req - } else if let Some(req) = self.req_rx.recv().await { - // NOTE: once we take this req from the queue, we must be cancel - // safe! In other words, we cannot yield until either (1) returning - // the req or (2) storing it in self.req_head for safe-keeping. - req - } else { - warn!(self.log, "Guest handle has been dropped"); - return UpstairsAction::GuestDropped; - }; - - // Check if we can consume right away - let iop_limit_applies = - self.limits.iop_limit.is_some() && req.op.consumes_iops(); - let bw_limit_applies = - self.limits.bw_limit.is_some() && req.op.sz().is_some(); - - if !iop_limit_applies && !bw_limit_applies { - return UpstairsAction::Guest(req); - } - - // Check bandwidth limit before IOP limit, but make sure only to consume - // tokens if both checks pass! 
- - let mut bw_check_ok = true; - let mut iop_check_ok = true; - - // When checking tokens vs the limit, do not check by checking if adding - // the block request's values to the applicable limit: this would create - // a scenario where a large IO enough would stall the pipeline (see - // test_impossible_io). Instead, check if the limits are already - // reached. - - if let Some(bw_limit) = self.limits.bw_limit { - if req.op.sz().is_some() && self.bw_tokens >= bw_limit { - bw_check_ok = false; - } - } - - if let Some(iop_limit_cfg) = &self.limits.iop_limit { - let bytes_per_iops = iop_limit_cfg.bytes_per_iop; - if req.op.iops(bytes_per_iops).is_some() - && self.iop_tokens >= iop_limit_cfg.iop_limit - { - iop_check_ok = false; - } - } - - // If both checks pass, consume appropriate resources and return the - // block req - if bw_check_ok && iop_check_ok { - if self.limits.bw_limit.is_some() { - if let Some(sz) = req.op.sz() { - self.bw_tokens += sz; - } - } - - if let Some(cfg) = &self.limits.iop_limit { - if let Some(req_iops) = req.op.iops(cfg.bytes_per_iop) { - self.iop_tokens += req_iops; - } - } - - UpstairsAction::Guest(req) - } else { - assert!(self.req_head.is_none()); - self.req_head = Some(req); - futures::future::pending().await - } - } - - /// Set `self.backpressure_us` based on outstanding IO ratio - fn set_backpressure(&self, bytes: u64, ratio: f64) { - // Check to see if the number of outstanding write bytes (between - // the upstairs and downstairs) is particularly high. If so, - // apply some backpressure by delaying host operations, with a - // quadratically-increasing delay. - let d1 = (bytes.saturating_sub(self.backpressure_config.bytes_start) - as f64 - * self.backpressure_config.bytes_scale) - .powf(2.0) as u64; - - // Compute an alternate delay based on queue length - let d2 = self - .backpressure_config - .queue_max_delay - .mul_f64( - ((ratio - self.backpressure_config.queue_start).max(0.0) - / (1.0 - self.backpressure_config.queue_start)) - .powf(2.0), - ) - .as_micros() as u64; - self.backpressure_us.store(d1.max(d2), Ordering::SeqCst); - } - - pub fn set_iop_limit(&mut self, bytes_per_iop: usize, limit: usize) { - self.limits.iop_limit = Some(IopLimit { - bytes_per_iop, - iop_limit: limit, - }); - } - - pub fn set_bw_limit(&mut self, bytes_per_second: usize) { - self.limits.bw_limit = Some(bytes_per_second); - } -} - -/* - * Work Queue Counts, for debug ShowWork IO type - */ -#[derive(Debug, Copy, Clone)] -pub struct WQCounts { - pub up_count: usize, - pub ds_count: usize, - pub active_count: usize, -} - /** * Stat counters struct used by DTrace */ @@ -2963,26 +2100,3 @@ pub fn up_main( Ok(join_handle) } - -/* - * Debug function to dump the guest work structure. - * This does a bit while holding the mutex, so don't expect performance - * to get better when calling it. - * - * TODO: make this one big dump, where we include the up.work.active - * printing for each guest_work. It will be much more dense, but require - * holding both locks for the duration. 
- */ -fn show_guest_work(guest: &GuestIoHandle) -> usize { - println!("Guest work: Active and Completed Jobs:"); - let gw = &guest.guest_work; - let mut kvec: Vec<_> = gw.active.keys().cloned().collect(); - kvec.sort_unstable(); - for id in kvec.iter() { - let job = gw.active.get(id).unwrap(); - println!("GW_JOB active:[{:04}] D:{:?} ", id, job.ds_id); - } - let done = gw.completed.to_vec(); - println!("GW_JOB completed count:{:?} ", done.len()); - kvec.len() -} diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index 84990c16f..c240a9293 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -1117,7 +1117,7 @@ impl Upstairs { pub(crate) fn show_all_work(&self) -> WQCounts { let gior = self.guest_io_ready(); - let up_count = self.guest.guest_work.active.len(); + let up_count = self.guest.active_count(); let ds_count = self.downstairs.active_count(); @@ -1133,7 +1133,7 @@ impl Upstairs { ); if ds_count == 0 { if up_count != 0 { - crate::show_guest_work(&self.guest); + crate::guest::show_guest_work(&self.guest); } } else { self.downstairs.show_all_work() From c61c65b7f03ea774b9047dc8c9751786f0928969 Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 30 Jan 2024 10:26:47 -0500 Subject: [PATCH 2/4] Everything is building and working --- upstairs/src/downstairs.rs | 44 +- upstairs/src/dummy_downstairs_tests.rs | 4 +- upstairs/src/guest.rs | 547 +++++++++++++++++++++++-- upstairs/src/lib.rs | 3 +- upstairs/src/test.rs | 424 +------------------ upstairs/src/upstairs.rs | 110 ++--- upstairs/src/volume.rs | 2 + 7 files changed, 591 insertions(+), 543 deletions(-) diff --git a/upstairs/src/downstairs.rs b/upstairs/src/downstairs.rs index dd75d3288..46eb0c44c 100644 --- a/upstairs/src/downstairs.rs +++ b/upstairs/src/downstairs.rs @@ -8,15 +8,16 @@ use std::{ use crate::{ cdt, client::{ClientAction, ClientRequest, ClientStopReason, DownstairsClient}, + guest::GuestWork, live_repair::ExtentInfo, stats::UpStatOuter, upstairs::{UpstairsConfig, UpstairsState}, AckStatus, ActiveJobs, AllocRingBuffer, ClientData, ClientIOStateCount, ClientId, ClientMap, CrucibleError, DownstairsIO, DownstairsMend, DsState, - ExtentFix, ExtentRepairIDs, GtoS, GuestWork, GuestWorkId, IOState, - IOStateCount, IOop, ImpactedBlocks, JobId, Message, RawMessage, - ReadRequest, ReadResponse, ReconcileIO, ReconciliationId, RegionDefinition, - ReplaceResult, SerializedWrite, SnapshotDetails, WorkSummary, + ExtentFix, ExtentRepairIDs, GuestWorkId, IOState, IOStateCount, IOop, + ImpactedBlocks, JobId, Message, RawMessage, ReadRequest, ReadResponse, + ReconcileIO, ReconciliationId, RegionDefinition, ReplaceResult, + SerializedWrite, SnapshotDetails, WorkSummary, }; use crucible_common::MAX_ACTIVE_COUNT; @@ -1250,15 +1251,15 @@ impl Downstairs { || (repair.aborting_repair && !have_reserved_jobs) { // We're done, submit a final flush! 
- let gw_id = gw.next_gw_id(); - cdt::gw__flush__start!(|| (gw_id.0)); - - let flush_id = self.submit_flush(gw_id, None); + let (gw_id, flush_id) = gw.submit_job( + |gw_id| { + cdt::gw__flush__start!(|| (gw_id.0)); + self.submit_flush(gw_id, None) + }, + None, + None, + ); info!(self.log, "LiveRepair final flush submitted"); - - let new_gtos = GtoS::new(flush_id, None, None); - gw.active.insert(gw_id, new_gtos); - cdt::up__to__ds__flush__start!(|| (gw_id.0)); LiveRepairState::FinalFlush { flush_id } @@ -1310,8 +1311,7 @@ impl Downstairs { let nio = Self::create_noop_io(noop_id, deps, gw_noop_id); cdt::gw__noop__start!(|| (gw_noop_id.0)); - let new_gtos = GtoS::new(noop_id, None, None); - gw.active.insert(gw_noop_id, new_gtos); + gw.insert(gw_noop_id, noop_id, None, None); self.enqueue_repair(nio); } @@ -1356,8 +1356,7 @@ impl Downstairs { cdt::gw__repair__start!(|| (gw_repair_id.0, eid)); - let new_gtos = GtoS::new(repair_id, None, None); - gw.active.insert(gw_repair_id, new_gtos); + gw.insert(gw_repair_id, repair_id, None, None); self.enqueue_repair(repair_io); } @@ -1614,8 +1613,7 @@ impl Downstairs { cdt::gw__reopen__start!(|| (gw_reopen_id.0, eid)); - let new_gtos = GtoS::new(reopen_id, None, None); - gw.active.insert(gw_reopen_id, new_gtos); + gw.insert(gw_reopen_id, reopen_id, None, None); self.enqueue_repair(reopen_io); } @@ -1797,8 +1795,7 @@ impl Downstairs { ); cdt::gw__close__start!(|| (gw_close_id.0, eid)); - let new_gtos = GtoS::new(close_id, None, None); - gw.active.insert(gw_close_id, new_gtos); + gw.insert(gw_close_id, close_id, None, None); self.enqueue_repair(close_io); } @@ -3548,11 +3545,12 @@ pub(crate) mod test { use super::Downstairs; use crate::{ downstairs::{LiveRepairData, LiveRepairState}, + guest::GuestWork, live_repair::ExtentInfo, upstairs::UpstairsState, - ClientId, CrucibleError, DownstairsIO, DsState, ExtentFix, GuestWork, - GuestWorkId, IOState, IOop, ImpactedAddr, ImpactedBlocks, JobId, - ReadResponse, ReconcileIO, ReconciliationId, SnapshotDetails, + ClientId, CrucibleError, DownstairsIO, DsState, ExtentFix, GuestWorkId, + IOState, IOop, ImpactedAddr, ImpactedBlocks, JobId, ReadResponse, + ReconcileIO, ReconciliationId, SnapshotDetails, }; use bytes::Bytes; diff --git a/upstairs/src/dummy_downstairs_tests.rs b/upstairs/src/dummy_downstairs_tests.rs index 5e5298755..8f9871011 100644 --- a/upstairs/src/dummy_downstairs_tests.rs +++ b/upstairs/src/dummy_downstairs_tests.rs @@ -7,12 +7,12 @@ pub(crate) mod protocol_test { use std::sync::Arc; use std::time::Duration; + use crate::guest::Guest; use crate::up_main; use crate::BlockContext; use crate::BlockIO; use crate::Buffer; use crate::CrucibleError; - use crate::Guest; use crate::IO_OUTSTANDING_MAX; use crate::MAX_ACTIVE_COUNT; use crucible_client_types::CrucibleOpts; @@ -523,7 +523,7 @@ pub(crate) mod protocol_test { // Configure our guest without queue backpressure, to speed up tests // which require triggering a timeout let (g, mut io) = Guest::new(Some(log.clone())); - io.backpressure_config.queue_max_delay = Duration::ZERO; + io.disable_queue_backpressure(); let guest = Arc::new(g); let crucible_opts = CrucibleOpts { diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index bbbb853bf..1605a764c 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -173,23 +173,52 @@ impl std::fmt::Display for GuestWorkId { * it should probably be a ring buffer. 
*/ #[derive(Debug)] -struct GuestWork { +pub struct GuestWork { active: HashMap, next_gw_id: u64, completed: AllocRingBuffer, } impl GuestWork { - fn is_empty(&self) -> bool { + pub fn is_empty(&self) -> bool { self.active.is_empty() } - fn next_gw_id(&mut self) -> GuestWorkId { + pub fn len(&self) -> usize { + self.active.len() + } + + /// Helper function to install new work into the map + pub(crate) fn submit_job JobId>( + &mut self, + f: F, + guest_buffer: Option, + res: Option, + ) -> (GuestWorkId, JobId) { + let gw_id = self.next_gw_id(); + let ds_id = f(gw_id); + + self.insert(gw_id, ds_id, guest_buffer, res); + (gw_id, ds_id) + } + + pub(crate) fn next_gw_id(&mut self) -> GuestWorkId { let id = self.next_gw_id; self.next_gw_id += 1; GuestWorkId(id) } + pub(crate) fn insert( + &mut self, + gw_id: GuestWorkId, + ds_id: JobId, + guest_buffer: Option, + res: Option, + ) { + let new_gtos = GtoS::new(ds_id, guest_buffer, res); + self.active.insert(gw_id, new_gtos); + } + /* * When the required number of completions for a downstairs * ds_id have arrived, we call this method on the parent GuestWork @@ -205,7 +234,7 @@ impl GuestWork { * This may include moving data buffers from completed reads. */ #[instrument] - async fn gw_ds_complete( + pub async fn gw_ds_complete( &mut self, gw_id: GuestWorkId, ds_id: JobId, @@ -230,6 +259,13 @@ impl GuestWork { panic!("gw_id {} for job {} not on active list", gw_id, ds_id); } } + + pub fn print_last_completed(&self, n: usize) { + print!("Upstairs last five completed: "); + for j in self.completed.iter().rev().take(n) { + print!(" {:4}", j); + } + } } impl Default for GuestWork { @@ -368,6 +404,8 @@ impl Guest { /* * This is used to submit a new BlockOp IO request to Crucible. + * + * It's public for testing, but shouldn't be called */ async fn send(&self, op: BlockOp) -> BlockReqWaiter { let (brw, res) = BlockReqWaiter::pair(); @@ -740,13 +778,30 @@ pub struct GuestIoHandle { /// It is during this process that data will encrypted. For a read, the /// data is decrypted back to the guest provided buffer after all the /// required downstairs operations are completed. - guest_work: GuestWork, + pub guest_work: GuestWork, /// Log handle, mainly to pass it into the [`Upstairs`] pub log: Logger, } impl GuestIoHandle { + pub fn iop_tokens(&self) -> usize { + self.iop_tokens + } + + /// Leaks IOP and BW tokens + pub fn leak_check(&mut self, leak_ms: usize) { + if let Some(iop_limit_cfg) = self.limits.iop_limit { + let tokens = iop_limit_cfg.iop_limit / (1000 / leak_ms); + self.leak_iop_tokens(tokens); + } + + if let Some(bw_limit) = self.limits.bw_limit { + let tokens = bw_limit / (1000 / leak_ms); + self.leak_bw_tokens(tokens); + } + } + /// Leak IOPs tokens fn leak_iop_tokens(&mut self, tokens: usize) { self.iop_tokens = self.iop_tokens.saturating_sub(tokens); @@ -768,7 +823,7 @@ impl GuestIoHandle { /// branch** of a `select!` statement that _also_ includes at least one /// timeout; we should use that timeout to periodically service the IOP / BW /// token counters, which will unblock the `GuestIoHandle` in future calls. 
- async fn recv(&mut self) -> UpstairsAction { + pub(crate) async fn recv(&mut self) -> UpstairsAction { let req = if self.req_limited { futures::future::pending().await } else if let Some(req) = self.req_head.take() { @@ -843,8 +898,13 @@ impl GuestIoHandle { } } + #[cfg(test)] + pub fn disable_queue_backpressure(&mut self) { + self.backpressure_config.queue_max_delay = Duration::ZERO; + } + /// Set `self.backpressure_us` based on outstanding IO ratio - fn set_backpressure(&self, bytes: u64, ratio: f64) { + pub fn set_backpressure(&self, bytes: u64, ratio: f64) { // Check to see if the number of outstanding write bytes (between // the upstairs and downstairs) is particularly high. If so, // apply some backpressure by delaying host operations, with a @@ -882,6 +942,31 @@ impl GuestIoHandle { pub fn active_count(&self) -> usize { self.guest_work.active.len() } + + /// Looks up current backpressure + pub fn backpressure_us(&self) -> u64 { + self.backpressure_us + .load(std::sync::atomic::Ordering::Acquire) + } + + /// Debug function to dump the guest work structure. + /// + /// TODO: make this one big dump, where we include the up.work.active + /// printing for each guest_work. It will be much more dense, but require + /// holding both locks for the duration. + pub(crate) fn show_work(&self) -> usize { + println!("Guest work: Active and Completed Jobs:"); + let gw = &self.guest_work; + let mut kvec: Vec<_> = gw.active.keys().cloned().collect(); + kvec.sort_unstable(); + for id in kvec.iter() { + let job = gw.active.get(id).unwrap(); + println!("GW_JOB active:[{:04}] D:{:?} ", id, job.ds_id); + } + let done = gw.completed.to_vec(); + println!("GW_JOB completed count:{:?} ", done.len()); + kvec.len() + } } /* @@ -894,25 +979,431 @@ pub struct WQCounts { pub active_count: usize, } -/* - * Debug function to dump the guest work structure. - * This does a bit while holding the mutex, so don't expect performance - * to get better when calling it. - * - * TODO: make this one big dump, where we include the up.work.active - * printing for each guest_work. It will be much more dense, but require - * holding both locks for the duration. - */ -pub(crate) fn show_guest_work(guest: &GuestIoHandle) -> usize { - println!("Guest work: Active and Completed Jobs:"); - let gw = &guest.guest_work; - let mut kvec: Vec<_> = gw.active.keys().cloned().collect(); - kvec.sort_unstable(); - for id in kvec.iter() { - let job = gw.active.get(id).unwrap(); - println!("GW_JOB active:[{:04}] D:{:?} ", id, job.ds_id); - } - let done = gw.completed.to_vec(); - println!("GW_JOB completed count:{:?} ", done.len()); - kvec.len() +#[cfg(test)] +mod test { + use super::*; + use anyhow::Result; + + async fn assert_consumed(io: &mut GuestIoHandle) { + tokio::select! { + _ = io.recv() => { + // correct! + }, + _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => { + panic!("timed out while waiting for message"); + } + } + } + + async fn assert_none_consumed(io: &mut GuestIoHandle) { + tokio::select! { + _ = io.recv() => { + panic!("got message when expecting nothing") + }, + _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => { + // nothing to do here + } + } + } + + #[tokio::test] + async fn test_no_iop_limit() -> Result<()> { + let (guest, mut io) = Guest::new(None); + assert_none_consumed(&mut io).await; + + // Don't use guest.read, that will send a block size query that will + // never be answered. 
+ let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(1, 512), + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(8, 512), + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(32, 512), + }) + .await; + + // With no IOP limit, all requests are consumed immediately + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + + assert_none_consumed(&mut io).await; + + // If no IOP limit set, don't track it + assert_eq!(io.iop_tokens, 0); + + Ok(()) + } + + #[tokio::test] + async fn test_iop_limit() -> Result<()> { + let (guest, mut io) = Guest::new(None); + io.set_iop_limit(16000, 2); + + assert_none_consumed(&mut io).await; + + // Don't use guest.read, that will send a block size query that will + // never be answered. + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(1, 512), + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(8, 512), + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(31, 512), + }) + .await; + + // First two reads succeed + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + + // Next cannot be consumed until there's available IOP tokens so it + // remains in the queue. Strictly speaking, it's been popped to the + // `req_head` position. + assert_none_consumed(&mut io).await; + assert!(io.req_rx.try_recv().is_err()); + assert_eq!(io.iop_tokens, 2); + assert!(io.req_head.is_some()); + + // Replenish one token, meaning next read can be consumed + io.leak_iop_tokens(1); + assert_eq!(io.iop_tokens, 1); + + assert_consumed(&mut io).await; + assert!(io.req_rx.try_recv().is_err()); + assert!(io.req_head.is_none()); + assert_eq!(io.iop_tokens, 2); + + io.leak_iop_tokens(2); + assert_eq!(io.iop_tokens, 0); + + io.leak_iop_tokens(16000); + assert_eq!(io.iop_tokens, 0); + + Ok(()) + } + + #[tokio::test] + async fn test_flush_does_not_consume_iops() -> Result<()> { + let (guest, mut io) = Guest::new(None); + + // Set 0 as IOP limit + io.set_iop_limit(16000, 0); + assert_none_consumed(&mut io).await; + + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + + assert_none_consumed(&mut io).await; + + Ok(()) + } + + #[tokio::test] + async fn test_set_bw_limit() -> Result<()> { + let (guest, mut io) = Guest::new(None); + io.set_bw_limit(1024 * 1024); // 1 KiB + + assert_none_consumed(&mut io).await; + + // Don't use guest.read, that will send a block size query that will + // never be answered. + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(1024, 512), // 512 KiB + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(1024, 512), + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(1024, 512), + }) + .await; + + // First two reads succeed + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + + // Next cannot be consumed until there's available BW tokens so it + // remains in the queue. 
+ assert_none_consumed(&mut io).await; + assert!(io.req_rx.try_recv().is_err()); + assert!(io.req_head.is_some()); + assert_eq!(io.bw_tokens, 1024 * 1024); + + // Replenish enough tokens, meaning next read can be consumed + io.leak_bw_tokens(1024 * 1024 / 2); + assert_eq!(io.bw_tokens, 1024 * 1024 / 2); + + assert_consumed(&mut io).await; + assert!(io.req_rx.try_recv().is_err()); + assert!(io.req_head.is_none()); + assert_eq!(io.bw_tokens, 1024 * 1024); + + io.leak_bw_tokens(1024 * 1024); + assert_eq!(io.bw_tokens, 0); + + io.leak_bw_tokens(1024 * 1024 * 1024); + assert_eq!(io.bw_tokens, 0); + + Ok(()) + } + + #[tokio::test] + async fn test_flush_does_not_consume_bw() -> Result<()> { + let (guest, mut io) = Guest::new(None); + + // Set 0 as bandwidth limit + io.set_bw_limit(0); + assert_none_consumed(&mut io).await; + + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + let _ = guest + .send(BlockOp::Flush { + snapshot_details: None, + }) + .await; + + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + assert_consumed(&mut io).await; + + assert_none_consumed(&mut io).await; + + Ok(()) + } + + #[tokio::test] + async fn test_iop_and_bw_limit() -> Result<()> { + let (guest, mut io) = Guest::new(None); + + io.set_iop_limit(16384, 500); // 1 IOP is 16 KiB + io.set_bw_limit(6400 * 1024); // 16384 B * 400 = 6400 KiB/s + assert_none_consumed(&mut io).await; + + // Don't use guest.read, that will send a block size query that will + // never be answered. + + // Validate that BW limit activates by sending two 7000 KiB IOs. 7000 + // KiB is only 437.5 IOPs + + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(14000, 512), // 7000 KiB + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(14000, 512), // 7000 KiB + }) + .await; + + assert_consumed(&mut io).await; + assert_none_consumed(&mut io).await; + + // Assert we've hit the BW limit before IOPS + assert_eq!(io.iop_tokens, 438); // 437.5 rounded up + assert_eq!(io.bw_tokens, 7000 * 1024); + + io.leak_iop_tokens(438); + io.leak_bw_tokens(7000 * 1024); + + assert_consumed(&mut io).await; + + // Everything should be empty now + assert!(io.req_rx.try_recv().is_err()); + assert!(io.req_head.is_none()); + + // Back to zero + io.leak_iop_tokens(438); + io.leak_bw_tokens(7000 * 1024); + + assert_eq!(io.iop_tokens, 0); + assert_eq!(io.bw_tokens, 0); + + // Validate that IOP limit activates by sending 501 1024b IOs + for _ in 0..500 { + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(2, 512), + }) + .await; + assert_consumed(&mut io).await; + } + + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(2, 512), + }) + .await; + assert_none_consumed(&mut io).await; + + // Assert we've hit the IOPS limit + assert_eq!(io.iop_tokens, 500); + assert_eq!(io.bw_tokens, 500 * 1024); + + // Back to zero + io.leak_iop_tokens(500); + io.leak_bw_tokens(500 * 1024); + + // Remove the 501st request + assert!(io.req_head.take().is_some()); + assert!(io.req_rx.try_recv().is_err()); + assert_eq!(io.iop_tokens, 0); + assert_eq!(io.bw_tokens, 0); + + // From + // https://aws.amazon.com/premiumsupport/knowledge-center/ebs-calculate-optimal-io-size/: + // + // Amazon EBS calculates the optimal I/O size using the following + // equation: throughput / number of IOPS = optimal I/O size. 
+ + let optimal_io_size: usize = 6400 * 1024 / 500; + + // Round down to the nearest size in blocks + let optimal_io_size = (optimal_io_size / 512) * 512; + + // Make sure this is <= an IOP size + assert!(optimal_io_size <= 16384); + + // I mean, it makes sense: now we submit 500 of those to reach both + // limits at the same time. + for i in 0..500 { + assert_eq!(io.iop_tokens, i); + assert_eq!(io.bw_tokens, i * optimal_io_size); + assert_eq!(optimal_io_size % 512, 0); + + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(optimal_io_size / 512, 512), + }) + .await; + + assert_consumed(&mut io).await; + } + + assert_eq!(io.iop_tokens, 500); + assert_eq!(io.bw_tokens, 500 * optimal_io_size); + + Ok(()) + } + + // Is it possible to submit an IO that will never be sent? It shouldn't be! + #[tokio::test] + async fn test_impossible_io() -> Result<()> { + let (guest, mut io) = Guest::new(None); + + io.set_iop_limit(1024 * 1024 / 2, 10); // 1 IOP is half a KiB + io.set_bw_limit(1024 * 1024); // 1 KiB + assert_none_consumed(&mut io).await; + + // Sending an IO of 10 MiB is larger than the bandwidth limit and + // represents 20 IOPs, larger than the IOP limit. + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(20480, 512), // 10 MiB + }) + .await; + let _ = guest + .send(BlockOp::Read { + offset: Block::new_512(0), + data: Buffer::new(0, 512), + }) + .await; + + assert_eq!(io.iop_tokens, 0); + assert_eq!(io.bw_tokens, 0); + + // Even though the first IO is larger than the bandwidth and IOP limit, + // it should still succeed. The next IO should not, even if it consumes + // nothing, because the iops and bw tokens will be larger than the limit + // for a while (until they leak enough). + + assert_consumed(&mut io).await; + assert_none_consumed(&mut io).await; + + assert_eq!(io.iop_tokens, 20); + assert_eq!(io.bw_tokens, 10 * 1024 * 1024); + + // Bandwidth trigger is going to be larger and need more leaking to get + // down to a point where the zero sized IO can fire. + for _ in 0..9 { + io.leak_iop_tokens(10); + io.leak_bw_tokens(1024 * 1024); + + assert_none_consumed(&mut io).await; + } + + assert_eq!(io.iop_tokens, 0); + assert_eq!(io.bw_tokens, 1024 * 1024); + + assert_none_consumed(&mut io).await; + + io.leak_iop_tokens(10); + io.leak_bw_tokens(1024 * 1024); + + // We've leaked 10 KiB worth, it should fire now! + assert_consumed(&mut io).await; + + Ok(()) + } } diff --git a/upstairs/src/lib.rs b/upstairs/src/lib.rs index fe0562ea0..33d2b4da8 100644 --- a/upstairs/src/lib.rs +++ b/upstairs/src/lib.rs @@ -54,7 +54,8 @@ pub use mend::{DownstairsMend, ExtentFix, RegionMetadata}; pub use pseudo_file::CruciblePseudoFile; pub(crate) mod guest; -use guest::{GuestIoHandle, GuestWorkId, WQCounts}; +pub use guest::{Guest, WQCounts}; +use guest::{GuestIoHandle, GuestWorkId}; mod stats; diff --git a/upstairs/src/test.rs b/upstairs/src/test.rs index aba3586db..826db9c8b 100644 --- a/upstairs/src/test.rs +++ b/upstairs/src/test.rs @@ -1,5 +1,6 @@ // Copyright 2023 Oxide Computer Company +use crate::guest::Guest; use crate::*; /* @@ -694,429 +695,6 @@ pub(crate) mod up_test { Ok(()) } - async fn assert_consumed(io: &mut GuestIoHandle) { - tokio::select! { - _ = io.recv() => { - // correct! - }, - _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => { - panic!("timed out while waiting for message"); - } - } - } - - async fn assert_none_consumed(io: &mut GuestIoHandle) { - tokio::select! 
{ - _ = io.recv() => { - panic!("got message when expecting nothing") - }, - _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => { - // nothing to do here - } - } - } - - #[tokio::test] - async fn test_no_iop_limit() -> Result<()> { - let (guest, mut io) = Guest::new(None); - assert_none_consumed(&mut io).await; - - // Don't use guest.read, that will send a block size query that will - // never be answered. - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(1, 512), - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(8, 512), - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(32, 512), - }) - .await; - - // With no IOP limit, all requests are consumed immediately - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - - assert_none_consumed(&mut io).await; - - // If no IOP limit set, don't track it - assert_eq!(io.iop_tokens, 0); - - Ok(()) - } - - #[tokio::test] - async fn test_iop_limit() -> Result<()> { - let (guest, mut io) = Guest::new(None); - io.set_iop_limit(16000, 2); - - assert_none_consumed(&mut io).await; - - // Don't use guest.read, that will send a block size query that will - // never be answered. - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(1, 512), - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(8, 512), - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(31, 512), - }) - .await; - - // First two reads succeed - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - - // Next cannot be consumed until there's available IOP tokens so it - // remains in the queue. Strictly speaking, it's been popped to the - // `req_head` position. - assert_none_consumed(&mut io).await; - assert!(io.req_rx.try_recv().is_err()); - assert_eq!(io.iop_tokens, 2); - assert!(io.req_head.is_some()); - - // Replenish one token, meaning next read can be consumed - io.leak_iop_tokens(1); - assert_eq!(io.iop_tokens, 1); - - assert_consumed(&mut io).await; - assert!(io.req_rx.try_recv().is_err()); - assert!(io.req_head.is_none()); - assert_eq!(io.iop_tokens, 2); - - io.leak_iop_tokens(2); - assert_eq!(io.iop_tokens, 0); - - io.leak_iop_tokens(16000); - assert_eq!(io.iop_tokens, 0); - - Ok(()) - } - - #[tokio::test] - async fn test_flush_does_not_consume_iops() -> Result<()> { - let (guest, mut io) = Guest::new(None); - - // Set 0 as IOP limit - io.set_iop_limit(16000, 0); - assert_none_consumed(&mut io).await; - - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - - assert_none_consumed(&mut io).await; - - Ok(()) - } - - #[tokio::test] - async fn test_set_bw_limit() -> Result<()> { - let (guest, mut io) = Guest::new(None); - io.set_bw_limit(1024 * 1024); // 1 KiB - - assert_none_consumed(&mut io).await; - - // Don't use guest.read, that will send a block size query that will - // never be answered. 
- let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(1024, 512), // 512 KiB - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(1024, 512), - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(1024, 512), - }) - .await; - - // First two reads succeed - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - - // Next cannot be consumed until there's available BW tokens so it - // remains in the queue. - assert_none_consumed(&mut io).await; - assert!(io.req_rx.try_recv().is_err()); - assert!(io.req_head.is_some()); - assert_eq!(io.bw_tokens, 1024 * 1024); - - // Replenish enough tokens, meaning next read can be consumed - io.leak_bw_tokens(1024 * 1024 / 2); - assert_eq!(io.bw_tokens, 1024 * 1024 / 2); - - assert_consumed(&mut io).await; - assert!(io.req_rx.try_recv().is_err()); - assert!(io.req_head.is_none()); - assert_eq!(io.bw_tokens, 1024 * 1024); - - io.leak_bw_tokens(1024 * 1024); - assert_eq!(io.bw_tokens, 0); - - io.leak_bw_tokens(1024 * 1024 * 1024); - assert_eq!(io.bw_tokens, 0); - - Ok(()) - } - - #[tokio::test] - async fn test_flush_does_not_consume_bw() -> Result<()> { - let (guest, mut io) = Guest::new(None); - - // Set 0 as bandwidth limit - io.set_bw_limit(0); - assert_none_consumed(&mut io).await; - - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - let _ = guest - .send(BlockOp::Flush { - snapshot_details: None, - }) - .await; - - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - assert_consumed(&mut io).await; - - assert_none_consumed(&mut io).await; - - Ok(()) - } - - #[tokio::test] - async fn test_iop_and_bw_limit() -> Result<()> { - let (guest, mut io) = Guest::new(None); - - io.set_iop_limit(16384, 500); // 1 IOP is 16 KiB - io.set_bw_limit(6400 * 1024); // 16384 B * 400 = 6400 KiB/s - assert_none_consumed(&mut io).await; - - // Don't use guest.read, that will send a block size query that will - // never be answered. - - // Validate that BW limit activates by sending two 7000 KiB IOs. 
7000 - // KiB is only 437.5 IOPs - - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(14000, 512), // 7000 KiB - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(14000, 512), // 7000 KiB - }) - .await; - - assert_consumed(&mut io).await; - assert_none_consumed(&mut io).await; - - // Assert we've hit the BW limit before IOPS - assert_eq!(io.iop_tokens, 438); // 437.5 rounded up - assert_eq!(io.bw_tokens, 7000 * 1024); - - io.leak_iop_tokens(438); - io.leak_bw_tokens(7000 * 1024); - - assert_consumed(&mut io).await; - - // Everything should be empty now - assert!(io.req_rx.try_recv().is_err()); - assert!(io.req_head.is_none()); - - // Back to zero - io.leak_iop_tokens(438); - io.leak_bw_tokens(7000 * 1024); - - assert_eq!(io.iop_tokens, 0); - assert_eq!(io.bw_tokens, 0); - - // Validate that IOP limit activates by sending 501 1024b IOs - for _ in 0..500 { - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(2, 512), - }) - .await; - assert_consumed(&mut io).await; - } - - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(2, 512), - }) - .await; - assert_none_consumed(&mut io).await; - - // Assert we've hit the IOPS limit - assert_eq!(io.iop_tokens, 500); - assert_eq!(io.bw_tokens, 500 * 1024); - - // Back to zero - io.leak_iop_tokens(500); - io.leak_bw_tokens(500 * 1024); - - // Remove the 501st request - assert!(io.req_head.take().is_some()); - assert!(io.req_rx.try_recv().is_err()); - assert_eq!(io.iop_tokens, 0); - assert_eq!(io.bw_tokens, 0); - - // From - // https://aws.amazon.com/premiumsupport/knowledge-center/ebs-calculate-optimal-io-size/: - // - // Amazon EBS calculates the optimal I/O size using the following - // equation: throughput / number of IOPS = optimal I/O size. - - let optimal_io_size: usize = 6400 * 1024 / 500; - - // Round down to the nearest size in blocks - let optimal_io_size = (optimal_io_size / 512) * 512; - - // Make sure this is <= an IOP size - assert!(optimal_io_size <= 16384); - - // I mean, it makes sense: now we submit 500 of those to reach both - // limits at the same time. - for i in 0..500 { - assert_eq!(io.iop_tokens, i); - assert_eq!(io.bw_tokens, i * optimal_io_size); - assert_eq!(optimal_io_size % 512, 0); - - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(optimal_io_size / 512, 512), - }) - .await; - - assert_consumed(&mut io).await; - } - - assert_eq!(io.iop_tokens, 500); - assert_eq!(io.bw_tokens, 500 * optimal_io_size); - - Ok(()) - } - - // Is it possible to submit an IO that will never be sent? It shouldn't be! - #[tokio::test] - async fn test_impossible_io() -> Result<()> { - let (guest, mut io) = Guest::new(None); - - io.set_iop_limit(1024 * 1024 / 2, 10); // 1 IOP is half a KiB - io.set_bw_limit(1024 * 1024); // 1 KiB - assert_none_consumed(&mut io).await; - - // Sending an IO of 10 MiB is larger than the bandwidth limit and - // represents 20 IOPs, larger than the IOP limit. - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(20480, 512), // 10 MiB - }) - .await; - let _ = guest - .send(BlockOp::Read { - offset: Block::new_512(0), - data: Buffer::new(0, 512), - }) - .await; - - assert_eq!(io.iop_tokens, 0); - assert_eq!(io.bw_tokens, 0); - - // Even though the first IO is larger than the bandwidth and IOP limit, - // it should still succeed. 
The next IO should not, even if it consumes - // nothing, because the iops and bw tokens will be larger than the limit - // for a while (until they leak enough). - - assert_consumed(&mut io).await; - assert_none_consumed(&mut io).await; - - assert_eq!(io.iop_tokens, 20); - assert_eq!(io.bw_tokens, 10 * 1024 * 1024); - - // Bandwidth trigger is going to be larger and need more leaking to get - // down to a point where the zero sized IO can fire. - for _ in 0..9 { - io.leak_iop_tokens(10); - io.leak_bw_tokens(1024 * 1024); - - assert_none_consumed(&mut io).await; - } - - assert_eq!(io.iop_tokens, 0); - assert_eq!(io.bw_tokens, 1024 * 1024); - - assert_none_consumed(&mut io).await; - - io.leak_iop_tokens(10); - io.leak_bw_tokens(1024 * 1024); - - // We've leaked 10 KiB worth, it should fire now! - assert_consumed(&mut io).await; - - Ok(()) - } - #[tokio::test] async fn send_io_live_repair_read() { // Check the send_io_live_repair for a read below extent limit, diff --git a/upstairs/src/upstairs.rs b/upstairs/src/upstairs.rs index c240a9293..6e76464b0 100644 --- a/upstairs/src/upstairs.rs +++ b/upstairs/src/upstairs.rs @@ -13,7 +13,7 @@ use crate::{ extent_from_offset, stats::UpStatOuter, Block, BlockOp, BlockReq, BlockRes, Buffer, Bytes, ClientId, ClientMap, - CrucibleOpts, DsState, EncryptionContext, GtoS, GuestIoHandle, Message, + CrucibleOpts, DsState, EncryptionContext, GuestIoHandle, Message, RegionDefinition, RegionDefinitionStatus, SnapshotDetails, WQCounts, }; use crucible_common::CrucibleError; @@ -25,7 +25,6 @@ use std::sync::{ }; use futures::future::{pending, Either}; -use ringbuffer::RingBuffer; use slog::{debug, error, info, o, warn, Logger}; use tokio::{ sync::mpsc, @@ -415,7 +414,7 @@ impl Upstairs { }; let log = crucible_common::build_logger(); - let (_guest, io) = crate::Guest::new(Some(log.clone())); + let (_guest, io) = crate::guest::Guest::new(Some(log.clone())); Self::new(&opts, 0, ddef, io, None) } @@ -545,18 +544,9 @@ impl Upstairs { .counters .action_leak_check)); const LEAK_MS: usize = 1000; + self.guest.leak_check(LEAK_MS); let leak_tick = tokio::time::Duration::from_millis(LEAK_MS as u64); - if let Some(iop_limit_cfg) = self.guest.limits.iop_limit { - let tokens = iop_limit_cfg.iop_limit / (1000 / LEAK_MS); - self.guest.leak_iop_tokens(tokens); - } - - if let Some(bw_limit) = self.guest.limits.bw_limit { - let tokens = bw_limit / (1000 / LEAK_MS); - self.guest.leak_bw_tokens(tokens); - } - self.leak_deadline = Instant::now().checked_add(leak_tick).unwrap(); } @@ -725,7 +715,7 @@ impl Upstairs { /// Fires the `up-status` DTrace probe fn on_stat_update(&self) { - let up_count = self.guest.guest_work.active.len() as u32; + let up_count = self.guest.guest_work.len() as u32; let up_counters = self.counters; let ds_count = self.downstairs.active_count() as u32; @@ -751,10 +741,7 @@ impl Upstairs { let ds_ro_lr_skipped = self.downstairs.collect_stats(|c| c.stats.ro_lr_skipped); - let up_backpressure = self - .guest - .backpressure_us - .load(std::sync::atomic::Ordering::Acquire); + let up_backpressure = self.guest.backpressure_us(); let write_bytes_out = self.downstairs.write_bytes_outstanding(); cdt::up__status!(|| { @@ -786,7 +773,7 @@ impl Upstairs { match c { ControlRequest::UpstairsStats(tx) => { let ds_state = self.downstairs.collect_stats(|c| c.state()); - let up_jobs = self.guest.guest_work.active.len(); + let up_jobs = self.guest.guest_work.len(); let ds_jobs = self.downstairs.active_count(); let reconcile_done = self.downstairs.reconcile_repaired(); let 
reconcile_needed = @@ -1067,7 +1054,7 @@ impl Upstairs { .filter(|c| c.state() == DsState::Active) .count(); *data.lock().await = WQCounts { - up_count: self.guest.guest_work.active.len(), + up_count: self.guest.guest_work.len(), ds_count: self.downstairs.active_count(), active_count, }; @@ -1133,7 +1120,7 @@ impl Upstairs { ); if ds_count == 0 { if up_count != 0 { - crate::guest::show_guest_work(&self.guest); + self.guest.show_work(); } } else { self.downstairs.show_all_work() @@ -1150,13 +1137,7 @@ impl Upstairs { .filter(|c| c.state() == DsState::Active) .count(); - // TODO this is a ringbuffer, why are we turning it to a Vec to look at - // the last five items? - let up_done = self.guest.guest_work.completed.to_vec(); - print!("Upstairs last five completed: "); - for j in up_done.iter().rev().take(5) { - print!(" {:4}", j); - } + self.guest.guest_work.print_last_completed(5); println!(); WQCounts { @@ -1256,17 +1237,17 @@ impl Upstairs { * ID and the next_id are connected here, in that all future writes * should be flushed at the next flush ID. */ - let gw_id = self.guest.guest_work.next_gw_id(); - cdt::gw__flush__start!(|| (gw_id.0)); - - if snapshot_details.is_some() { - info!(self.log, "flush with snap requested"); - } - - let next_id = self.downstairs.submit_flush(gw_id, snapshot_details); - - let new_gtos = GtoS::new(next_id, None, res); - self.guest.guest_work.active.insert(gw_id, new_gtos); + let (gw_id, _) = self.guest.guest_work.submit_job( + |gw_id| { + cdt::gw__flush__start!(|| (gw_id.0)); + if snapshot_details.is_some() { + info!(self.log, "flush with snap requested"); + } + self.downstairs.submit_flush(gw_id, snapshot_details) + }, + None, + res, + ); cdt::up__to__ds__flush__start!(|| (gw_id.0)); } @@ -1336,17 +1317,14 @@ impl Upstairs { * Grab this ID after extent_from_offset: in case of Err we don't * want to create a gap in the IDs. */ - let gw_id = self.guest.guest_work.next_gw_id(); - cdt::gw__read__start!(|| (gw_id.0)); - - let next_id = self.downstairs.submit_read(gw_id, impacted_blocks, ddef); - - // New work created, add to the guest_work HM. It's fine to do this - // after submitting the job to the downstairs, because no one else is - // modifying the Upstairs right now; even if the job finishes - // instantaneously, it can't interrupt this function. - let new_gtos = GtoS::new(next_id, Some(data), res); - self.guest.guest_work.active.insert(gw_id, new_gtos); + let (gw_id, _) = self.guest.guest_work.submit_job( + |gw_id| { + cdt::gw__read__start!(|| (gw_id.0)); + self.downstairs.submit_read(gw_id, impacted_blocks, ddef) + }, + Some(data), + res, + ); cdt::up__to__ds__read__start!(|| (gw_id.0)); } @@ -1482,24 +1460,24 @@ impl Upstairs { * Grab this ID after extent_from_offset: in case of Err we don't * want to create a gap in the IDs. 
*/ - let gw_id = self.guest.guest_work.next_gw_id(); - if write.is_write_unwritten { - cdt::gw__write__unwritten__start!(|| (gw_id.0)); - } else { - cdt::gw__write__start!(|| (gw_id.0)); - } - - let next_id = self.downstairs.submit_write( - gw_id, - write.impacted_blocks, - write.data, - write.is_write_unwritten, + let (gw_id, _) = self.guest.guest_work.submit_job( + |gw_id| { + if write.is_write_unwritten { + cdt::gw__write__unwritten__start!(|| (gw_id.0)); + } else { + cdt::gw__write__start!(|| (gw_id.0)); + } + self.downstairs.submit_write( + gw_id, + write.impacted_blocks, + write.data, + write.is_write_unwritten, + ) + }, + None, + write.res, ); - // New work created, add to the guest_work HM - let new_gtos = GtoS::new(next_id, None, write.res); - self.guest.guest_work.active.insert(gw_id, new_gtos); - if write.is_write_unwritten { cdt::up__to__ds__write__unwritten__start!(|| (gw_id.0)); } else { diff --git a/upstairs/src/volume.rs b/upstairs/src/volume.rs index 288657ffa..673a3cba5 100644 --- a/upstairs/src/volume.rs +++ b/upstairs/src/volume.rs @@ -1,6 +1,8 @@ // Copyright 2023 Oxide Computer Company use super::*; +use crate::guest::Guest; + use async_recursion::async_recursion; use oximeter::types::ProducerRegistry; use std::ops::Range; From 4128a81c414d97d8427b75f0eb212f2644df932a Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Tue, 30 Jan 2024 10:46:55 -0500 Subject: [PATCH 3/4] Minor tweaks --- upstairs/src/guest.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index 1605a764c..ca826a549 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -202,12 +202,22 @@ impl GuestWork { (gw_id, ds_id) } + /// Low-level function to get next guest work ID + /// + /// Normally, `submit_job` should be called instead; this function should + /// only be used to reserve `GuestWorkId`s in advance of submitting the + /// jobs. pub(crate) fn next_gw_id(&mut self) -> GuestWorkId { let id = self.next_gw_id; self.next_gw_id += 1; GuestWorkId(id) } + /// Low-level function to insert work into the map + /// + /// Normally, `submit_job` should be called instead; this function should + /// only be used if we have reserved the `GuestWorkId` and `JobId` in + /// advance. pub(crate) fn insert( &mut self, gw_id: GuestWorkId, @@ -536,7 +546,7 @@ impl BlockIO for Guest { } async fn get_block_size(&self) -> Result { - let bs = self.block_size.load(std::sync::atomic::Ordering::Relaxed); + let bs = self.block_size.load(Ordering::Relaxed); if bs == 0 { let data = Arc::new(Mutex::new(0)); let size_query = BlockOp::QueryBlockSize { data: data.clone() }; @@ -546,8 +556,7 @@ impl BlockIO for Guest { reply.result?; let bs = *data.lock().await; - self.block_size - .store(bs, std::sync::atomic::Ordering::Relaxed); + self.block_size.store(bs, Ordering::Relaxed); Ok(bs) } else { Ok(bs) @@ -945,8 +954,7 @@ impl GuestIoHandle { /// Looks up current backpressure pub fn backpressure_us(&self) -> u64 { - self.backpressure_us - .load(std::sync::atomic::Ordering::Acquire) + self.backpressure_us.load(Ordering::Acquire) } /// Debug function to dump the guest work structure. 
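A note on the pattern the series converges on: the open-coded sequence of next_gw_id(), GtoS::new(), and active.insert() is replaced by GuestWork::submit_job(), which allocates the guest-side ID, lets a closure build the matching downstairs job, and records the mapping in one step. The following is a minimal standalone sketch of that pattern only, using simplified stand-in types rather than the real GuestWork, GtoS, or Downstairs:

    use std::collections::HashMap;

    #[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)]
    struct GuestWorkId(u64);

    #[derive(Copy, Clone, Debug, PartialEq)]
    struct JobId(u64);

    #[derive(Default)]
    struct GuestWorkModel {
        // Maps a guest-side job to the downstairs job created for it
        active: HashMap<GuestWorkId, JobId>,
        next_gw_id: u64,
    }

    impl GuestWorkModel {
        // Allocate the GuestWorkId, let the caller build the downstairs
        // job from it, then record the mapping and return both IDs.
        fn submit_job<F: FnOnce(GuestWorkId) -> JobId>(
            &mut self,
            f: F,
        ) -> (GuestWorkId, JobId) {
            let gw_id = GuestWorkId(self.next_gw_id);
            self.next_gw_id += 1;
            let ds_id = f(gw_id);
            self.active.insert(gw_id, ds_id);
            (gw_id, ds_id)
        }
    }

    fn main() {
        let mut gw = GuestWorkModel::default();
        // The closure stands in for Downstairs::submit_flush / submit_read /
        // submit_write, which need the GuestWorkId to build the job.
        let (gw_id, ds_id) = gw.submit_job(|gw_id| JobId(gw_id.0 + 1000));
        assert_eq!(gw.active.len(), 1);
        println!("guest job {:?} -> downstairs job {:?}", gw_id, ds_id);
    }

In the series itself, the live-repair paths in downstairs.rs keep the two-step form (next_gw_id() followed by insert()), since several job IDs are reserved before the repair IOs exist; that split is what the new doc comments on next_gw_id() and insert() describe.
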
From 24836e2c4d76be332fe33bc8f22f62e25ba0a96e Mon Sep 17 00:00:00 2001 From: Matt Keeter Date: Wed, 7 Feb 2024 11:05:37 -0500 Subject: [PATCH 4/4] Bump copyright --- upstairs/src/guest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upstairs/src/guest.rs b/upstairs/src/guest.rs index ca826a549..f157b7817 100644 --- a/upstairs/src/guest.rs +++ b/upstairs/src/guest.rs @@ -1,4 +1,4 @@ -// Copyright 2023 Oxide Computer Company +// Copyright 2024 Oxide Computer Company use std::{ collections::HashMap, net::SocketAddr,
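
For reference, the token arithmetic that moves into GuestIoHandle::leak_check() above (limit / (1000 / leak_ms)) can be checked in isolation. Below is a standalone sketch of just that computation; the 500 IOP/s and 6400 KiB/s figures come from the tests in this series, while the 250 ms interval is a hypothetical value (the Upstairs itself uses LEAK_MS = 1000):

    // Sketch of the per-interval leak that leak_check() performs for both
    // the IOP and bandwidth token counters.
    fn tokens_to_leak(limit_per_second: usize, leak_ms: usize) -> usize {
        limit_per_second / (1000 / leak_ms)
    }

    fn main() {
        // IOP limit of 500/s checked once per second: all 500 tokens return.
        assert_eq!(tokens_to_leak(500, 1000), 500);
        // Bandwidth limit of 6400 KiB/s checked every 250 ms (hypothetical
        // interval): a quarter of the per-second budget returns each tick.
        assert_eq!(tokens_to_leak(6400 * 1024, 250), (6400 * 1024) / 4);
        println!("leak arithmetic matches the per-second limits");
    }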