@@ -18,6 +18,12 @@ use nexus_client::types::{
18
18
} ;
19
19
use omicron_uuid_kinds:: { GenericUuid , TypedUuid } ;
20
20
21
/// Delivery priority for a [`NotifyRequest`].
///
/// High-QoS messages are stored and retried if delivery to Nexus fails;
/// low-QoS messages are dropped on failure.
//
// Derive the full set of cheap traits: the type is a two-variant fieldless
// enum, so `Copy`/`Eq` are free and let callers compare and pass by value
// instead of matching through references.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NotifyQos {
    High,
    Low,
}
26
+
21
27
#[ derive( Debug ) ]
22
28
pub ( crate ) enum NotifyRequest {
23
29
ClientTaskStopped {
@@ -65,42 +71,111 @@ pub(crate) enum NotifyRequest {
65
71
} ,
66
72
}
67
73
74
+ impl NotifyRequest {
75
+ pub ( crate ) fn qos ( & self ) -> NotifyQos {
76
+ match & self {
77
+ NotifyRequest :: LiveRepairStart { .. }
78
+ | NotifyRequest :: LiveRepairFinish { .. }
79
+ | NotifyRequest :: ReconcileStart { .. }
80
+ | NotifyRequest :: ReconcileFinish { .. } => NotifyQos :: High ,
81
+
82
+ NotifyRequest :: ClientTaskStopped { .. }
83
+ | NotifyRequest :: LiveRepairProgress { .. }
84
+ | NotifyRequest :: ReconcileProgress { .. } => NotifyQos :: Low ,
85
+ }
86
+ }
87
+ }
88
+
68
89
/// Handle for submitting time-stamped notify requests to the background
/// Nexus-notification task, with one channel per priority level.
pub(crate) struct NotifyQueue {
    /// Channel for high-QoS requests (repair/reconcile start and finish)
    tx_high: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
    /// Channel for low-QoS requests (progress updates, client-task-stopped)
    tx_low: mpsc::Sender<(DateTime<Utc>, NotifyRequest)>,
    /// Logger scoped to this queue (job = "notify_queue")
    log: Logger,
}
72
94
73
95
impl NotifyQueue {
74
96
/// Insert a time-stamped request into the queue
75
97
pub fn send ( & self , r : NotifyRequest ) {
76
98
let now = Utc :: now ( ) ;
77
- if let Err ( r) = self . tx . try_send ( ( now, r) ) {
78
- warn ! ( self . log, "could not send notify {r:?}; queue is full" ) ;
99
+ let qos = r. qos ( ) ;
100
+ let queue = match & qos {
101
+ NotifyQos :: High => & self . tx_high ,
102
+ NotifyQos :: Low => & self . tx_low ,
103
+ } ;
104
+
105
+ if let Err ( e) = queue. try_send ( ( now, r) ) {
106
+ warn ! ( self . log, "could not send {qos:?} notify: {e}" , ) ;
79
107
}
80
108
}
81
109
}
82
110
83
111
pub ( crate ) fn spawn_notify_task ( addr : Ipv6Addr , log : & Logger ) -> NotifyQueue {
84
- let ( tx, rx) = mpsc:: channel ( 128 ) ;
112
+ let ( tx_high, rx_high) = mpsc:: channel ( 128 ) ;
113
+ let ( tx_low, rx_low) = mpsc:: channel ( 128 ) ;
85
114
let task_log = log. new ( slog:: o!( "job" => "notify" ) ) ;
86
- tokio:: spawn ( async move { notify_task_nexus ( addr, rx, task_log) . await } ) ;
115
+
116
+ tokio:: spawn ( async move {
117
+ notify_task_nexus ( addr, rx_high, rx_low, task_log) . await
118
+ } ) ;
119
+
87
120
NotifyQueue {
88
- tx,
121
+ tx_high,
122
+ tx_low,
89
123
log : log. new ( o ! ( "job" => "notify_queue" ) ) ,
90
124
}
91
125
}
92
126
127
/// A notification pulled off one of the queues, carrying enough state to
/// be re-queued if delivery to Nexus fails.
struct Notification {
    /// The timestamp and payload originally handed to `NotifyQueue::send`
    message: (DateTime<Utc>, NotifyRequest),
    /// Which priority channel this came from; only High messages are
    /// stored for retry on delivery failure
    qos: NotifyQos,
    /// Delivery attempts so far; the message is dropped once this exceeds 3
    retries: usize,
}
132
+
93
133
async fn notify_task_nexus (
94
134
addr : Ipv6Addr ,
95
- mut rx : mpsc:: Receiver < ( DateTime < Utc > , NotifyRequest ) > ,
135
+ mut rx_high : mpsc:: Receiver < ( DateTime < Utc > , NotifyRequest ) > ,
136
+ mut rx_low : mpsc:: Receiver < ( DateTime < Utc > , NotifyRequest ) > ,
96
137
log : Logger ,
97
138
) {
139
+ info ! ( log, "notify_task started" ) ;
140
+
141
+ // Store high QoS messages if they can't be sent
142
+ let mut stored_notification: Option < Notification > = None ;
143
+
98
144
let reqwest_client = reqwest:: ClientBuilder :: new ( )
99
145
. connect_timeout ( std:: time:: Duration :: from_secs ( 15 ) )
100
146
. timeout ( std:: time:: Duration :: from_secs ( 15 ) )
101
147
. build ( )
102
148
. unwrap ( ) ;
103
- while let Some ( ( time, m) ) = rx. recv ( ) . await {
149
+
150
+ loop {
151
+ let r = tokio:: select! {
152
+ biased;
153
+
154
+ Some ( n) = async { stored_notification. take( ) } => Some ( n) ,
155
+
156
+ i = rx_high. recv( ) => i. map( |message| Notification {
157
+ message,
158
+ qos: NotifyQos :: High ,
159
+ retries: 0 ,
160
+ } ) ,
161
+
162
+ i = rx_low. recv( ) => i. map( |message| Notification {
163
+ message,
164
+ qos: NotifyQos :: Low ,
165
+ retries: 0 ,
166
+ } ) ,
167
+ } ;
168
+
169
+ let Some ( Notification {
170
+ message : ( time, m) ,
171
+ qos,
172
+ retries,
173
+ } ) = r
174
+ else {
175
+ error ! ( log, "one of the notify channels was closed!" ) ;
176
+ break ;
177
+ } ;
178
+
104
179
debug ! ( log, "notify {m:?}" ) ;
105
180
let client = reqwest_client. clone ( ) ;
106
181
let Some ( nexus_client) = get_nexus_client ( & log, client, addr) . await
@@ -114,21 +189,23 @@ async fn notify_task_nexus(
114
189
) ;
115
190
continue ;
116
191
} ;
117
- let ( r, s) = match m {
192
+
193
+ let ( r, s) = match & m {
118
194
NotifyRequest :: ClientTaskStopped {
119
195
upstairs_id,
120
196
downstairs_id,
121
197
reason,
122
198
} => {
123
- let upstairs_id = TypedUuid :: from_untyped_uuid ( upstairs_id) ;
124
- let downstairs_id = TypedUuid :: from_untyped_uuid ( downstairs_id) ;
199
+ let upstairs_id = TypedUuid :: from_untyped_uuid ( * upstairs_id) ;
200
+ let downstairs_id =
201
+ TypedUuid :: from_untyped_uuid ( * downstairs_id) ;
125
202
let reason = match reason {
126
203
ClientRunResult :: ConnectionTimeout => {
127
204
DownstairsClientStoppedReason :: ConnectionTimeout
128
205
}
129
206
ClientRunResult :: ConnectionFailed ( _) => {
130
- // skip this notification, it's too noisy during connection
131
- // retries
207
+ // skip this notification, it's too noisy during
208
+ // connection retries
132
209
//DownstairsClientStoppedReason::ConnectionFailed
133
210
continue ;
134
211
}
@@ -159,16 +236,13 @@ async fn notify_task_nexus(
159
236
} ;
160
237
161
238
(
162
- omicron_common:: retry_until_known_result ( & log, || async {
163
- nexus_client
164
- . cpapi_downstairs_client_stopped (
165
- & upstairs_id,
166
- & downstairs_id,
167
- & DownstairsClientStopped { time, reason } ,
168
- )
169
- . await
170
- } )
171
- . await ,
239
+ nexus_client
240
+ . cpapi_downstairs_client_stopped (
241
+ & upstairs_id,
242
+ & downstairs_id,
243
+ & DownstairsClientStopped { time, reason } ,
244
+ )
245
+ . await ,
172
246
"client stopped" ,
173
247
)
174
248
}
@@ -184,7 +258,7 @@ async fn notify_task_nexus(
184
258
session_id,
185
259
ref repairs,
186
260
} => {
187
- let upstairs_id = TypedUuid :: from_untyped_uuid ( upstairs_id) ;
261
+ let upstairs_id = TypedUuid :: from_untyped_uuid ( * upstairs_id) ;
188
262
let ( description, repair_type) =
189
263
if matches ! ( m, NotifyRequest :: LiveRepairStart { .. } ) {
190
264
( "live repair start" , UpstairsRepairType :: Live )
@@ -193,9 +267,9 @@ async fn notify_task_nexus(
193
267
} ;
194
268
let info = RepairStartInfo {
195
269
time,
196
- repair_id : TypedUuid :: from_untyped_uuid ( repair_id) ,
270
+ repair_id : TypedUuid :: from_untyped_uuid ( * repair_id) ,
197
271
repair_type,
198
- session_id : TypedUuid :: from_untyped_uuid ( session_id) ,
272
+ session_id : TypedUuid :: from_untyped_uuid ( * session_id) ,
199
273
repairs : repairs
200
274
. iter ( )
201
275
. map ( |( region_uuid, target_addr) | {
@@ -208,12 +282,9 @@ async fn notify_task_nexus(
208
282
} ;
209
283
210
284
(
211
- omicron_common:: retry_until_known_result ( & log, || async {
212
- nexus_client
213
- . cpapi_upstairs_repair_start ( & upstairs_id, & info)
214
- . await
215
- } )
216
- . await ,
285
+ nexus_client
286
+ . cpapi_upstairs_repair_start ( & upstairs_id, & info)
287
+ . await ,
217
288
description,
218
289
)
219
290
}
@@ -229,8 +300,8 @@ async fn notify_task_nexus(
229
300
current_item,
230
301
total_items,
231
302
} => {
232
- let upstairs_id = TypedUuid :: from_untyped_uuid ( upstairs_id) ;
233
- let repair_id = TypedUuid :: from_untyped_uuid ( repair_id) ;
303
+ let upstairs_id = TypedUuid :: from_untyped_uuid ( * upstairs_id) ;
304
+ let repair_id = TypedUuid :: from_untyped_uuid ( * repair_id) ;
234
305
let description =
235
306
if matches ! ( m, NotifyRequest :: LiveRepairProgress { .. } ) {
236
307
"live repair progress"
@@ -239,20 +310,17 @@ async fn notify_task_nexus(
239
310
} ;
240
311
241
312
(
242
- omicron_common:: retry_until_known_result ( & log, || async {
243
- nexus_client
244
- . cpapi_upstairs_repair_progress (
245
- & upstairs_id,
246
- & repair_id,
247
- & RepairProgress {
248
- current_item,
249
- total_items,
250
- time,
251
- } ,
252
- )
253
- . await
254
- } )
255
- . await ,
313
+ nexus_client
314
+ . cpapi_upstairs_repair_progress (
315
+ & upstairs_id,
316
+ & repair_id,
317
+ & RepairProgress {
318
+ current_item : * current_item,
319
+ total_items : * total_items,
320
+ time,
321
+ } ,
322
+ )
323
+ . await ,
256
324
description,
257
325
)
258
326
}
@@ -270,7 +338,7 @@ async fn notify_task_nexus(
270
338
aborted,
271
339
ref repairs,
272
340
} => {
273
- let upstairs_id = TypedUuid :: from_untyped_uuid ( upstairs_id) ;
341
+ let upstairs_id = TypedUuid :: from_untyped_uuid ( * upstairs_id) ;
274
342
let ( description, repair_type) =
275
343
if matches ! ( m, NotifyRequest :: LiveRepairFinish { .. } ) {
276
344
( "live repair finish" , UpstairsRepairType :: Live )
@@ -279,9 +347,9 @@ async fn notify_task_nexus(
279
347
} ;
280
348
let info = RepairFinishInfo {
281
349
time,
282
- repair_id : TypedUuid :: from_untyped_uuid ( repair_id) ,
350
+ repair_id : TypedUuid :: from_untyped_uuid ( * repair_id) ,
283
351
repair_type,
284
- session_id : TypedUuid :: from_untyped_uuid ( session_id) ,
352
+ session_id : TypedUuid :: from_untyped_uuid ( * session_id) ,
285
353
repairs : repairs
286
354
. iter ( )
287
355
. map ( |( region_uuid, target_addr) | {
@@ -291,30 +359,49 @@ async fn notify_task_nexus(
291
359
}
292
360
} )
293
361
. collect ( ) ,
294
- aborted,
362
+ aborted : * aborted ,
295
363
} ;
296
364
297
365
(
298
- omicron_common:: retry_until_known_result ( & log, || async {
299
- nexus_client
300
- . cpapi_upstairs_repair_finish ( & upstairs_id, & info)
301
- . await
302
- } )
303
- . await ,
366
+ nexus_client
367
+ . cpapi_upstairs_repair_finish ( & upstairs_id, & info)
368
+ . await ,
304
369
description,
305
370
)
306
371
}
307
372
} ;
373
+
308
374
match r {
309
375
Ok ( _) => {
310
376
info ! ( log, "notified Nexus of {s}" ) ;
311
377
}
312
378
313
379
Err ( e) => {
314
380
error ! ( log, "failed to notify Nexus of {s}: {e}" ) ;
381
+
382
+ // If there's a problem notifying Nexus, it could be due to
383
+ // Nexus being gone before the DNS was updated. If this is the
384
+ // case, then retrying should eventually pick a different Nexus
385
+ // and succeed. Store high priority messages so they can be
386
+ // resent.
387
+ if matches ! ( qos, NotifyQos :: High ) {
388
+ // If we've retried too many times, then drop this message.
389
+ // Unfortunately if this is true then other notifications
390
+ // will also likely fail.
391
+ if retries > 3 {
392
+ warn ! ( log, "retries > 3, dropping {m:?}" ) ;
393
+ } else {
394
+ stored_notification = Some ( Notification {
395
+ message : ( time, m) ,
396
+ qos,
397
+ retries : retries + 1 ,
398
+ } ) ;
399
+ }
400
+ }
315
401
}
316
402
}
317
403
}
404
+
318
405
info ! ( log, "notify_task exiting" ) ;
319
406
}
320
407
0 commit comments