@@ -26,6 +26,7 @@ use oximeter_api::ProducerDetails;
26
26
use oximeter_db:: Client ;
27
27
use oximeter_db:: DbWrite ;
28
28
use qorb:: claim:: Handle ;
29
+ use qorb:: policy:: Policy ;
29
30
use qorb:: pool:: Pool ;
30
31
use qorb:: resolver:: BoxedResolver ;
31
32
use slog:: Logger ;
@@ -56,11 +57,12 @@ pub struct OximeterAgent {
56
57
log : Logger ,
57
58
// Oximeter target used by this agent to produce metrics about itself.
58
59
collection_target : self_stats:: OximeterCollector ,
59
- // Handle to the TX-side of a channel for collecting results from the collection tasks
60
- result_sender : mpsc:: Sender < CollectionTaskOutput > ,
60
+ // Wrapper of the two handles to the TX-side of the single-node and cluster
61
+ // channels for collecting results from the collection tasks.
62
+ result_sender : CollectionTaskSenderWrapper ,
61
63
// Handle to each Tokio task collection from a single producer.
62
64
collection_tasks : Arc < Mutex < BTreeMap < Uuid , CollectionTaskHandle > > > ,
63
- // The interval on which we refresh our list of producers from Nexus
65
+ // The interval on which we refresh our list of producers from Nexus.
64
66
refresh_interval : Duration ,
65
67
// Handle to the task used to periodically refresh the list of producers.
66
68
refresh_task : Arc < StdMutex < Option < tokio:: task:: JoinHandle < ( ) > > > > ,
@@ -70,22 +72,31 @@ pub struct OximeterAgent {
70
72
71
73
impl OximeterAgent {
72
74
/// Construct a new agent with the given ID and logger.
75
+ // TODO: Remove this linter exception once we only write to a
76
+ // single database
77
+ #[ allow( clippy:: too_many_arguments) ]
73
78
pub async fn with_id (
74
79
id : Uuid ,
75
80
address : SocketAddrV6 ,
76
81
refresh_interval : Duration ,
77
82
db_config : DbConfig ,
78
83
native_resolver : BoxedResolver ,
84
+ // Temporary resolver to write to a replicated ClickHouse
85
+ // cluster as well as a single-node installation.
86
+ cluster_resolver : BoxedResolver ,
79
87
log : & Logger ,
80
88
replicated : bool ,
81
89
) -> Result < Self , Error > {
82
- let ( result_sender, result_receiver) = mpsc:: channel ( 8 ) ;
90
+ let collection_task_wrapper = CollectionTaskWrapper :: new ( ) ;
91
+
83
92
let log = log. new ( o ! (
84
93
"component" => "oximeter-agent" ,
85
94
"collector_id" => id. to_string( ) ,
86
95
"collector_ip" => address. ip( ) . to_string( ) ,
87
96
) ) ;
88
97
let insertion_log = log. new ( o ! ( "component" => "results-sink" ) ) ;
98
+ let instertion_log_cluster =
99
+ log. new ( o ! ( "component" => "results-sink-cluster" ) ) ;
89
100
90
101
// Determine the version of the database.
91
102
//
@@ -126,14 +137,54 @@ impl OximeterAgent {
126
137
collector_port : address. port ( ) ,
127
138
} ;
128
139
129
- // Spawn the task for aggregating and inserting all metrics
140
+ // Spawn the task for aggregating and inserting all metrics to a
141
+ // single node ClickHouse installation.
130
142
tokio:: spawn ( async move {
131
143
crate :: results_sink:: database_inserter (
132
144
insertion_log,
133
145
client,
134
146
db_config. batch_size ,
135
147
Duration :: from_secs ( db_config. batch_interval ) ,
136
- result_receiver,
148
+ collection_task_wrapper. single_rx ,
149
+ )
150
+ . await
151
+ } ) ;
152
+
153
+ // Our internal testing rack will be running a ClickHouse cluster
154
+ // alongside a single-node installation for a while. We want to handle
155
+ // the case of these two installations running alongside each other, and
156
+ // oximeter writing to both of them. On our production racks ClickHouse
157
+ // will only be run in single-node modality, so we'll ignore all cases where
158
+ // the `ClickhouseClusterNative` service is not available.
159
+ // This will be done by spawning a second task for DB inserts to a replicated
160
+ // ClickHouse cluster. If oximeter cannot connect to the database, it will
161
+ // simply log a warning and move on.
162
+
163
+ // Temporary additional client that writes to a replicated cluster
164
+ // This will be removed once we phase out the single node installation.
165
+ //
166
+ // We don't need to check whether the DB is at the expected version since
167
+ // this is already handled by reconfigurator via clickhouse-admin.
168
+ //
169
+ // We have a short claim timeout so oximeter can move on quickly if the cluster
170
+ // does not exist.
171
+ let claim_policy = Policy {
172
+ claim_timeout : Duration :: from_millis ( 100 ) ,
173
+ ..Default :: default ( )
174
+ } ;
175
+
176
+ let cluster_client =
177
+ Client :: new_with_pool_policy ( cluster_resolver, claim_policy, & log) ;
178
+
179
+ // Spawn the task for aggregating and inserting all metrics to a
180
+ // replicated cluster ClickHouse installation
181
+ tokio:: spawn ( async move {
182
+ results_sink:: database_inserter (
183
+ instertion_log_cluster,
184
+ cluster_client,
185
+ db_config. batch_size ,
186
+ Duration :: from_secs ( db_config. batch_interval ) ,
187
+ collection_task_wrapper. cluster_rx ,
137
188
)
138
189
. await
139
190
} ) ;
@@ -142,7 +193,7 @@ impl OximeterAgent {
142
193
id,
143
194
log,
144
195
collection_target,
145
- result_sender,
196
+ result_sender : collection_task_wrapper . wrapper_tx ,
146
197
collection_tasks : Arc :: new ( Mutex :: new ( BTreeMap :: new ( ) ) ) ,
147
198
refresh_interval,
148
199
refresh_task : Arc :: new ( StdMutex :: new ( None ) ) ,
@@ -183,13 +234,14 @@ impl OximeterAgent {
183
234
db_config : Option < DbConfig > ,
184
235
log : & Logger ,
185
236
) -> Result < Self , Error > {
186
- let ( result_sender, result_receiver) = mpsc:: channel ( 8 ) ;
187
237
let log = log. new ( o ! (
188
238
"component" => "oximeter-standalone" ,
189
239
"collector_id" => id. to_string( ) ,
190
240
"collector_ip" => address. ip( ) . to_string( ) ,
191
241
) ) ;
192
242
243
+ let collection_task_wrapper = CollectionTaskWrapper :: new ( ) ;
244
+
193
245
// If we have configuration for ClickHouse, we'll spawn the results
194
246
// sink task as usual. If not, we'll spawn a dummy task that simply
195
247
// prints the results as they're received.
@@ -218,12 +270,15 @@ impl OximeterAgent {
218
270
client,
219
271
db_config. batch_size ,
220
272
Duration :: from_secs ( db_config. batch_interval ) ,
221
- result_receiver ,
273
+ collection_task_wrapper . single_rx ,
222
274
)
223
275
. await
224
276
} ) ;
225
277
} else {
226
- tokio:: spawn ( results_sink:: logger ( insertion_log, result_receiver) ) ;
278
+ tokio:: spawn ( results_sink:: logger (
279
+ insertion_log,
280
+ collection_task_wrapper. single_rx ,
281
+ ) ) ;
227
282
}
228
283
229
284
// Set up tracking of statistics about ourselves.
@@ -242,7 +297,7 @@ impl OximeterAgent {
242
297
id,
243
298
log,
244
299
collection_target,
245
- result_sender,
300
+ result_sender : collection_task_wrapper . wrapper_tx ,
246
301
collection_tasks : Arc :: new ( Mutex :: new ( BTreeMap :: new ( ) ) ) ,
247
302
refresh_interval,
248
303
refresh_task : Arc :: new ( StdMutex :: new ( None ) ) ,
@@ -434,6 +489,60 @@ impl OximeterAgent {
434
489
}
435
490
}
436
491
492
+ #[ derive( Debug , Clone ) ]
493
+ pub struct CollectionTaskSenderWrapper {
494
+ single_tx : mpsc:: Sender < CollectionTaskOutput > ,
495
+ cluster_tx : mpsc:: Sender < CollectionTaskOutput > ,
496
+ }
497
+
498
+ impl CollectionTaskSenderWrapper {
499
+ pub async fn send (
500
+ & self ,
501
+ msg : CollectionTaskOutput ,
502
+ log : & Logger ,
503
+ ) -> anyhow:: Result < ( ) > {
504
+ let ( result_single, result_cluster) = futures:: future:: join (
505
+ self . single_tx . send ( msg. clone ( ) ) ,
506
+ self . cluster_tx . send ( msg) ,
507
+ )
508
+ . await ;
509
+
510
+ if let Err ( e) = result_single {
511
+ error ! (
512
+ log,
513
+ "failed to send value from the collection task to channel for single node: {e:?}"
514
+ ) ;
515
+ } ;
516
+ if let Err ( e) = result_cluster {
517
+ error ! (
518
+ log,
519
+ "failed to send value from the collection task to channel for cluster: {e:?}"
520
+ ) ;
521
+ } ;
522
+ Ok ( ( ) )
523
+ }
524
+ }
525
+
526
+ #[ derive( Debug ) ]
527
+ pub struct CollectionTaskWrapper {
528
+ wrapper_tx : CollectionTaskSenderWrapper ,
529
+ single_rx : mpsc:: Receiver < CollectionTaskOutput > ,
530
+ cluster_rx : mpsc:: Receiver < CollectionTaskOutput > ,
531
+ }
532
+
533
+ impl CollectionTaskWrapper {
534
+ pub fn new ( ) -> Self {
535
+ let ( single_tx, single_rx) = mpsc:: channel ( 8 ) ;
536
+ let ( cluster_tx, cluster_rx) = mpsc:: channel ( 8 ) ;
537
+
538
+ Self {
539
+ wrapper_tx : CollectionTaskSenderWrapper { single_tx, cluster_tx } ,
540
+ single_rx,
541
+ cluster_rx,
542
+ }
543
+ }
544
+ }
545
+
437
546
// A task which periodically updates our list of producers from Nexus.
438
547
async fn refresh_producer_list_task (
439
548
agent : OximeterAgent ,
@@ -543,7 +652,7 @@ async fn claim_nexus_with_backoff(
543
652
"failed to lookup Nexus IP, will retry" ;
544
653
"delay" => ?delay,
545
654
// No `InlineErrorChain` here: `error` is a string
546
- "error" => error,
655
+ "error" => % error,
547
656
) ;
548
657
} ;
549
658
let do_lookup = || async {
0 commit comments