Skip to content

Commit df5433f

Browse files
authored
Unify trace and logs runtime extensions traits. (#1067)
Merge the TraceRuntime and LogRuntime traits in the trace and logs module respectively into MessageRuntime, an extension trait to Runtime generic over the type of Message/Signal type to be sent and received. All references to the older traits now become MessageRuntime<trace::BatchMessage> and MessageRuntime<logs::BatchMessage> respectively.
1 parent 2a4a9cd commit df5433f

File tree

18 files changed

+318
-439
lines changed

18 files changed

+318
-439
lines changed

opentelemetry-contrib/src/trace/exporter/jaeger_json.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
44
use async_trait::async_trait;
55
use futures::{future::BoxFuture, FutureExt};
6+
use opentelemetry::runtime::RuntimeChannel;
67
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
7-
use opentelemetry::sdk::trace::{TraceRuntime, Tracer};
8+
use opentelemetry::sdk::trace::{BatchMessage, Tracer};
89
use opentelemetry::trace::{SpanId, TraceError};
910
use opentelemetry_semantic_conventions::SCHEMA_URL;
1011
use std::collections::HashMap;
@@ -211,11 +212,11 @@ fn opentelemetry_value_to_json(value: &opentelemetry::Value) -> (&str, serde_jso
211212
}
212213
}
213214

214-
/// Jaeger Json Runtime is an extension to [`TraceRuntime`].
215+
/// Jaeger Json Runtime is an extension to [`RuntimeChannel`].
215216
///
216-
/// [`TraceRuntime`]: opentelemetry::sdk::trace::TraceRuntime
217+
/// [`RuntimeChannel`]: opentelemetry::sdk::runtime::RuntimeChannel
217218
#[async_trait]
218-
pub trait JaegerJsonRuntime: TraceRuntime + std::fmt::Debug {
219+
pub trait JaegerJsonRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
219220
/// Create a new directory if the given path does not exist yet
220221
async fn create_dir(&self, path: &Path) -> ExportResult;
221222
/// Write the provided content to a new file at the given path

opentelemetry-datadog/src/exporter/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod model;
44
pub use model::ApiVersion;
55
pub use model::Error;
66
pub use model::FieldMappingFn;
7+
use opentelemetry::runtime::RuntimeChannel;
78

89
use std::borrow::Cow;
910
use std::fmt::{Debug, Formatter};
@@ -16,7 +17,7 @@ use opentelemetry::sdk::export::trace;
1617
use opentelemetry::sdk::export::trace::SpanData;
1718
use opentelemetry::sdk::resource::ResourceDetector;
1819
use opentelemetry::sdk::resource::SdkProvidedResourceDetector;
19-
use opentelemetry::sdk::trace::{Config, TraceRuntime};
20+
use opentelemetry::sdk::trace::{BatchMessage, Config};
2021
use opentelemetry::sdk::Resource;
2122
use opentelemetry::trace::TraceError;
2223
use opentelemetry::{global, sdk, trace::TracerProvider, KeyValue};
@@ -301,7 +302,7 @@ impl DatadogPipelineBuilder {
301302

302303
/// Install the Datadog trace exporter pipeline using a batch span processor with the specified
303304
/// runtime.
304-
pub fn install_batch<R: TraceRuntime>(
305+
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
305306
mut self,
306307
runtime: R,
307308
) -> Result<sdk::trace::Tracer, TraceError> {

opentelemetry-jaeger/src/exporter/runtime.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
))]
66
use crate::exporter::addrs_and_family;
77
use async_trait::async_trait;
8-
use opentelemetry::sdk::trace::TraceRuntime;
8+
use opentelemetry::{runtime::RuntimeChannel, sdk::trace::BatchMessage};
99
use std::net::ToSocketAddrs;
1010

