Skip to content

Commit 54c1c0a

Browse files
authored
Merge branch 'main' into cijothomas/fixperiodicreaderpanic
2 parents 444ec7a + dde68a0 commit 54c1c0a

File tree

2 files changed

+22
-32
lines changed

2 files changed

+22
-32
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ impl LogProcessor for BatchLogProcessor {
343343
// a log, emit a warning.
344344
if self.dropped_logs_count.fetch_add(1, Ordering::Relaxed) == 0 {
345345
otel_warn!(name: "BatchLogProcessor.LogDroppingStarted",
346-
message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
346+
message = "BatchLogProcessor dropped a LogRecord due to queue full. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
347347
}
348348
}
349349
Err(mpsc::TrySendError::Disconnected(_)) => {
@@ -376,9 +376,9 @@ impl LogProcessor for BatchLogProcessor {
376376
// If the control message could not be sent, emit a warning.
377377
otel_debug!(
378378
name: "BatchLogProcessor.ForceFlush.ControlChannelFull",
379-
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush without finishing the previous call."
379+
message = "Control message to flush the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
380380
);
381-
LogResult::Err(LogError::Other("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush without finishing the previous call.".into()))
381+
LogResult::Err(LogError::Other("ForceFlush cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
382382
}
383383
Err(mpsc::TrySendError::Disconnected(_)) => {
384384
// Given background thread is the only receiver, and it's
@@ -404,7 +404,7 @@ impl LogProcessor for BatchLogProcessor {
404404
name: "BatchLogProcessor.LogsDropped",
405405
dropped_logs_count = dropped_logs,
406406
max_queue_size = max_queue_size,
407-
message = "Logs were dropped due to a queue being full or other error. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
407+
message = "Logs were dropped due to a queue being full. The count represents the total count of log records dropped in the lifetime of this BatchLogProcessor. Consider increasing the queue size and/or decrease delay between intervals."
408408
);
409409
}
410410

@@ -442,9 +442,9 @@ impl LogProcessor for BatchLogProcessor {
442442
// If the control message could not be sent, emit a warning.
443443
otel_debug!(
444444
name: "BatchLogProcessor.Shutdown.ControlChannelFull",
445-
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush without finishing the previous call."
445+
message = "Control message to shutdown the worker thread could not be sent as the control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call."
446446
);
447-
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush without finishing the previous call.".into()))
447+
LogResult::Err(LogError::Other("Shutdown cannot be performed as Control channel is full. This can occur if user repeatedily calls force_flush/shutdown without finishing the previous call.".into()))
448448
}
449449
Err(mpsc::TrySendError::Disconnected(_)) => {
450450
// Given background thread is the only receiver, and it's

opentelemetry-sdk/src/metrics/periodic_reader.rs

+16-26
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,13 @@ impl PeriodicReaderInner {
367367
.produce(rm)?;
368368
Ok(())
369369
} else {
370-
Err(MetricError::Other("pipeline is not registered".into()))
370+
otel_warn!(
371+
name: "PeriodReader.MeterProviderNotRegistered",
372+
message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
373+
This occurs when a periodic reader is created but not associated with a MeterProvider \
374+
by calling `.with_reader(reader)` on MeterProviderBuilder."
375+
);
376+
Err(MetricError::Other("MeterProvider is not registered".into()))
371377
}
372378
}
373379

@@ -512,7 +518,7 @@ impl MetricReader for PeriodicReader {
512518
mod tests {
513519
use super::PeriodicReader;
514520
use crate::{
515-
error::ShutdownResult,
521+
error::{ShutdownError, ShutdownResult},
516522
metrics::{
517523
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
518524
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
@@ -634,11 +640,8 @@ mod tests {
634640
#[test]
635641
fn shutdown_repeat() {
636642
// Arrange
637-
let interval = std::time::Duration::from_millis(1);
638643
let exporter = InMemoryMetricExporter::default();
639-
let reader = PeriodicReader::builder(exporter.clone())
640-
.with_interval(interval)
641-
.build();
644+
let reader = PeriodicReader::builder(exporter.clone()).build();
642645

643646
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
644647
let result = meter_provider.shutdown();
@@ -647,20 +650,19 @@ mod tests {
647650
// calling shutdown again should return Err
648651
let result = meter_provider.shutdown();
649652
assert!(result.is_err());
653+
assert!(matches!(result, Err(ShutdownError::AlreadyShutdown)));
650654

651655
// calling shutdown again should return Err
652656
let result = meter_provider.shutdown();
653657
assert!(result.is_err());
658+
assert!(matches!(result, Err(ShutdownError::AlreadyShutdown)));
654659
}
655660

656661
#[test]
657662
fn flush_after_shutdown() {
658663
// Arrange
659-
let interval = std::time::Duration::from_millis(1);
660664
let exporter = InMemoryMetricExporter::default();
661-
let reader = PeriodicReader::builder(exporter.clone())
662-
.with_interval(interval)
663-
.build();
665+
let reader = PeriodicReader::builder(exporter.clone()).build();
664666

665667
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
666668
let result = meter_provider.force_flush();
@@ -677,11 +679,8 @@ mod tests {
677679
#[test]
678680
fn flush_repeat() {
679681
// Arrange
680-
let interval = std::time::Duration::from_millis(1);
681682
let exporter = InMemoryMetricExporter::default();
682-
let reader = PeriodicReader::builder(exporter.clone())
683-
.with_interval(interval)
684-
.build();
683+
let reader = PeriodicReader::builder(exporter.clone()).build();
685684

686685
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
687686
let result = meter_provider.force_flush();
@@ -695,11 +694,8 @@ mod tests {
695694
#[test]
696695
fn periodic_reader_without_pipeline() {
697696
// Arrange
698-
let interval = std::time::Duration::from_millis(1);
699697
let exporter = InMemoryMetricExporter::default();
700-
let reader = PeriodicReader::builder(exporter.clone())
701-
.with_interval(interval)
702-
.build();
698+
let reader = PeriodicReader::builder(exporter.clone()).build();
703699

704700
let rm = &mut ResourceMetrics {
705701
resource: Resource::empty(),
@@ -841,11 +837,8 @@ mod tests {
841837

842838
fn collection_helper(trigger: fn(SdkMeterProvider)) {
843839
// Arrange
844-
let interval = std::time::Duration::from_millis(10);
845840
let exporter = InMemoryMetricExporter::default();
846-
let reader = PeriodicReader::builder(exporter.clone())
847-
.with_interval(interval)
848-
.build();
841+
let reader = PeriodicReader::builder(exporter.clone()).build();
849842
let (sender, receiver) = mpsc::channel();
850843

851844
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
@@ -959,11 +952,8 @@ mod tests {
959952
}
960953

961954
fn tokio_async_inside_observable_callback_helper(use_current_tokio_runtime: bool) {
962-
let interval = std::time::Duration::from_millis(10);
963955
let exporter = InMemoryMetricExporter::default();
964-
let reader = PeriodicReader::builder(exporter.clone())
965-
.with_interval(interval)
966-
.build();
956+
let reader = PeriodicReader::builder(exporter.clone()).build();
967957

968958
let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
969959
let meter = meter_provider.meter("test");

0 commit comments

Comments
 (0)