@@ -148,7 +148,7 @@ impl PeriodicReader {
148
148
let exporter_arc = Arc :: new ( exporter) ;
149
149
let reader = PeriodicReader {
150
150
inner : Arc :: new ( PeriodicReaderInner {
151
- message_sender : Arc :: new ( message_sender ) ,
151
+ message_sender,
152
152
producer : Mutex :: new ( None ) ,
153
153
exporter : exporter_arc. clone ( ) ,
154
154
} ) ,
@@ -294,7 +294,7 @@ impl fmt::Debug for PeriodicReader {
294
294
295
295
struct PeriodicReaderInner {
296
296
exporter : Arc < dyn PushMetricExporter > ,
297
- message_sender : Arc < mpsc:: Sender < Message > > ,
297
+ message_sender : mpsc:: Sender < Message > ,
298
298
producer : Mutex < Option < Weak < dyn SdkProducer > > > ,
299
299
}
300
300
@@ -320,15 +320,23 @@ impl PeriodicReaderInner {
320
320
}
321
321
}
322
322
323
- fn collect_and_export ( & self , _timeout : Duration ) -> MetricResult < ( ) > {
323
+ fn collect_and_export ( & self , timeout : Duration ) -> MetricResult < ( ) > {
324
324
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
325
325
// owned data structures to be passed to exporters.
326
326
let mut rm = ResourceMetrics {
327
327
resource : Resource :: empty ( ) ,
328
328
scope_metrics : Vec :: new ( ) ,
329
329
} ;
330
330
331
+ // Measure time taken for collect, and subtract it from the timeout.
332
+ let current_time = Instant :: now ( ) ;
331
333
let collect_result = self . collect ( & mut rm) ;
334
+ let time_taken_for_collect = current_time. elapsed ( ) ;
335
+ let _timeout = if time_taken_for_collect > timeout {
336
+ Duration :: from_secs ( 0 )
337
+ } else {
338
+ timeout - time_taken_for_collect
339
+ } ;
332
340
#[ allow( clippy:: question_mark) ]
333
341
if let Err ( e) = collect_result {
334
342
otel_warn ! (
@@ -346,15 +354,10 @@ impl PeriodicReaderInner {
346
354
let metrics_count = rm. scope_metrics . iter ( ) . fold ( 0 , |count, scope_metrics| {
347
355
count + scope_metrics. metrics . len ( )
348
356
} ) ;
349
- otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count) ;
357
+ otel_debug ! ( name: "PeriodicReaderMetricsCollected" , count = metrics_count, time_taken_in_millis = time_taken_for_collect . as_millis ( ) ) ;
350
358
351
- // TODO: subtract the time taken for collect from the timeout. collect
352
- // involves observable callbacks too, which are user defined and can
353
- // take arbitrary time.
354
- //
355
359
// Relying on futures executor to execute async call.
356
- // TODO: Add timeout and pass it to exporter or consider alternative
357
- // design to enforce timeout here.
360
+ // TODO: Pass timeout to exporter
358
361
let exporter_result = futures_executor:: block_on ( self . exporter . export ( & mut rm) ) ;
359
362
#[ allow( clippy:: question_mark) ]
360
363
if let Err ( e) = exporter_result {
0 commit comments