@@ -449,22 +449,28 @@ impl<R: RuntimeChannel> BatchSpanProcessor<R> {
449
449
pub ( crate ) fn new ( exporter : Box < dyn SpanExporter > , config : BatchConfig , runtime : R ) -> Self {
450
450
let ( message_sender, message_receiver) =
451
451
runtime. batch_message_channel ( config. max_queue_size ) ;
452
- let ticker = runtime
453
- . interval ( config. scheduled_delay )
454
- . map ( |_| BatchMessage :: Flush ( None ) ) ;
455
- let timeout_runtime = runtime. clone ( ) ;
456
-
457
- let messages = Box :: pin ( stream:: select ( message_receiver, ticker) ) ;
458
- let processor = BatchSpanProcessorInternal {
459
- spans : Vec :: new ( ) ,
460
- export_tasks : FuturesUnordered :: new ( ) ,
461
- runtime : timeout_runtime,
462
- config,
463
- exporter,
464
- } ;
465
452
453
+ let inner_runtime = runtime. clone ( ) ;
466
454
// Spawn worker process via user-defined spawn function.
467
- runtime. spawn ( Box :: pin ( processor. run ( messages) ) ) ;
455
+ runtime. spawn ( Box :: pin ( async move {
456
+ // Timer will take a reference to the current runtime, so its important we do this within the
457
+ // runtime.spawn()
458
+ let ticker = inner_runtime
459
+ . interval ( config. scheduled_delay )
460
+ . map ( |_| BatchMessage :: Flush ( None ) ) ;
461
+ let timeout_runtime = inner_runtime. clone ( ) ;
462
+
463
+ let messages = Box :: pin ( stream:: select ( message_receiver, ticker) ) ;
464
+ let processor = BatchSpanProcessorInternal {
465
+ spans : Vec :: new ( ) ,
466
+ export_tasks : FuturesUnordered :: new ( ) ,
467
+ runtime : timeout_runtime,
468
+ config,
469
+ exporter,
470
+ } ;
471
+
472
+ processor. run ( messages) . await
473
+ } ) ) ;
468
474
469
475
// Return batch processor with link to worker
470
476
BatchSpanProcessor { message_sender }
0 commit comments