// Copyright 2023 Oxide Computer Company
use crate::{
cdt, deadline_secs, integrity_hash, live_repair::ExtentInfo,
upstairs::UpstairsConfig, upstairs::UpstairsState, ClientIOStateCount,
ClientId, CrucibleDecoder, CrucibleEncoder, CrucibleError, DownstairsIO,
DsState, EncryptionContext, IOState, IOop, JobId, Message, RawMessage,
ReadResponse, ReconcileIO, RegionDefinitionStatus, RegionMetadata,
MAX_ACTIVE_COUNT,
};
use crucible_common::x509::TLSContext;
use crucible_protocol::{ReconciliationId, CRUCIBLE_MESSAGE_VERSION};
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use futures::StreamExt;
use slog::{debug, error, info, o, warn, Logger};
use tokio::{
io::AsyncWriteExt,
net::{TcpSocket, TcpStream},
sync::{mpsc, oneshot},
time::sleep_until,
};
use tokio_util::codec::{Encoder, FramedRead};
use uuid::Uuid;
const TIMEOUT_SECS: f32 = 50.0;
const PING_INTERVAL_SECS: f32 = 5.0;
#[derive(Debug)]
pub(crate) enum ClientRequest {
/// Normal message to be sent down the wire
Message(Message),
/// Pre-serialized message to be sent down the wire
RawMessage(RawMessage, bytes::Bytes),
}
impl ClientRequest {
/// Write the given message to an `AsyncWrite` sink
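///
/// # Example (sketch)
///
/// `Vec<u8>` implements `tokio::io::AsyncWrite`, so a request can be
/// serialized into an in-memory buffer for inspection. `msg` below is a
/// placeholder for any `Message` variant; this is an illustration rather
/// than a compiled doctest.
///
/// ```ignore
/// let req = ClientRequest::Message(msg);
/// let mut buf: Vec<u8> = Vec::new();
/// req.write(&mut buf).await?;
/// // `buf` now holds the length-prefixed, bincode-encoded frame.
/// ```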
async fn write<W>(&self, fw: &mut W) -> Result<(), CrucibleError>
where
W: tokio::io::AsyncWrite
+ std::marker::Unpin
+ std::marker::Send
+ 'static,
{
match self {
ClientRequest::Message(m) => {
let mut out = bytes::BytesMut::new();
let mut e = CrucibleEncoder::new();
e.encode(m, &mut out)?;
fw.write_all(&out).await?;
}
ClientRequest::RawMessage(m, data) => {
// Manual implementation of CrucibleEncoder, for situations
// where the bulk of the message has already been
// pre-serialized.
let mut header = bincode::serialize(&(
0u32, // dummy length, to be patched later
&m,
))
.unwrap();
// Patch the length
let len: u32 = (header.len() + data.len()).try_into().unwrap();
header[0..4].copy_from_slice(&len.to_le_bytes());
// Patch the discriminant
bincode::serialize_into(&mut header[4..8], &m.discriminant())
.unwrap();
// write_all_vectored would save a syscall, but is nightly-only
fw.write_all(&header).await?;
fw.write_all(data).await?;
}
}
Ok(())
}
}
/// Handle to a running I/O task
///
/// The I/O task is "thin"; it simply forwards messages around. The task
/// continues running until `client_request_tx` is dropped, so it is always
/// valid to send it new messages.
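///
/// A rough picture of the channels in this handle (a sketch drawn from the
/// fields below, not a protocol diagram):
///
/// ```text
/// main task  -- ClientRequest  --> client_request_tx  --> IO task --> network
/// main task <-- ClientResponse --  client_response_rx <-- IO task <-- network
/// client_connect_tx : fired once to let the IO task open its connection
/// client_stop_tx    : fired once to ask the IO task to stop
/// ```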
#[derive(Debug)]
struct ClientTaskHandle {
/// Handle to send data to the I/O task
///
/// The only thing that we send to the client is [`Message`], which is then
/// sent out over the network.
client_request_tx: mpsc::Sender<ClientRequest>,
/// Handle to receive data from the I/O task
///
/// The client has a variety of responses, which include [`Message`]
/// replies, but also things like "the I/O task has stopped"
client_response_rx: mpsc::Receiver<ClientResponse>,
/// One-shot sender to ask the client to open its connection
///
/// This is used to hold the client (without connecting) in cases where we
/// have deliberately deactivated this client.
client_connect_tx: Option<oneshot::Sender<()>>,
/// One-shot sender to stop the client
///
/// This is a oneshot so that functions which stop the client don't
/// necessarily need to be `async`.
///
/// It is `None` if we have already requested that the client stop, but have
/// not yet seen the task finish.
client_stop_tx: Option<oneshot::Sender<ClientStopReason>>,
}
/// Per-client data
///
/// This data structure contains client-specific state and manages communication
/// with a per-client IO task (through the `ClientTaskHandle`).
#[derive(Debug)]
pub(crate) struct DownstairsClient {
/// Shared (static) configuration
cfg: Arc<UpstairsConfig>,
/// One's own client ID
client_id: ClientId,
/// Per-client log
pub(crate) log: Logger,
/// Client task IO
///
/// The client task always sends `ClientResponse::Done` before stopping;
/// this handle should never be dropped before that point.
client_task: ClientTaskHandle,
/// IO state counters
pub(crate) io_state_count: ClientIOStateCount,
/// UUID for this downstairs region
///
/// Unpopulated until provided by `Message::RegionInfo`
region_uuid: Option<Uuid>,
/// The IP:Port of each of the downstairs
///
/// This is left unpopulated in some unit tests
pub(crate) target_addr: Option<SocketAddr>,
/// The IP:Port for repair when contacting the downstairs
///
/// This is set to `None` during initialization
pub(crate) repair_addr: Option<SocketAddr>,
/// TLS context (if present)
///
/// This is passed as a pointer to minimize copies
tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
/// State of the downstairs connection
state: DsState,
/// The `JobId` of the last flush that this downstairs has acked
///
/// Note that this is a job ID, not a downstairs flush index (contrast with
/// [`Downstairs::next_flush`], which is a flush index).
pub(crate) last_flush: JobId,
/// Cache of new jobs
new_jobs: BTreeSet<JobId>,
/// Jobs that have been skipped
pub(crate) skipped_jobs: BTreeSet<JobId>,
/// Region metadata for this particular Downstairs
///
/// On Startup, we collect info from each downstairs region. We use that
/// info to make sure that all three regions in a region set are the
/// same, and if not the same, to decide which data we will consider
/// valid and make the other downstairs contain that same data.
pub(crate) region_metadata: Option<RegionMetadata>,
/**
* Live Repair info
* This will contain the extent info for each downstairs as reported
* by those downstairs and is used to decide if an extent requires
* repair or not.
*/
pub(crate) repair_info: Option<ExtentInfo>,
/// Accumulated statistics
pub(crate) stats: DownstairsStats,
/// State for the "promote to active" action
promote_state: Option<PromoteState>,
/// State for startup negotiation
negotiation_state: NegotiationState,
}
impl DownstairsClient {
pub(crate) fn new(
client_id: ClientId,
cfg: Arc<UpstairsConfig>,
target_addr: Option<SocketAddr>,
log: Logger,
tls_context: Option<Arc<crucible_common::x509::TLSContext>>,
) -> Self {
Self {
cfg,
client_task: Self::new_io_task(
target_addr,
false, // do not delay in starting the task
false, // do not start the task until GoActive
client_id,
tls_context.clone(),
&log,
),
client_id,
region_uuid: None,
negotiation_state: NegotiationState::Start,
tls_context,
promote_state: None,
log,
target_addr,
repair_addr: None,
state: DsState::New,
last_flush: JobId(0),
stats: DownstairsStats::default(),
new_jobs: BTreeSet::new(),
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
}
}
/// Builds a minimal `DownstairsClient` for testing
///
/// The resulting client has no target address; any packets sent by the
/// client will disappear into the void.
#[cfg(test)]
fn test_default() -> Self {
let cfg = Arc::new(UpstairsConfig {
encryption_context: None,
upstairs_id: Uuid::new_v4(),
session_id: Uuid::new_v4(),
generation: std::sync::atomic::AtomicU64::new(1),
read_only: false,
lossy: false,
});
Self {
cfg,
client_task: Self::new_dummy_task(false),
client_id: ClientId::new(0),
region_uuid: None,
negotiation_state: NegotiationState::Start,
tls_context: None,
promote_state: None,
log: crucible_common::build_logger(),
target_addr: None,
repair_addr: None,
state: DsState::New,
last_flush: JobId(0),
stats: DownstairsStats::default(),
new_jobs: BTreeSet::new(),
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
}
}
/// Return true if `io_send` can send more work, otherwise return false
pub(crate) fn should_do_more_work(&self) -> bool {
!self.new_jobs.is_empty()
&& matches!(self.state, DsState::Active | DsState::LiveRepair)
&& (crate::MAX_ACTIVE_COUNT
- self.io_state_count.in_progress as usize)
> 0
}
/// Choose which `ClientAction` to apply
///
/// This function is called from within a top-level `select!`, so not only
/// must the select expressions be cancel-safe, but the **bodies** must also
/// be cancel-safe. This is why we simply return a single value in the body
/// of each statement.
///
/// This function will wait forever if we have asked for the client task to
/// stop, so it should only be called in a higher-level `select!`.
pub(crate) async fn select(&mut self) -> ClientAction {
loop {
let out = match self.client_task.client_response_rx.recv().await {
Some(c) => c.into(),
None => break ClientAction::ChannelClosed,
};
// Ignore client responses if we have told the client to exit (we
// still propagate other ClientAction variants, e.g. TaskStopped).
if self.client_task.client_stop_tx.is_some()
|| !matches!(out, ClientAction::Response(..))
{
break out;
}
}
}
/// Send a `Message::HereIAm` via the client IO task
pub(crate) async fn send_here_i_am(&mut self) {
self.send_message(Message::HereIAm {
version: CRUCIBLE_MESSAGE_VERSION,
upstairs_id: self.cfg.upstairs_id,
session_id: self.cfg.session_id,
gen: self.cfg.generation(),
read_only: self.cfg.read_only,
encrypted: self.cfg.encrypted(),
alternate_versions: vec![],
})
.await;
}
fn halt_io_task(&mut self, r: ClientStopReason) {
if let Some(t) = self.client_task.client_stop_tx.take() {
if let Err(_e) = t.send(r) {
warn!(self.log, "failed to send stop request")
}
} else {
warn!(self.log, "client task is already stopping")
}
}
/// Return a list of downstairs request IDs that represent unissued
/// requests for this client.
///
/// Returns a tuple of `(jobs, flow control)` where flow control is true if
/// the jobs list has been clamped to `max_count`.
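///
/// # Example (sketch)
///
/// A hedged illustration of the flow-control contract; `client` here is a
/// hypothetical `DownstairsClient` with three queued jobs, and this is not
/// a compiled doctest.
///
/// ```ignore
/// // Jobs 1000, 1001, and 1002 are queued as New.
/// let (jobs, flow_control) = client.new_work(2);
/// assert_eq!(jobs.len(), 2); // clamped to max_count
/// assert!(flow_control);     // true because work was left behind
/// ```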
pub(crate) fn new_work(
&mut self,
max_count: usize,
) -> (BTreeSet<JobId>, bool) {
if max_count >= self.new_jobs.len() {
// Happy path: we can grab everything
(std::mem::take(&mut self.new_jobs), false)
} else {
// Otherwise, pop elements from the queue
let mut out = BTreeSet::new();
for _ in 0..max_count {
out.insert(self.new_jobs.pop_first().unwrap());
}
(out, true)
}
}
/// Requeues a single job
///
/// This is used when running in lossy mode, where jobs may be skipped in
/// `io_send`.
pub(crate) fn requeue_one(&mut self, work: JobId) {
self.new_jobs.insert(work);
}
pub(crate) async fn send_message(&mut self, m: Message) {
self.send(ClientRequest::Message(m)).await
}
pub(crate) async fn send(&mut self, m: ClientRequest) {
// Normally, the client task continues running until
// `self.client_task.client_request_tx` is dropped; as such, we should
// always be able to send it a message.
//
// However, during Tokio shutdown, tasks may stop in arbitrary order.
// We log an error but don't panic, because panicking is uncouth.
if let Err(e) = self.client_task.client_request_tx.send(m).await {
error!(
self.log,
"failed to send message: {e};
this should only happen during shutdown"
)
}
}
/// Sets a job state, handling `io_state_count` counters
fn set_job_state(
&mut self,
job: &mut DownstairsIO,
new_state: IOState,
) -> IOState {
self.io_state_count.incr(&new_state);
let old_state = job.state.insert(self.client_id, new_state);
self.io_state_count.decr(&old_state);
old_state
}
/// Mark the given job as in-progress for this client
///
/// Returns an `IOop` with modified dependencies
pub(crate) fn in_progress(
&mut self,
job: &mut DownstairsIO,
repair_min_id: Option<JobId>,
) -> IOop {
let old_state = self.set_job_state(job, IOState::InProgress);
assert_eq!(old_state, IOState::New);
let mut out = job.work.clone();
if self.dependencies_need_cleanup() {
match &mut out {
IOop::Write { dependencies, .. }
| IOop::WriteUnwritten { dependencies, .. }
| IOop::Flush { dependencies, .. }
| IOop::Read { dependencies, .. }
| IOop::ExtentFlushClose { dependencies, .. }
| IOop::ExtentLiveRepair { dependencies, .. }
| IOop::ExtentLiveReopen { dependencies, .. }
| IOop::ExtentLiveNoOp { dependencies } => {
self.remove_dep_if_live_repair(
dependencies,
job.ds_id,
repair_min_id.expect("must have repair_min_id"),
);
}
}
}
// If our downstairs is under repair, then include any extent limit sent
// in the IOop; otherwise, clear it out
if let IOop::Flush { extent_limit, .. } = &mut out {
if !matches!(self.state, DsState::LiveRepair) {
*extent_limit = None;
}
}
out
}
pub(crate) fn replay_job(&mut self, job: &mut DownstairsIO) {
/*
* If the job is InProgress or New, then we can just go back
* to New and no extra work is required.
* If it's Done, then by definition it has been acked; assert that here
* to double-check.
*/
if IOState::Done == job.state[self.client_id] {
assert!(job.acked);
}
let old_state = self.set_job_state(job, IOState::New);
job.replay = true;
if old_state != IOState::New {
self.requeue_one(job.ds_id);
}
}
/// Sets this job as skipped and moves it to `skipped_jobs`
pub(crate) fn skip_job(&mut self, job: &mut DownstairsIO) {
self.set_job_state(job, IOState::Skipped);
self.skipped_jobs.insert(job.ds_id);
}
/// Returns true if it's possible that we need to clean job dependencies
pub(crate) fn dependencies_need_cleanup(&self) -> bool {
matches!(self.state, DsState::LiveRepair)
&& !self.skipped_jobs.is_empty()
}
/// Sets our state to `DsState::Reconcile`
///
/// # Panics
/// If the previous state is not `DsState::WaitQuorum`
pub(crate) fn begin_reconcile(&mut self) {
info!(self.log, "Transition from {} to Reconcile", self.state);
assert_eq!(self.state, DsState::WaitQuorum);
self.state = DsState::Reconcile;
}
/// Go through the list of dependencies and remove any jobs that this
/// downstairs has already skipped, as the downstairs on the other side will
/// not have received these IOs.
///
/// First off, any job that was "skipped" should not be a dependency for
/// this specific downstairs. In addition, any job that happened before
/// the skipped jobs that was marked as "Done" should also be removed, as
/// there will be no replay here and we are basically rebuilding this
/// downstairs from other downstairs.
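///
/// A sketch with made-up job IDs:
///
/// ```text
/// deps          = [1000, 1001, 1002, 1003]
/// skipped_jobs  = {1001}    // dropped: skipped on this client
/// repair_min_id = 1002      // anything older is also dropped
/// result        = [1002, 1003]
/// ```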
fn remove_dep_if_live_repair(
&self,
deps: &mut Vec<JobId>,
ds_id: JobId,
repair_min_id: JobId,
) {
debug!(
self.log,
"{} Remove check skipped:{:?} from deps:{:?}",
ds_id,
self.skipped_jobs,
deps
);
assert!(matches!(self.state, DsState::LiveRepair));
deps.retain(|x| !self.skipped_jobs.contains(x));
// If we are repairing, then there must be a repair_min_id set so we
// know where to stop with dependency inclusion.
debug!(
self.log,
"{} Remove check < min repaired:{} from deps:{:?}",
ds_id,
repair_min_id,
deps
);
deps.retain(|x| x >= &repair_min_id);
info!(self.log, " {} final dependency list {:?}", ds_id, deps);
}
/// When the downstairs is marked as missing, handle its state transition
pub(crate) fn on_missing(&mut self) {
let current = &self.state;
let new_state = match current {
DsState::Active | DsState::Replay | DsState::Offline => {
DsState::Offline
}
DsState::Faulted
| DsState::LiveRepair
| DsState::LiveRepairReady
| DsState::Replaced => DsState::Faulted,
DsState::New
| DsState::Deactivated
| DsState::Reconcile
| DsState::FailedReconcile
| DsState::Disconnected
| DsState::BadVersion
| DsState::WaitQuorum
| DsState::BadRegion
| DsState::WaitActive
| DsState::Disabled => DsState::Disconnected,
DsState::Replacing => DsState::Replaced,
DsState::Migrating => panic!(),
};
if *current != new_state {
info!(
self.log,
"Gone missing, transition from {current:?} to {new_state:?}"
);
} else {
info!(self.log, "Still missing, state is {current:?}");
}
// Jobs are skipped and replayed in `Downstairs::reinitialize`, which is
// (probably) the caller of this function.
self.state = new_state;
}
/// Checks whether this Downstairs is ready for the upstairs to deactivate
///
/// # Panics
/// If the downstairs is offline
pub(crate) fn ready_to_deactivate(&self) -> bool {
match &self.state {
DsState::New | DsState::WaitActive => {
info!(
self.log,
"ready to deactivate from state {:?}", self.state
);
true
}
DsState::Offline => {
panic!("can't deactivate while a downstairs is offline")
}
s => {
info!(self.log, "not ready to deactivate due to state {s:?}");
false
}
}
}
/// Switches the client state to Deactivated and stops the IO task
pub(crate) fn deactivate(&mut self, up_state: &UpstairsState) {
self.checked_state_transition(up_state, DsState::Deactivated);
self.halt_io_task(ClientStopReason::Deactivated)
}
/// Resets this Downstairs and starts a fresh connection
///
/// # Panics
/// If `self.client_task` is not `None`, or `self.target_addr` is `None`
pub(crate) fn reinitialize(&mut self, auto_promote: bool) {
// Clear this Downstairs' repair address, and let the YesItsMe message set it.
// This works if this Downstairs is new, reconnecting, or was replaced
// entirely; the repair address could have changed in any of these
// cases.
self.repair_addr = None;
if auto_promote {
self.promote_state = Some(PromoteState::Waiting);
} else {
self.promote_state = None;
}
self.negotiation_state = NegotiationState::Start;
// TODO this is an awkward special case!
if self.state == DsState::Disconnected {
info!(self.log, "Disconnected -> New");
self.state = DsState::New;
}
// Restart with a short delay
self.start_task(true, auto_promote);
}
/// Returns the last flush ID handled by this client
pub(crate) fn last_flush(&self) -> JobId {
self.last_flush
}
/// Starts a client IO task, saving the handle in `self.client_task`
///
/// If we are running unit tests and `self.target_addr` is not populated, we
/// start a dummy task instead.
///
/// # Panics
/// If `self.client_task` is not `None`, or `self.target_addr` is `None` and
/// this isn't running in test mode
fn start_task(&mut self, delay: bool, connect: bool) {
self.client_task = Self::new_io_task(
self.target_addr,
delay,
connect,
self.client_id,
self.tls_context.clone(),
&self.log,
);
}
fn new_io_task(
target: Option<SocketAddr>,
delay: bool,
connect: bool,
client_id: ClientId,
tls_context: Option<Arc<TLSContext>>,
log: &Logger,
) -> ClientTaskHandle {
#[cfg(test)]
if let Some(target) = target {
Self::new_network_task(
target,
delay,
connect,
client_id,
tls_context,
log,
)
} else {
Self::new_dummy_task(connect)
}
#[cfg(not(test))]
Self::new_network_task(
target.expect("must provide socketaddr"),
delay,
connect,
client_id,
tls_context,
log,
)
}
fn new_network_task(
target: SocketAddr,
delay: bool,
connect: bool,
client_id: ClientId,
tls_context: Option<Arc<TLSContext>>,
log: &Logger,
) -> ClientTaskHandle {
// These channels must support at least MAX_ACTIVE_COUNT messages;
// otherwise, we risk a deadlock if the IO task and main task
// simultaneously try sending each other data when the channels are
// full.
let (client_request_tx, client_request_rx) =
mpsc::channel(MAX_ACTIVE_COUNT * 2);
let (client_response_tx, client_response_rx) =
mpsc::channel(MAX_ACTIVE_COUNT * 2);
let (client_stop_tx, client_stop_rx) = oneshot::channel();
let (client_connect_tx, client_connect_rx) = oneshot::channel();
let client_connect_tx = if connect {
client_connect_tx.send(()).unwrap();
None
} else {
Some(client_connect_tx)
};
let log = log.new(o!("" => "io task"));
tokio::spawn(async move {
let mut c = ClientIoTask {
client_id,
tls_context,
target,
request_rx: client_request_rx,
response_tx: client_response_tx,
start: client_connect_rx,
stop: client_stop_rx,
recv_task: ClientRxTask {
handle: None,
log: log.clone(),
},
delay,
log,
};
c.run().await
});
ClientTaskHandle {
client_request_tx,
client_connect_tx,
client_stop_tx: Some(client_stop_tx),
client_response_rx,
}
}
/// Starts a dummy IO task, returning its IO handle
#[cfg(test)]
fn new_dummy_task(connect: bool) -> ClientTaskHandle {
let (client_request_tx, client_request_rx) =
mpsc::channel(MAX_ACTIVE_COUNT * 2);
let (_client_response_tx, client_response_rx) =
mpsc::channel(MAX_ACTIVE_COUNT * 2);
let (client_stop_tx, client_stop_rx) = oneshot::channel();
let (client_connect_tx, client_connect_rx) = oneshot::channel();
// Forget these without dropping them, so that we can send values into
// the void!
std::mem::forget(client_request_rx);
std::mem::forget(client_stop_rx);
std::mem::forget(client_connect_rx);
ClientTaskHandle {
client_request_tx,
client_connect_tx: if connect {
None
} else {
Some(client_connect_tx)
},
client_stop_tx: Some(client_stop_tx),
client_response_rx,
}
}
/// Indicate that the upstairs has requested that we go active
///
/// This either sends a `PromoteToActive` request directly, or schedules it
/// to be sent once the client state reaches `WaitActive`.
///
/// # Panics
/// If we already called this function (without `reinitialize` in between),
/// or `self.state` is invalid for promotion.
pub(crate) async fn set_active_request(&mut self) {
if let Some(t) = self.client_task.client_connect_tx.take() {
info!(self.log, "sending connect oneshot to client");
if let Err(e) = t.send(()) {
error!(
self.log,
"failed to set client as active {e:?};
are we shutting down?"
);
}
}
match self.promote_state {
Some(PromoteState::Waiting) => {
panic!("called set_active_request while already waiting")
}
Some(PromoteState::Sent) => {
panic!("called set_active_request after it was sent")
}
None => (),
}
// If we're already in the point of negotiation where we're waiting to
// go active, then immediately go active!
match self.state {
DsState::New => {
info!(
self.log,
"client set_active_request while in {:?}; waiting...",
self.state,
);
self.promote_state = Some(PromoteState::Waiting);
}
DsState::WaitActive => {
info!(
self.log,
"client set_active_request while in WaitActive \
-> WaitForPromote"
);
// If the client task has stopped, then print a warning but
// otherwise continue (because we'll be cleaned up by the
// JoinHandle watcher).
self.send_message(Message::PromoteToActive {
upstairs_id: self.cfg.upstairs_id,
session_id: self.cfg.session_id,
gen: self.cfg.generation(),
})
.await;
self.promote_state = Some(PromoteState::Sent);
// TODO: negotiation / promotion state is spread across
// DsState, PromoteState, and NegotiationState. We should
// consolidate into a single place
assert!(
self.negotiation_state == NegotiationState::Start
|| self.negotiation_state
== NegotiationState::WaitForPromote
);
self.negotiation_state = NegotiationState::WaitForPromote;
}
s => panic!("invalid state for set_active_request: {s:?}"),
}
}
/// Accessor method for client connection state
pub(crate) fn state(&self) -> DsState {
self.state
}
/// Sets the current state to `DsState::FailedReconcile`
pub(crate) fn set_failed_reconcile(&mut self, up_state: &UpstairsState) {
info!(
self.log,
"Transition from {} to FailedReconcile", self.state
);
self.checked_state_transition(up_state, DsState::FailedReconcile);
self.restart_connection(up_state, ClientStopReason::FailedReconcile)
}
pub(crate) fn restart_connection(
&mut self,
up_state: &UpstairsState,
reason: ClientStopReason,
) {
let new_state = match self.state {
DsState::Active => DsState::Offline,
DsState::Replay => DsState::Offline,
DsState::Offline => DsState::Offline,
DsState::Migrating => DsState::Faulted,
DsState::Faulted => DsState::Faulted,
DsState::Deactivated => DsState::New,
DsState::Reconcile => DsState::New,
DsState::FailedReconcile => DsState::New,
DsState::LiveRepair => DsState::Faulted,
DsState::LiveRepairReady => DsState::Faulted,
DsState::Replacing => DsState::Replaced,
_ => {
/*
* Any other state means we had not yet enabled this
* downstairs to receive IO, so we go to the back of the
* line and have to re-verify it again.
*/
DsState::Disconnected
}
};
info!(
self.log,
"restarting connection, transition from {} to {}",
self.state,
new_state,
);
self.checked_state_transition(up_state, new_state);
self.halt_io_task(reason);
}
/// Sets the current state to `DsState::Active`
pub(crate) fn set_active(&mut self) {
info!(self.log, "Transition from {} to Active", self.state);
self.state = DsState::Active;
}
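/// Enqueues a new job for this client, choosing its initial `IOState`
///
/// A sketch of the outcome by client state (derived from the match below;
/// the LiveRepair case additionally depends on the job's extent relative to
/// `last_repair_extent`):
///
/// ```text
/// Faulted / Replaced / Replacing / LiveRepairReady -> Skipped
/// LiveRepair, IO not yet safe to send              -> Skipped
/// LiveRepair, IO safe to send                      -> New
/// anything else                                    -> New
/// ```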
pub(crate) fn enqueue(
&mut self,
io: &mut DownstairsIO,
last_repair_extent: Option<u64>,
) -> IOState {
assert_eq!(io.state[self.client_id], IOState::New);
// If a downstairs is faulted or ready for repair, we can move
// that job directly to IOState::Skipped
// If a downstairs is in repair, then we need to see if this
// IO is on a repaired extent or not. If an IO spans extents
// where some are repaired and some are not, then this IO had
// better have the dependencies already set to reflect the
// requirement that a repair IO will need to finish first.
let r = match self.state {
DsState::Faulted
| DsState::Replaced
| DsState::Replacing
| DsState::LiveRepairReady => {
io.state.insert(self.client_id, IOState::Skipped);
self.skipped_jobs.insert(io.ds_id);
IOState::Skipped
}
DsState::LiveRepair => {
// Pick the latest repair limit that's relevant for this
// downstairs. This is either the extent under repair (if
// there are no reserved repair jobs), or the last extent
// for which we have reserved a repair job ID; either way, the
// caller has provided it to us.
if io.work.send_io_live_repair(last_repair_extent) {
// Leave this IO as New, the downstairs will receive it.
self.new_jobs.insert(io.ds_id);
IOState::New
} else {
// Move this IO to skipped, we are not ready for
// the downstairs to receive it.
io.state.insert(self.client_id, IOState::Skipped);
self.skipped_jobs.insert(io.ds_id);
IOState::Skipped
}
}
_ => {
self.new_jobs.insert(io.ds_id);
IOState::New
}
};
self.io_state_count.incr(&r);
r
}
/// Prepares for a new connection, then restarts the IO task
pub(crate) fn replace(
&mut self,
up_state: &UpstairsState,
new: SocketAddr,
) {
self.target_addr = Some(new);
self.region_metadata = None;
self.checked_state_transition(up_state, DsState::Replacing);
self.stats.replaced += 1;
self.halt_io_task(ClientStopReason::Replacing);
}
/// Sets `self.state` to `new_state`, with logging and validity checking
///
/// Conceptually, this function is a checked assignment to `self.state`.
/// Thinking in terms of the graph of all possible states, this function
/// will panic if there is not a valid state transition edge between the
/// current `self.state` and the requested `new_state`.
///
/// For example, transitioning to a `new_state` of [DsState::Replacing] is
/// *always* possible, so this will never panic for that state transition.
/// On the other hand, [DsState::Replaced] can *only* follow
/// [DsState::Replacing], so if the current state is *anything else*, that
/// indicates a logic error happened in some other part of the code.
///
/// If the state transition is valid, this function simply sets `self.state`
/// to the newly requested state. There's no magic here beyond that; aside
/// from `self.state`, this function does not modify any other internal
/// variables.
///
/// # Panics
/// If the transition is not valid
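///
/// # Example (sketch)
///
/// An illustrative (not compiled) use, assuming the upstairs is not yet
/// active and the client is currently in `DsState::Replacing`:
///
/// ```ignore
/// // Replacing -> Replaced is a valid edge, so this succeeds.
/// client.checked_state_transition(&up_state, DsState::Replaced);
/// // Calling it again with Replaced as the current state would panic,
/// // because Replaced may only follow Replacing.
/// ```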
pub(crate) fn checked_state_transition(
&mut self,
up_state: &UpstairsState,
new_state: DsState,
) {
// TODO this should probably be private!
info!(self.log, "ds_transition from {} to {new_state}", self.state);
let old_state = self.state;
/*
* Check that this is a valid transition
*/
let panic_invalid = || {
panic!(
"[{}] {} Invalid transition: {:?} -> {:?}",
self.client_id, self.cfg.upstairs_id, old_state, new_state
)
};
match new_state {
DsState::Replacing => {
// A downstairs can be replaced at any time.
}
DsState::Replaced => {
assert_eq!(old_state, DsState::Replacing);
}
DsState::WaitActive => {
if old_state == DsState::Offline {
if matches!(up_state, UpstairsState::Active) {
panic!(
"[{}] {} Bad up active state change {} -> {}",
self.client_id,
self.cfg.upstairs_id,
old_state,
new_state,
);
}
} else if old_state != DsState::New