|
| 1 | +use opentelemetry::global::{self, set_error_handler, Error as OtelError}; |
| 2 | +use opentelemetry::KeyValue; |
| 3 | +use opentelemetry_appender_tracing::layer; |
| 4 | +use opentelemetry_otlp::WithExportConfig; |
| 5 | +use tracing_subscriber::prelude::*; |
| 6 | +use tracing_subscriber::EnvFilter; |
| 7 | + |
| 8 | +use std::error::Error; |
| 9 | +use tracing::error; |
| 10 | + |
| 11 | +use once_cell::sync::Lazy; |
| 12 | +use std::collections::HashSet; |
| 13 | +use std::sync::{Arc, Mutex}; |
| 14 | + |
| 15 | +use ctrlc; |
| 16 | +use std::sync::mpsc::channel; |
| 17 | + |
/// Tracks which error messages have already been reported, so each distinct
/// error is logged at most once for the lifetime of the process.
struct ErrorState {
    // Set of error strings already logged; guarded for cross-thread use
    // since the OpenTelemetry error handler may fire from exporter threads.
    seen_errors: Mutex<HashSet<String>>,
}

impl ErrorState {
    /// Creates an empty error-deduplication state.
    fn new() -> Self {
        ErrorState {
            seen_errors: Mutex::new(HashSet::new()),
        }
    }

