Skip to content

Commit 17cce83

Browse files
gruebelcijothomas
andauthoredJan 29, 2025
feat: Replace Zipkin pipeline with exporter builders (open-telemetry#2565)
Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
1 parent 144fdd9 commit 17cce83

File tree

5 files changed

+154
-151
lines changed

5 files changed

+154
-151
lines changed
 

‎opentelemetry-zipkin/CHANGELOG.md

+20
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,26 @@
33
## vNext
44

55
- Bump msrv to 1.75.0.
6+
- **Breaking** The `opentelemetry_zipkin::new_pipeline()` interface is now replaced with `opentelemetry_zipkin::ZipkinExporter::builder()`.
7+
8+
Previous Signature:
9+
```rust
10+
let tracer = opentelemetry_zipkin::new_pipeline()
11+
.with_service_name("trace-demo")
12+
.install_simple()?;
13+
```
14+
Updated Signature:
15+
```rust
16+
let exporter = ZipkinExporter::builder()
17+
.with_service_name("trace-demo")
18+
.build()?;
19+
let provider = TracerProvider::builder()
20+
.with_simple_exporter(exporter)
21+
.build();
22+
global::set_tracer_provider(provider.clone());
23+
24+
let tracer = global::tracer("zipkin-tracer");
25+
```
626

727
## 0.27.0
828

‎opentelemetry-zipkin/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
2626
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
2727

2828
[dependencies]
29-
async-trait = { workspace = true }
3029
once_cell = { workspace = true }
3130
opentelemetry = { version = "0.27", path = "../opentelemetry" }
3231
opentelemetry_sdk = { version = "0.27", path = "../opentelemetry-sdk", features = ["trace"] }
@@ -41,6 +40,7 @@ thiserror = { workspace = true }
4140
futures-core = { workspace = true }
4241

4342
[dev-dependencies]
43+
async-trait = { workspace = true }
4444
bytes = { workspace = true }
4545
futures-util = { workspace = true, features = ["io"] }
4646
http-body-util = { workspace = true }

‎opentelemetry-zipkin/examples/zipkin.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use opentelemetry::{
22
global::{self},
3-
trace::{Span, Tracer},
3+
trace::{Span, TraceError, Tracer},
4+
InstrumentationScope, KeyValue,
45
};
6+
use opentelemetry_sdk::trace::TracerProvider;
7+
use opentelemetry_zipkin::ZipkinExporter;
58
use std::thread;
69
use std::time::Duration;
710

@@ -12,10 +15,26 @@ fn bar() {
1215
span.end()
1316
}
1417

15-
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
16-
let (tracer, provider) = opentelemetry_zipkin::new_pipeline()
18+
fn init_traces() -> Result<TracerProvider, TraceError> {
19+
let exporter = ZipkinExporter::builder()
1720
.with_service_name("trace-demo")
18-
.install_simple()?;
21+
.build()?;
22+
23+
Ok(TracerProvider::builder()
24+
.with_simple_exporter(exporter)
25+
.build())
26+
}
27+
28+
fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
29+
let provider = init_traces()?;
30+
global::set_tracer_provider(provider.clone());
31+
32+
let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")];
33+
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
34+
.with_version(env!("CARGO_PKG_VERSION"))
35+
.with_attributes(common_scope_attributes)
36+
.build();
37+
let tracer = global::tracer_with_scope(scope.clone());
1938

2039
tracer.in_span("foo", |_cx| {
2140
thread::sleep(Duration::from_millis(6));

‎opentelemetry-zipkin/src/exporter/mod.rs

+32-108
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,54 @@ mod env;
22
mod model;
33
mod uploader;
44

5-
use async_trait::async_trait;
65
use futures_core::future::BoxFuture;
76
use http::Uri;
87
use model::endpoint::Endpoint;
9-
use opentelemetry::{global, trace::TraceError, InstrumentationScope, KeyValue};
8+
use opentelemetry::trace::TraceError;
109
use opentelemetry_http::HttpClient;
1110
use opentelemetry_sdk::{
1211
resource::{ResourceDetector, SdkProvidedResourceDetector},
13-
trace,
14-
trace::{Config, Tracer, TracerProvider},
15-
ExportError, Resource,
12+
trace, ExportError,
1613
};
1714
use opentelemetry_semantic_conventions as semcov;
18-
use std::borrow::Cow;
19-
use std::net::SocketAddr;
15+
use std::net::{AddrParseError, SocketAddr};
2016
use std::sync::Arc;
2117

2218
/// Zipkin span exporter
2319
#[derive(Debug)]
24-
pub struct Exporter {
20+
pub struct ZipkinExporter {
2521
local_endpoint: Endpoint,
2622
uploader: uploader::Uploader,
2723
}
2824

29-
impl Exporter {
25+
impl ZipkinExporter {
26+
/// Get a builder to configure a [ZipkinExporter].
27+
pub fn builder() -> ZipkinExporterBuilder {
28+
ZipkinExporterBuilder::default()
29+
}
30+
3031
fn new(local_endpoint: Endpoint, client: Arc<dyn HttpClient>, collector_endpoint: Uri) -> Self {
31-
Exporter {
32+
ZipkinExporter {
3233
local_endpoint,
3334
uploader: uploader::Uploader::new(client, collector_endpoint),
3435
}
3536
}
3637
}
3738

38-
/// Create a new Zipkin exporter pipeline builder.
39-
pub fn new_pipeline() -> ZipkinPipelineBuilder {
40-
ZipkinPipelineBuilder::default()
41-
}
42-
43-
/// Builder for `ExporterConfig` struct.
39+
/// Builder for `ZipkinExporter` struct.
4440
#[derive(Debug)]
45-
pub struct ZipkinPipelineBuilder {
41+
pub struct ZipkinExporterBuilder {
4642
service_name: Option<String>,
4743
service_addr: Option<SocketAddr>,
4844
collector_endpoint: String,
49-
trace_config: Option<Config>,
5045
client: Option<Arc<dyn HttpClient>>,
5146
}
5247

53-
impl Default for ZipkinPipelineBuilder {
48+
impl Default for ZipkinExporterBuilder {
5449
fn default() -> Self {
5550
let timeout = env::get_timeout();
5651

57-
ZipkinPipelineBuilder {
52+
ZipkinExporterBuilder {
5853
#[cfg(feature = "reqwest-blocking-client")]
5954
client: Some(Arc::new(
6055
reqwest::blocking::Client::builder()
@@ -78,58 +73,29 @@ impl Default for ZipkinPipelineBuilder {
7873
service_name: None,
7974
service_addr: None,
8075
collector_endpoint: env::get_endpoint(),
81-
trace_config: None,
8276
}
8377
}
8478
}
8579

86-
impl ZipkinPipelineBuilder {
87-
/// Initial a Zipkin span exporter.
80+
impl ZipkinExporterBuilder {
81+
/// Creates a new [ZipkinExporter] from this configuration.
8882
///
8983
/// Returns error if the endpoint is not valid or if no http client is provided.
90-
pub fn init_exporter(mut self) -> Result<Exporter, TraceError> {
91-
let (_, endpoint) = self.init_config_and_endpoint();
92-
self.init_exporter_with_endpoint(endpoint)
93-
}
94-
95-
fn init_config_and_endpoint(&mut self) -> (Config, Endpoint) {
96-
let service_name = self.service_name.take();
97-
if let Some(service_name) = service_name {
98-
let config = if let Some(mut cfg) = self.trace_config.take() {
99-
cfg.resource = Cow::Owned(
100-
Resource::builder_empty()
101-
.with_attributes(
102-
cfg.resource
103-
.iter()
104-
.filter(|(k, _v)| k.as_str() != semcov::resource::SERVICE_NAME)
105-
.map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
106-
.collect::<Vec<KeyValue>>(),
107-
)
108-
.build(),
109-
);
110-
cfg
111-
} else {
112-
#[allow(deprecated)]
113-
Config::default().with_resource(Resource::builder_empty().build())
114-
};
115-
(config, Endpoint::new(service_name, self.service_addr))
84+
pub fn build(self) -> Result<ZipkinExporter, TraceError> {
85+
let service_name = if let Some(service_name) = self.service_name {
86+
service_name
11687
} else {
117-
let service_name = SdkProvidedResourceDetector
88+
SdkProvidedResourceDetector
11889
.detect()
11990
.get(&semcov::resource::SERVICE_NAME.into())
12091
.unwrap()
121-
.to_string();
122-
(
123-
#[allow(deprecated)]
124-
Config::default().with_resource(Resource::builder_empty().build()),
125-
Endpoint::new(service_name, self.service_addr),
126-
)
127-
}
128-
}
92+
.to_string()
93+
};
94+
95+
let endpoint = Endpoint::new(service_name, self.service_addr);
12996

130-
fn init_exporter_with_endpoint(self, endpoint: Endpoint) -> Result<Exporter, TraceError> {
13197
if let Some(client) = self.client {
132-
let exporter = Exporter::new(
98+
let exporter = ZipkinExporter::new(
13399
endpoint,
134100
client,
135101
self.collector_endpoint
@@ -142,45 +108,6 @@ impl ZipkinPipelineBuilder {
142108
}
143109
}
144110

145-
/// Install the Zipkin trace exporter pipeline with a simple span processor.
146-
#[allow(deprecated)]
147-
pub fn install_simple(
148-
mut self,
149-
) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> {
150-
let (config, endpoint) = self.init_config_and_endpoint();
151-
let exporter = self.init_exporter_with_endpoint(endpoint)?;
152-
let mut provider_builder = TracerProvider::builder().with_simple_exporter(exporter);
153-
provider_builder = provider_builder.with_config(config);
154-
let provider = provider_builder.build();
155-
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
156-
.with_version(env!("CARGO_PKG_VERSION"))
157-
.with_schema_url(semcov::SCHEMA_URL)
158-
.build();
159-
let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope);
160-
let _ = global::set_tracer_provider(provider.clone());
161-
Ok((tracer, provider))
162-
}
163-
164-
/// Install the Zipkin trace exporter pipeline with a batch span processor using the specified
165-
/// runtime.
166-
#[allow(deprecated)]
167-
pub fn install_batch(
168-
mut self,
169-
) -> Result<(Tracer, opentelemetry_sdk::trace::TracerProvider), TraceError> {
170-
let (config, endpoint) = self.init_config_and_endpoint();
171-
let exporter = self.init_exporter_with_endpoint(endpoint)?;
172-
let mut provider_builder = TracerProvider::builder().with_batch_exporter(exporter);
173-
provider_builder = provider_builder.with_config(config);
174-
let provider = provider_builder.build();
175-
let scope = InstrumentationScope::builder("opentelemetry-zipkin")
176-
.with_version(env!("CARGO_PKG_VERSION"))
177-
.with_schema_url(semcov::SCHEMA_URL)
178-
.build();
179-
let tracer = opentelemetry::trace::TracerProvider::tracer_with_scope(&provider, scope);
180-
let _ = global::set_tracer_provider(provider.clone());
181-
Ok((tracer, provider))
182-
}
183-
184111
/// Assign the service name under which to group traces.
185112
pub fn with_service_name<T: Into<String>>(mut self, name: T) -> Self {
186113
self.service_name = Some(name.into());
@@ -193,7 +120,7 @@ impl ZipkinPipelineBuilder {
193120
self
194121
}
195122

196-
/// Assign the service name under which to group traces.
123+
/// Assign the service address.
197124
pub fn with_service_address(mut self, addr: SocketAddr) -> Self {
198125
self.service_addr = Some(addr);
199126
self
@@ -204,12 +131,6 @@ impl ZipkinPipelineBuilder {
204131
self.collector_endpoint = endpoint.into();
205132
self
206133
}
207-
208-
/// Assign the SDK trace configuration.
209-
pub fn with_trace_config(mut self, config: Config) -> Self {
210-
self.trace_config = Some(config);
211-
self
212-
}
213134
}
214135

215136
async fn zipkin_export(
@@ -225,8 +146,7 @@ async fn zipkin_export(
225146
uploader.upload(zipkin_spans).await
226147
}
227148

228-
#[async_trait]
229-
impl trace::SpanExporter for Exporter {
149+
impl trace::SpanExporter for ZipkinExporter {
230150
/// Export spans to Zipkin collector.
231151
fn export(&mut self, batch: Vec<trace::SpanData>) -> BoxFuture<'static, trace::ExportResult> {
232152
Box::pin(zipkin_export(
@@ -253,6 +173,10 @@ pub enum Error {
253173
#[error("invalid uri")]
254174
InvalidUri(#[from] http::uri::InvalidUri),
255175

176+
/// The IP/socket address provided is invalid
177+
#[error("invalid address")]
178+
InvalidAddress(#[from] AddrParseError),
179+
256180
/// Other errors
257181
#[error("export error: {0}")]
258182
Other(String),

‎opentelemetry-zipkin/src/lib.rs

+78-38
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,23 @@
2121
//! telemetry:
2222
//!
2323
//! ```no_run
24-
//! use opentelemetry::trace::{Tracer, TraceError};
2524
//! use opentelemetry::global;
25+
//! use opentelemetry::trace::{Tracer, TraceError};
26+
//! use opentelemetry_sdk::trace::TracerProvider;
27+
//! use opentelemetry_zipkin::ZipkinExporter;
2628
//!
2729
//! fn main() -> Result<(), TraceError> {
2830
//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new());
29-
//! let (tracer, provider) = opentelemetry_zipkin::new_pipeline().install_simple()?;
31+
//!
32+
//! let exporter = ZipkinExporter::builder()
33+
//! .with_service_name("trace-demo")
34+
//! .build()?;
35+
//! let provider = TracerProvider::builder()
36+
//! .with_simple_exporter(exporter)
37+
//! .build();
38+
//! global::set_tracer_provider(provider.clone());
39+
//!
40+
//! let tracer = global::tracer("zipkin-tracer");
3041
//!
3142
//! tracer.in_span("doing_work", |cx| {
3243
//! // Traced app logic here...
@@ -41,27 +52,35 @@
4152
//! ## Performance
4253
//!
4354
//! For optimal performance, a batch exporter is recommended as the simple exporter
44-
//! will export each span synchronously on drop. You can enable the [`rt-tokio`],
45-
//! [`rt-tokio-current-thread`] or [`rt-async-std`] features and specify a runtime
46-
//! on the pipeline builder to have a batch exporter configured for you
47-
//! automatically.
48-
//!
49-
//! ```toml
50-
//! [dependencies]
51-
//! opentelemetry = { version = "*", features = ["rt-tokio"] }
52-
//! opentelemetry-zipkin = { version = "*", features = ["reqwest-client"], default-features = false }
53-
//! ```
55+
//! will export each span synchronously on drop. You can achieve this by creating a
56+
//! `BatchSpanProcessor` and passing it to the trace provider.
5457
//!
5558
//! ```no_run
56-
//! # fn main() -> Result<(), opentelemetry::trace::TraceError> {
57-
//! let tracer = opentelemetry_zipkin::new_pipeline()
58-
//! .install_batch(opentelemetry_sdk::runtime::Tokio)?;
59-
//! # Ok(())
60-
//! # }
61-
//! ```
59+
//! use opentelemetry_sdk::{
60+
//! trace::{
61+
//! BatchSpanProcessor,
62+
//! BatchConfigBuilder,
63+
//! TracerProvider,
64+
//! }
65+
//! };
66+
//! use opentelemetry_zipkin::ZipkinExporter;
67+
//!
68+
//! fn main() -> Result<(), opentelemetry::trace::TraceError> {
69+
//! let exporter = ZipkinExporter::builder()
70+
//! .with_service_name("runtime-demo")
71+
//! .build()?;
72+
//!
73+
//! let batch = BatchSpanProcessor::builder(exporter)
74+
//! .with_batch_config(BatchConfigBuilder::default().with_max_queue_size(4096).build())
75+
//! .build();
76+
//!
77+
//! let provider = TracerProvider::builder()
78+
//! .with_span_processor(batch)
79+
//! .build();
6280
//!
63-
//! [`rt-tokio`]: https://tokio.rs
64-
//! [`async-std`]: https://async.rs
81+
//! Ok(())
82+
//! }
83+
//! ```
6584
//!
6685
//! ## Choosing an HTTP client
6786
//!
@@ -81,14 +100,14 @@
81100
//! ## Kitchen Sink Full Configuration
82101
//!
83102
//! Example showing how to override all configuration options. See the
84-
//! [`ZipkinPipelineBuilder`] docs for details of each option.
103+
//! [`ZipkinExporterBuilder`] docs for details of each option.
85104
//!
86105
//!
87106
//! ```no_run
88-
//! use opentelemetry::{global, KeyValue, trace::Tracer};
89-
//! use opentelemetry_sdk::{trace::{self, RandomIdGenerator, Sampler}, Resource};
90-
//! use opentelemetry_sdk::trace::ExportResult;
107+
//! use opentelemetry::{global, InstrumentationScope, KeyValue, trace::{Tracer, TraceError}};
108+
//! use opentelemetry_sdk::{trace::{self, ExportResult, RandomIdGenerator, Sampler}, Resource};
91109
//! use opentelemetry_http::{HttpClient, HttpError};
110+
//! use opentelemetry_zipkin::{Error as ZipkinError, ZipkinExporter};
92111
//! use async_trait::async_trait;
93112
//! use bytes::Bytes;
94113
//! use futures_util::io::AsyncReadExt as _;
@@ -129,28 +148,49 @@
129148
//! }
130149
//! }
131150
//!
132-
//! fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
133-
//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new());
134-
//! let (tracer, provider) = opentelemetry_zipkin::new_pipeline()
151+
//! fn init_traces() -> Result<trace::TracerProvider, TraceError> {
152+
//! let exporter = ZipkinExporter::builder()
135153
//! .with_http_client(
136154
//! HyperClient(
137155
//! Client::builder(TokioExecutor::new())
138156
//! .build_http()
139157
//! )
140158
//! )
141159
//! .with_service_name("my_app")
142-
//! .with_service_address("127.0.0.1:8080".parse()?)
160+
//! .with_service_address(
161+
//! "127.0.0.1:8080"
162+
//! .parse()
163+
//! .map_err::<ZipkinError, _>(Into::into)?
164+
//! )
143165
//! .with_collector_endpoint("http://localhost:9411/api/v2/spans")
144-
//! .with_trace_config(
145-
//! trace::config()
146-
//! .with_sampler(Sampler::AlwaysOn)
147-
//! .with_id_generator(RandomIdGenerator::default())
148-
//! .with_max_events_per_span(64)
149-
//! .with_max_attributes_per_span(16)
150-
//! .with_max_events_per_span(16)
151-
//! .with_resource(Resource::builder_empty().with_attribute(KeyValue::new("key", "value")).build()),
166+
//! .build()?;
167+
//!
168+
//! Ok(trace::TracerProvider::builder()
169+
//! .with_sampler(Sampler::AlwaysOn)
170+
//! .with_batch_exporter(exporter)
171+
//! .with_id_generator(RandomIdGenerator::default())
172+
//! .with_max_events_per_span(64)
173+
//! .with_max_attributes_per_span(16)
174+
//! .with_max_events_per_span(16)
175+
//! .with_resource(
176+
//! Resource::builder_empty()
177+
//! .with_attribute(KeyValue::new("key", "value"))
178+
//! .build()
152179
//! )
153-
//! .install_batch(opentelemetry_sdk::runtime::Tokio)?;
180+
//! .build())
181+
//! }
182+
//!
183+
//! fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
184+
//! global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new());
185+
//! let provider = init_traces()?;
186+
//! global::set_tracer_provider(provider.clone());
187+
//!
188+
//! let common_scope_attributes = vec![KeyValue::new("scope-key", "scope-value")];
189+
//! let scope = InstrumentationScope::builder("opentelemetry-zipkin")
190+
//! .with_version(env!("CARGO_PKG_VERSION"))
191+
//! .with_attributes(common_scope_attributes)
192+
//! .build();
193+
//! let tracer = global::tracer_with_scope(scope.clone());
154194
//!
155195
//! tracer.in_span("doing_work", |cx| {
156196
//! // Traced app logic here...
@@ -208,5 +248,5 @@ extern crate typed_builder;
208248
mod exporter;
209249
mod propagator;
210250

211-
pub use exporter::{new_pipeline, Error, Exporter, ZipkinPipelineBuilder};
251+
pub use exporter::{Error, ZipkinExporter, ZipkinExporterBuilder};
212252
pub use propagator::{B3Encoding, Propagator};

0 commit comments

Comments
 (0)
Please sign in to comment.