@@ -13,6 +13,7 @@ use dropshot::{
13
13
} ;
14
14
use futures:: SinkExt ;
15
15
use slog:: { error, o, Logger } ;
16
+ use std:: collections:: BTreeMap ;
16
17
use thiserror:: Error ;
17
18
use tokio:: sync:: { watch, Mutex } ;
18
19
use tokio_tungstenite:: tungstenite:: protocol:: { Role , WebSocketConfig } ;
@@ -28,37 +29,53 @@ pub enum Error {
28
29
TransitionSendFail ,
29
30
#[ error( "Cannot request any new mock instance state once it is stopped/destroyed/failed" ) ]
30
31
TerminalState ,
32
+ #[ error( "Cannot transition to {requested:?} from {current:?}" ) ]
33
+ InvalidTransition {
34
+ current : api:: InstanceState ,
35
+ requested : api:: InstanceStateRequested ,
36
+ } ,
31
37
}
32
38
33
39
/// simulated instance properties
34
40
pub struct InstanceContext {
35
- pub state : api:: InstanceState ,
36
- pub generation : u64 ,
41
+ /// The instance's current generation last observed by the
42
+ /// `instance-state-monitor` endpoint.
43
+ curr_gen : u64 ,
44
+ /// The next generation to use when inserting new state(s) into the queue.
45
+ next_queue_gen : u64 ,
37
46
pub properties : api:: InstanceProperties ,
38
47
serial : Arc < serial:: Serial > ,
39
48
serial_task : serial:: SerialTask ,
40
- state_watcher_rx : watch:: Receiver < api:: InstanceStateMonitorResponse > ,
41
- state_watcher_tx : watch:: Sender < api:: InstanceStateMonitorResponse > ,
49
+ state_watcher_rx :
50
+ watch:: Receiver < BTreeMap < u64 , api:: InstanceStateMonitorResponse > > ,
51
+ state_watcher_tx :
52
+ watch:: Sender < BTreeMap < u64 , api:: InstanceStateMonitorResponse > > ,
42
53
}
43
54
44
55
impl InstanceContext {
45
56
pub fn new ( properties : api:: InstanceProperties , _log : & Logger ) -> Self {
46
- let ( state_watcher_tx, state_watcher_rx) =
47
- watch:: channel ( api:: InstanceStateMonitorResponse {
48
- gen : 0 ,
49
- state : api:: InstanceState :: Creating ,
50
- migration : api:: InstanceMigrateStatusResponse {
51
- migration_in : None ,
52
- migration_out : None ,
57
+ let ( state_watcher_tx, state_watcher_rx) = {
58
+ let mut states = BTreeMap :: new ( ) ;
59
+ states. insert (
60
+ 0 ,
61
+ api:: InstanceStateMonitorResponse {
62
+ gen : 0 ,
63
+ state : api:: InstanceState :: Creating ,
64
+ migration : api:: InstanceMigrateStatusResponse {
65
+ migration_in : None ,
66
+ migration_out : None ,
67
+ } ,
53
68
} ,
54
- } ) ;
69
+ ) ;
70
+ watch:: channel ( states)
71
+ } ;
55
72
let serial = serial:: Serial :: new ( & properties. name ) ;
56
73
57
74
let serial_task = serial:: SerialTask :: spawn ( ) ;
58
75
59
76
Self {
60
- state : api :: InstanceState :: Creating ,
61
- generation : 0 ,
77
+ curr_gen : 0 ,
78
+ next_queue_gen : 1 ,
62
79
properties,
63
80
serial,
64
81
serial_task,
@@ -72,46 +89,110 @@ impl InstanceContext {
72
89
/// Returns an error if the state transition is invalid.
73
90
pub async fn set_target_state (
74
91
& mut self ,
92
+ log : & Logger ,
75
93
target : api:: InstanceStateRequested ,
76
94
) -> Result < ( ) , Error > {
77
- match self . state {
78
- api:: InstanceState :: Stopped
79
- | api:: InstanceState :: Destroyed
80
- | api:: InstanceState :: Failed => {
95
+ match ( self . current_state ( ) , target) {
96
+ (
97
+ api:: InstanceState :: Stopped
98
+ | api:: InstanceState :: Destroyed
99
+ | api:: InstanceState :: Failed ,
100
+ _,
101
+ ) => {
81
102
// Cannot request any state once the target is halt/destroy
82
103
Err ( Error :: TerminalState )
83
104
}
84
- api:: InstanceState :: Rebooting
85
- if matches ! ( target, api:: InstanceStateRequested :: Run ) =>
86
- {
105
+ (
106
+ api:: InstanceState :: Rebooting ,
107
+ api:: InstanceStateRequested :: Run ,
108
+ ) => {
87
109
// Requesting a run when already on the road to reboot is an
88
110
// immediate success.
89
111
Ok ( ( ) )
90
112
}
91
- _ => match target {
92
- api:: InstanceStateRequested :: Run
93
- | api:: InstanceStateRequested :: Reboot => {
94
- self . generation += 1 ;
95
- self . state = api:: InstanceState :: Running ;
96
- self . state_watcher_tx
97
- . send ( api:: InstanceStateMonitorResponse {
98
- gen : self . generation ,
99
- state : self . state ,
100
- migration : api:: InstanceMigrateStatusResponse {
101
- migration_in : None ,
102
- migration_out : None ,
103
- } ,
104
- } )
105
- . map_err ( |_| Error :: TransitionSendFail )
106
- }
107
- api:: InstanceStateRequested :: Stop => {
108
- self . state = api:: InstanceState :: Stopped ;
109
- self . serial_task . shutdown ( ) . await ;
110
- Ok ( ( ) )
111
- }
112
- } ,
113
+ ( api:: InstanceState :: Running , api:: InstanceStateRequested :: Run ) => {
114
+ Ok ( ( ) )
115
+ }
116
+ (
117
+ api:: InstanceState :: Running ,
118
+ api:: InstanceStateRequested :: Reboot ,
119
+ ) => {
120
+ self . queue_states (
121
+ log,
122
+ & [
123
+ api:: InstanceState :: Rebooting ,
124
+ api:: InstanceState :: Running ,
125
+ ] ,
126
+ )
127
+ . await ;
128
+ Ok ( ( ) )
129
+ }
130
+ ( current, api:: InstanceStateRequested :: Reboot ) => {
131
+ Err ( Error :: InvalidTransition {
132
+ current,
133
+ requested : api:: InstanceStateRequested :: Reboot ,
134
+ } )
135
+ }
136
+ ( _, api:: InstanceStateRequested :: Run ) => {
137
+ self . queue_states ( log, & [ api:: InstanceState :: Running ] ) . await ;
138
+ Ok ( ( ) )
139
+ }
140
+ (
141
+ api:: InstanceState :: Stopping ,
142
+ api:: InstanceStateRequested :: Stop ,
143
+ ) => Ok ( ( ) ) ,
144
+ ( _, api:: InstanceStateRequested :: Stop ) => {
145
+ self . queue_states (
146
+ log,
147
+ & [
148
+ api:: InstanceState :: Stopping ,
149
+ api:: InstanceState :: Stopped ,
150
+ ] ,
151
+ )
152
+ . await ;
153
+ self . serial_task . shutdown ( ) . await ;
154
+ Ok ( ( ) )
155
+ }
113
156
}
114
157
}
158
+
159
+ fn current_state ( & self ) -> api:: InstanceState {
160
+ self . state_watcher_rx
161
+ . borrow ( )
162
+ . get ( & self . curr_gen )
163
+ . expect ( "current generation must be in the queue, this is weird 'n' bad" )
164
+ . state
165
+ }
166
+
167
+ async fn queue_states (
168
+ & mut self ,
169
+ log : & Logger ,
170
+ states : & [ api:: InstanceState ] ,
171
+ ) {
172
+ self . state_watcher_tx . send_modify ( |queue| {
173
+ for & state in states {
174
+ let generation = self . next_queue_gen ;
175
+ self . next_queue_gen += 1 ;
176
+ queue. insert (
177
+ generation,
178
+ api:: InstanceStateMonitorResponse {
179
+ gen : generation,
180
+ migration : api:: InstanceMigrateStatusResponse {
181
+ migration_in : None ,
182
+ migration_out : None ,
183
+ } ,
184
+ state,
185
+ } ,
186
+ ) ;
187
+ slog:: info!(
188
+ log,
189
+ "queued instance state transition" ;
190
+ "state" => ?state,
191
+ "gen" => ?generation,
192
+ ) ;
193
+ }
194
+ } )
195
+ }
115
196
}
116
197
117
198
/// Contextual information accessible from mock HTTP callbacks.
@@ -169,7 +250,7 @@ async fn instance_get(
169
250
} ) ?;
170
251
let instance_info = api:: Instance {
171
252
properties : instance. properties . clone ( ) ,
172
- state : instance. state ,
253
+ state : instance. current_state ( ) ,
173
254
} ;
174
255
Ok ( HttpResponseOk ( api:: InstanceGetResponse { instance : instance_info } ) )
175
256
}
@@ -195,18 +276,23 @@ async fn instance_state_monitor(
195
276
} ;
196
277
197
278
loop {
198
- let last = state_watcher. borrow ( ) . clone ( ) ;
199
- if gen <= last. gen {
200
- let response = api:: InstanceStateMonitorResponse {
201
- gen : last. gen ,
202
- state : last. state ,
203
- migration : api:: InstanceMigrateStatusResponse {
204
- migration_in : None ,
205
- migration_out : None ,
206
- } ,
207
- } ;
208
- return Ok ( HttpResponseOk ( response) ) ;
279
+ let next_gen = gen + 1 ;
280
+ let state = state_watcher. borrow ( ) . get ( & next_gen) . cloned ( ) ;
281
+ if let Some ( state) = state {
282
+ // Advance to the state with the generation we showed to the
283
+ // watcher, for use in `instance_get` and when determining what
284
+ // state transitions are valid.
285
+ rqctx
286
+ . context ( )
287
+ . instance
288
+ . lock ( )
289
+ . await
290
+ . as_mut ( )
291
+ . expect ( "if we didn't have an instance, we shouldn't have gotten here" )
292
+ . curr_gen = next_gen;
293
+ return Ok ( HttpResponseOk ( state. clone ( ) ) ) ;
209
294
}
295
+
210
296
state_watcher. changed ( ) . await . unwrap ( ) ;
211
297
}
212
298
}
@@ -226,9 +312,14 @@ async fn instance_state_put(
226
312
)
227
313
} ) ?;
228
314
let requested_state = request. into_inner ( ) ;
229
- instance. set_target_state ( requested_state) . await . map_err ( |err| {
230
- HttpError :: for_internal_error ( format ! ( "Failed to transition: {}" , err) )
231
- } ) ?;
315
+ instance. set_target_state ( & rqctx. log , requested_state) . await . map_err (
316
+ |err| {
317
+ HttpError :: for_internal_error ( format ! (
318
+ "Failed to transition: {}" ,
319
+ err
320
+ ) )
321
+ } ,
322
+ ) ?;
232
323
Ok ( HttpResponseUpdatedNoContent { } )
233
324
}
234
325
@@ -255,11 +346,15 @@ async fn instance_serial(
255
346
ws_stream. send ( Message :: Close ( None ) ) . await ?;
256
347
Err ( "Instance not yet created!" . into ( ) )
257
348
}
258
- Some ( InstanceContext { state , .. } )
259
- if * state != api:: InstanceState :: Running =>
349
+ Some ( instance_ctx )
350
+ if instance_ctx . current_state ( ) != api:: InstanceState :: Running =>
260
351
{
261
352
ws_stream. send ( Message :: Close ( None ) ) . await ?;
262
- Err ( format ! ( "Instance isn't Running! ({:?})" , state) . into ( ) )
353
+ Err ( format ! (
354
+ "Instance isn't Running! ({:?})" ,
355
+ instance_ctx. current_state( )
356
+ )
357
+ . into ( ) )
263
358
}
264
359
Some ( instance_ctx) => {
265
360
let serial = instance_ctx. serial . clone ( ) ;
0 commit comments