    /// Records `err` (keyed by its string form) and returns `true` the first
    /// time that message is seen, `false` on every repeat.
    ///
    /// Generalized from `&OtelError` to any `ToString` value — backward
    /// compatible for existing callers because `OtelError` implements
    /// `Display` (and therefore `ToString`).
    fn mark_as_seen(&self, err: &impl ToString) -> bool {
        // `HashSet::insert` returns `true` only when the value was not
        // already present, which is exactly the "first sighting" signal.
        self.seen_errors
            .lock()
            .expect("seen_errors mutex poisoned")
            .insert(err.to_string())
    }
}
| 34 | + |
| 35 | +static GLOBAL_ERROR_STATE: Lazy<Arc<ErrorState>> = Lazy::new(|| Arc::new(ErrorState::new())); |
| 36 | + |
| 37 | +fn custom_error_handler(err: OtelError) { |
| 38 | + if GLOBAL_ERROR_STATE.mark_as_seen(&err) { |
| 39 | + // log error not already seen |
| 40 | + match err { |
| 41 | + OtelError::Metric(err) => error!("OpenTelemetry metrics error occurred: {}", err), |
| 42 | + OtelError::Trace(err) => error!("OpenTelemetry trace error occurred: {}", err), |
| 43 | + OtelError::Log(err) => error!("OpenTelemetry log error occurred: {}", err), |
| 44 | + OtelError::Propagation(err) => { |
| 45 | + error!("OpenTelemetry propagation error occurred: {}", err) |
| 46 | + } |
| 47 | + OtelError::Other(err_msg) => error!("OpenTelemetry error occurred: {}", err_msg), |
| 48 | + _ => error!("OpenTelemetry error occurred: {:?}", err), |
| 49 | + } |
| 50 | + } |
| 51 | +} |
| 52 | + |
| 53 | +fn init_logger_provider() -> opentelemetry_sdk::logs::LoggerProvider { |
| 54 | + let provider = opentelemetry_otlp::new_pipeline() |
| 55 | + .logging() |
| 56 | + .with_exporter( |
| 57 | + opentelemetry_otlp::new_exporter() |
| 58 | + .http() |
| 59 | + .with_endpoint("http://localhost:4318/v1/logs"), |
| 60 | + ) |
| 61 | + .install_batch(opentelemetry_sdk::runtime::Tokio) |
| 62 | + .unwrap(); |
| 63 | + |
| 64 | + // Add a tracing filter to filter events from crates used by opentelemetry-otlp. |
| 65 | + // The filter levels are set as follows: |
| 66 | + // - Allow `info` level and above by default. |
| 67 | + // - Restrict `hyper`, `tonic`, and `reqwest` to `error` level logs only. |
| 68 | + // This ensures events generated from these crates within the OTLP Exporter are not looped back, |
| 69 | + // thus preventing infinite event generation. |
| 70 | + // Note: This will also drop events from these crates used outside the OTLP Exporter. |
| 71 | + // For more details, see: https://github.com/open-telemetry/opentelemetry-rust/issues/761 |
| 72 | + let filter = EnvFilter::new("info") |
| 73 | + .add_directive("hyper=error".parse().unwrap()) |
| 74 | + .add_directive("tonic=error".parse().unwrap()) |
| 75 | + .add_directive("reqwest=error".parse().unwrap()); |
| 76 | + let cloned_provider = provider.clone(); |
| 77 | + let layer = layer::OpenTelemetryTracingBridge::new(&cloned_provider); |
| 78 | + tracing_subscriber::registry() |
| 79 | + .with(filter) |
| 80 | + .with(layer) |
| 81 | + .init(); |
| 82 | + provider |
| 83 | +} |
| 84 | + |
| 85 | +fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider { |
| 86 | + let provider = opentelemetry_otlp::new_pipeline() |
| 87 | + .metrics(opentelemetry_sdk::runtime::Tokio) |
| 88 | + .with_period(std::time::Duration::from_secs(1)) |
| 89 | + .with_exporter( |
| 90 | + opentelemetry_otlp::new_exporter() |
| 91 | + .http() |
| 92 | + .with_endpoint("http://localhost:4318/v1/metrics"), |
| 93 | + ) |
| 94 | + .build() |
| 95 | + .unwrap(); |
| 96 | + let cloned_provider = provider.clone(); |
| 97 | + global::set_meter_provider(cloned_provider); |
| 98 | + provider |
| 99 | +} |
| 100 | + |
| 101 | +#[tokio::main] |
| 102 | +async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> { |
| 103 | + // Set the custom error handler |
| 104 | + if let Err(err) = set_error_handler(custom_error_handler) { |
| 105 | + eprintln!("Failed to set custom error handler: {}", err); |
| 106 | + } |
| 107 | + |
| 108 | + let logger_provider = init_logger_provider(); |
| 109 | + |
| 110 | + // Initialize the MeterProvider with the stdout Exporter. |
| 111 | + let meter_provider = init_meter_provider(); |
| 112 | + |
| 113 | + // Create a meter from the above MeterProvider. |
| 114 | + let meter = global::meter("example"); |
| 115 | + // Create a Counter Instrument. |
| 116 | + let counter = meter.u64_counter("my_counter").init(); |
| 117 | + |
| 118 | + // Record measurements with unique key-value pairs to exceed the cardinality limit |
| 119 | + // of 2000 and trigger error message |
| 120 | + for i in 0..3000 { |
| 121 | + counter.add( |
| 122 | + 10, |
| 123 | + &[KeyValue::new( |
| 124 | + format!("mykey{}", i), |
| 125 | + format!("myvalue{}", i), |
| 126 | + )], |
| 127 | + ); |
| 128 | + } |
| 129 | + |
| 130 | + let (tx, rx) = channel(); |
| 131 | + |
| 132 | + ctrlc::set_handler(move || tx.send(()).expect("Could not send signal on channel.")) |
| 133 | + .expect("Error setting Ctrl-C handler"); |
| 134 | + |
| 135 | + println!("Press Ctrl-C to continue..."); |
| 136 | + rx.recv().expect("Could not receive from channel."); |
| 137 | + println!("Got Ctrl-C, Doing shutdown and existing."); |
| 138 | + |
| 139 | + // MeterProvider is configured with an OTLP Exporter to export metrics every 1 second, |
| 140 | + // however shutting down the MeterProvider here instantly flushes |
| 141 | + // the metrics, instead of waiting for the 1 sec interval. |
| 142 | + meter_provider.shutdown()?; |
| 143 | + let _ = logger_provider.shutdown(); |
| 144 | + Ok(()) |
| 145 | +} |
0 commit comments