11-
/// Jaeger Trace Runtime is an extension to [`TraceRuntime`].
11+
/// Jaeger Trace Runtime is an extension to [`RuntimeChannel`].
1212
///
13-
/// [`TraceRuntime`]: opentelemetry::sdk::trace::TraceRuntime
13+
/// [`RuntimeChannel`]: opentelemetry::sdk::runtime::RuntimeChannel
1414
#[async_trait]
15-
pub trait JaegerTraceRuntime: TraceRuntime + std::fmt::Debug {
15+
pub trait JaegerTraceRuntime: RuntimeChannel<BatchMessage> + std::fmt::Debug {
1616
/// A communication socket between Jaeger client and agent.
1717
type Socket: std::fmt::Debug + Send + Sync;
1818

opentelemetry-otlp/src/logs.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ use std::{
5555
};
5656

5757
use opentelemetry_api::logs::{LogError, LoggerProvider};
58-
use opentelemetry_sdk::{self, export::logs::LogData, logs::LogRuntime};
58+
use opentelemetry_sdk::{self, export::logs::LogData, logs::BatchMessage, runtime::RuntimeChannel};
5959

6060
impl OtlpPipeline {
6161
/// Create a OTLP logging pipeline.
@@ -435,7 +435,7 @@ impl OtlpLogPipeline {
435435
/// batch log processor.
436436
///
437437
/// [`Logger`]: opentelemetry_sdk::logs::Logger
438-
pub fn batch<R: LogRuntime>(
438+
pub fn batch<R: RuntimeChannel<BatchMessage>>(
439439
self,
440440
runtime: R,
441441
include_trace_context: bool,
@@ -471,7 +471,7 @@ fn build_simple_with_exporter(
471471
)
472472
}
473473

474-
fn build_batch_with_exporter<R: LogRuntime>(
474+
fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
475475
exporter: LogExporter,
476476
log_config: Option<opentelemetry_sdk::logs::Config>,
477477
runtime: R,

opentelemetry-otlp/src/span.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ use opentelemetry_api::{
6060
use opentelemetry_sdk::{
6161
self as sdk,
6262
export::trace::{ExportResult, SpanData},
63-
trace::TraceRuntime,
63+
trace::BatchMessage,
6464
};
6565
use opentelemetry_semantic_conventions::SCHEMA_URL;
6666

6767
use async_trait::async_trait;
68+
use sdk::runtime::RuntimeChannel;
6869

6970
/// Target to which the exporter is going to send spans, defaults to https://localhost:4317/v1/traces.
7071
/// Learn about the relationship between this constant and default/metrics/logs at
@@ -141,7 +142,7 @@ impl OtlpTracePipeline {
141142
/// `install_batch` will panic if not called within a tokio runtime
142143
///
143144
/// [`Tracer`]: opentelemetry_api::trace::Tracer
144-
pub fn install_batch<R: TraceRuntime>(
145+
pub fn install_batch<R: RuntimeChannel<BatchMessage>>(
145146
self,
146147
runtime: R,
147148
) -> Result<sdk::trace::Tracer, TraceError> {
@@ -175,7 +176,7 @@ fn build_simple_with_exporter(
175176
tracer
176177
}
177178

178-
fn build_batch_with_exporter<R: TraceRuntime>(
179+
fn build_batch_with_exporter<R: RuntimeChannel<BatchMessage>>(
179180
exporter: SpanExporter,
180181
trace_config: Option<sdk::trace::Config>,
181182
runtime: R,

opentelemetry-sdk/src/logs/log_emitter.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
use super::{BatchLogProcessor, Config, LogProcessor, LogRuntime, SimpleLogProcessor};
2-
use crate::export::logs::{LogData, LogExporter};
1+
use super::{BatchLogProcessor, BatchMessage, Config, LogProcessor, SimpleLogProcessor};
2+
use crate::{
3+
export::logs::{LogData, LogExporter},
4+
runtime::RuntimeChannel,
5+
};
36
use opentelemetry_api::{
47
global::{self},
58
logs::{LogRecord, LogResult},
@@ -129,7 +132,7 @@ impl Builder {
129132
}
130133

131134
/// The `LogExporter` setup using a default `BatchLogProcessor` that this provider should use.
132-
pub fn with_batch_exporter<T: LogExporter + 'static, R: LogRuntime>(
135+
pub fn with_batch_exporter<T: LogExporter + 'static, R: RuntimeChannel<BatchMessage>>(
133136
self,
134137
exporter: T,
135138
runtime: R,

opentelemetry-sdk/src/logs/log_processor.rs

+12-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
use super::LogRuntime;
21
use crate::{
32
export::logs::{ExportResult, LogData, LogExporter},
4-
logs::TrySend,
3+
runtime::{RuntimeChannel, TrySend},
54
};
65
use futures_channel::oneshot;
76
use futures_util::{
@@ -101,31 +100,32 @@ impl LogProcessor for SimpleLogProcessor {
101100

102101
/// A [`LogProcessor`] that asynchronously buffers log records and reports
103102
/// them at a preconfigured interval.
104-
pub struct BatchLogProcessor<R: LogRuntime> {
103+
pub struct BatchLogProcessor<R: RuntimeChannel<BatchMessage>> {
105104
message_sender: R::Sender,
106105
}
107106

108-
impl<R: LogRuntime> Debug for BatchLogProcessor<R> {
107+
impl<R: RuntimeChannel<BatchMessage>> Debug for BatchLogProcessor<R> {
109108
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
110109
f.debug_struct("BatchLogProcessor")
111110
.field("message_sender", &self.message_sender)
112111
.finish()
113112
}
114113
}
115114

116-
impl<R: LogRuntime> LogProcessor for BatchLogProcessor<R> {
115+
impl<R: RuntimeChannel<BatchMessage>> LogProcessor for BatchLogProcessor<R> {
117116
fn emit(&self, data: LogData) {
118117
let result = self.message_sender.try_send(BatchMessage::ExportLog(data));
119118

120119
if let Err(err) = result {
121-
global::handle_error(err);
120+
global::handle_error(LogError::Other(err.into()));
122121
}
123122
}
124123

125124
fn force_flush(&self) -> LogResult<()> {
126125
let (res_sender, res_receiver) = oneshot::channel();
127126
self.message_sender
128-
.try_send(BatchMessage::Flush(Some(res_sender)))?;
127+
.try_send(BatchMessage::Flush(Some(res_sender)))
128+
.map_err(|err| LogError::Other(err.into()))?;
129129

130130
futures_executor::block_on(res_receiver)
131131
.map_err(|err| LogError::Other(err.into()))
@@ -135,15 +135,16 @@ impl<R: LogRuntime> LogProcessor for BatchLogProcessor<R> {
135135
fn shutdown(&mut self) -> LogResult<()> {
136136
let (res_sender, res_receiver) = oneshot::channel();
137137
self.message_sender
138-
.try_send(BatchMessage::Shutdown(res_sender))?;
138+
.try_send(BatchMessage::Shutdown(res_sender))
139+
.map_err(|err| LogError::Other(err.into()))?;
139140

140141
futures_executor::block_on(res_receiver)
141142
.map_err(|err| LogError::Other(err.into()))
142143
.and_then(std::convert::identity)
143144
}
144145
}
145146

146-
impl<R: LogRuntime> BatchLogProcessor<R> {
147+
impl<R: RuntimeChannel<BatchMessage>> BatchLogProcessor<R> {
147148
pub(crate) fn new(mut exporter: Box<dyn LogExporter>, config: BatchConfig, runtime: R) -> Self {
148149
let (message_sender, message_receiver) =
149150
runtime.batch_message_channel(config.max_queue_size);
@@ -247,7 +248,7 @@ async fn export_with_timeout<R, E>(
247248
batch: Vec<LogData>,
248249
) -> ExportResult
249250
where
250-
R: LogRuntime,
251+
R: RuntimeChannel<BatchMessage>,
251252
E: LogExporter + ?Sized,
252253
{
253254
if batch.is_empty() {
@@ -308,7 +309,7 @@ pub struct BatchLogProcessorBuilder<E, R> {
308309
impl<E, R> BatchLogProcessorBuilder<E, R>
309310
where
310311
E: LogExporter + 'static,
311-
R: LogRuntime,
312+
R: RuntimeChannel<BatchMessage>,
312313
{
313314
/// Set max queue size for batches
314315
pub fn with_max_queue_size(self, size: usize) -> Self {

opentelemetry-sdk/src/logs/mod.rs

-2
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,10 @@
33
mod config;
44
mod log_emitter;
55
mod log_processor;
6-
mod runtime;
76

87
pub use config::Config;
98
pub use log_emitter::{Builder, Logger, LoggerProvider};
109
pub use log_processor::{
1110
BatchConfig, BatchLogProcessor, BatchLogProcessorBuilder, BatchMessage, LogProcessor,
1211
SimpleLogProcessor,
1312
};
14-
pub use runtime::{LogRuntime, TrySend};

opentelemetry-sdk/src/logs/runtime.rs

-119
This file was deleted.

0 commit comments

Comments
 (0)