Skip to content

Commit d02e514

Browse files
committed
feat: add baggage propagation
1 parent 92f22b5 commit d02e514

File tree

12 files changed

+210
-31
lines changed

12 files changed

+210
-31
lines changed

Cargo.lock

+47
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@ graphql_client = { version = "0.13.0", default-features = fa
2424
http = { version = "0.2" } # request use 0.2
2525
moka = { version = "0.12.4", features = ["future"] }
2626
opentelemetry = { version = "0.21.0" }
27+
opentelemetry-http = { version = "0.10.0" }
2728
opentelemetry-semantic-conventions = { version = "0.13.0" }
2829
opentelemetry_sdk = { version = "0.21.2" }
30+
rand = { version = "0.8.5" }
2931
reqwest = { version = "0.11.23", default-features = false, features = ["rustls-tls", "json"] }
3032
serde = { version = "1", features = ["derive"] }
3133
serde_json = { version = "1.0.111" }
3234
tokio = { version = "1.35" }
3335
tracing = { version = "0.1.40" }
36+
tracing-opentelemetry = { version = "0.22.0" }
3437
tracing-subscriber = { version = "0.3.18", features = ["smallvec", "fmt", "ansi", "std", "env-filter", "time"], default-features = false }
3538
url = { version = "2.5.0" }
3639

crates/synd_api/src/main.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use tracing::{error, info};
33
use synd_api::{args, config, dependency::Dependency, serve::listen_and_serve};
44

55
fn init_tracing() {
6-
use synd_o11y::tracing_subscriber::{audit, otel_log};
6+
use synd_o11y::{
7+
opentelemetry::init_propagation,
8+
tracing_subscriber::{audit, otel_log},
9+
};
710
use tracing_subscriber::{
811
filter::EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt as _, Layer as _,
912
Registry,
@@ -43,6 +46,9 @@ fn init_tracing() {
4346
)
4447
.with(audit::layer())
4548
.init();
49+
50+
// Set text map propagator globally
51+
init_propagation();
4652
}
4753

4854
#[tokio::main]

crates/synd_api/src/serve/layer/trace.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,26 @@ pub struct MakeSpan;
66

77
impl<B> tower_http::trace::MakeSpan<B> for MakeSpan {
88
fn make_span(&mut self, request: &axum::http::Request<B>) -> tracing::Span {
9-
tracing::span!(
9+
use synd_o11y::opentelemetry::extension::*;
10+
let cx = synd_o11y::opentelemetry::http::extract(request.headers());
11+
12+
let request_id = cx
13+
.baggage()
14+
.get("request.id")
15+
.map_or("?".into(), |v| v.as_str());
16+
17+
let span = tracing::span!(
1018
Level::INFO,
1119
"http",
1220
method = %request.method(),
1321
uri = %request.uri(),
14-
)
22+
%request_id,
23+
);
24+
25+
tracing::info!("Request headers: {:#?}", request.headers());
26+
27+
span.set_parent(cx);
28+
span
1529
}
1630
}
1731

crates/synd_o11y/Cargo.toml

+6-2
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ name = "synd_o11y"
1212
version = "0.1.0"
1313

1414
[dependencies]
15-
# disable logs_level_enabled which affect global filitering
15+
axum = { workspace = true }
16+
http = { workspace = true }
1617
opentelemetry = { workspace = true }
17-
opentelemetry-appender-tracing = { version = "0.2.0", default-features = false }
18+
opentelemetry-appender-tracing = { version = "0.2.0", default-features = false } # disable logs_level_enabled which affect global filtering
19+
opentelemetry-http = { workspace = true }
1820
opentelemetry-otlp = { version = "0.14.0", default-features = false, features = ["trace", "metrics", "logs", "grpc-tonic"] }
1921
opentelemetry-semantic-conventions = { workspace = true }
2022
opentelemetry_sdk = { workspace = true, features = ["logs", "rt-tokio"] }
23+
reqwest = { workspace = true }
2124
tracing = { workspace = true }
25+
tracing-opentelemetry = { workspace = true }
2226
tracing-subscriber = { workspace = true }
2327

2428
[lints]
+5-22
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,7 @@
1-
use std::borrow::Cow;
1+
mod resource;
2+
pub use resource::resource;
23

3-
use opentelemetry::KeyValue;
4-
use opentelemetry_sdk::Resource;
5-
use opentelemetry_semantic_conventions::resource::{
6-
DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_NAMESPACE, SERVICE_VERSION,
7-
};
4+
mod propagation;
5+
pub use propagation::{extension, http, init_propagation};
86

9-
pub fn resource(
10-
service_name: impl Into<Cow<'static, str>>,
11-
service_version: impl Into<Cow<'static, str>>,
12-
deployment_environment: impl Into<Cow<'static, str>>,
13-
) -> Resource {
14-
Resource::new(
15-
[
16-
(SERVICE_NAME, service_name.into()),
17-
(SERVICE_VERSION, service_version.into()),
18-
(SERVICE_NAMESPACE, "syndicationd".into()),
19-
(DEPLOYMENT_ENVIRONMENT, deployment_environment.into()),
20-
]
21-
.into_iter()
22-
.map(|(key, value)| KeyValue::new(key, value)),
23-
)
24-
}
7+
pub use opentelemetry::KeyValue;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use opentelemetry_sdk::propagation::{
2+
BaggagePropagator, TextMapCompositePropagator, TraceContextPropagator,
3+
};
4+
5+
pub mod extension {
6+
pub use opentelemetry::baggage::BaggageExt as _;
7+
pub use tracing_opentelemetry::OpenTelemetrySpanExt as _;
8+
}
9+
10+
/// Currently axum and reqwest have different http crate versions.
11+
/// axum is ver 1, reqwest ver 0.2, therefore, we use each type in inject and extract.
12+
pub mod http {
13+
use super::extension::*;
14+
use opentelemetry::propagation::Extractor;
15+
use opentelemetry_http::HeaderInjector;
16+
17+
/// Inject current opentelemetry context into given headers
18+
pub fn inject(cx: &opentelemetry::Context, headers: &mut reqwest::header::HeaderMap) {
19+
opentelemetry::global::get_text_map_propagator(|propagator| {
20+
propagator.inject_context(cx, &mut HeaderInjector(headers));
21+
});
22+
}
23+
pub fn inject_with_baggage<T, I>(
24+
cx: &opentelemetry::Context,
25+
headers: &mut reqwest::header::HeaderMap,
26+
baggage: T,
27+
) where
28+
T: IntoIterator<Item = I>,
29+
I: Into<opentelemetry::baggage::KeyValueMetadata>,
30+
{
31+
inject(&cx.with_baggage(baggage), headers);
32+
}
33+
34+
pub fn extract(headers: &axum::http::HeaderMap) -> opentelemetry::Context {
35+
opentelemetry::global::get_text_map_propagator(|propagator| {
36+
propagator.extract(&HeaderExtractor(headers))
37+
})
38+
}
39+
40+
/// `opentelemetry_http` implement `HeaderExtractor` against http 0.2
41+
/// so, imple manually
42+
struct HeaderExtractor<'a>(pub &'a axum::http::HeaderMap);
43+
44+
impl<'a> Extractor for HeaderExtractor<'a> {
45+
/// Get a value for a key from the `HeaderMap`. If the value is not valid ASCII, returns None.
46+
fn get(&self, key: &str) -> Option<&str> {
47+
self.0.get(key).and_then(|value| value.to_str().ok())
48+
}
49+
50+
/// Collect all the keys from the `HeaderMap`.
51+
fn keys(&self) -> Vec<&str> {
52+
self.0
53+
.keys()
54+
.map(axum::http::HeaderName::as_str)
55+
.collect::<Vec<_>>()
56+
}
57+
}
58+
}
59+
60+
pub fn init_propagation() {
61+
let trace_propagator = TraceContextPropagator::new();
62+
let baggage_propagator = BaggagePropagator::new();
63+
let composite_propagator = TextMapCompositePropagator::new(vec![
64+
Box::new(trace_propagator),
65+
Box::new(baggage_propagator),
66+
]);
67+
68+
opentelemetry::global::set_text_map_propagator(composite_propagator);
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
use opentelemetry::KeyValue;
2+
use opentelemetry_sdk::Resource;
3+
use opentelemetry_semantic_conventions::resource::{
4+
DEPLOYMENT_ENVIRONMENT, SERVICE_NAME, SERVICE_NAMESPACE, SERVICE_VERSION,
5+
};
6+
use std::borrow::Cow;
7+
8+
pub fn resource(
9+
service_name: impl Into<Cow<'static, str>>,
10+
service_version: impl Into<Cow<'static, str>>,
11+
deployment_environment: impl Into<Cow<'static, str>>,
12+
) -> Resource {
13+
Resource::new(
14+
[
15+
(SERVICE_NAME, service_name.into()),
16+
(SERVICE_VERSION, service_version.into()),
17+
(SERVICE_NAMESPACE, "syndicationd".into()),
18+
(DEPLOYMENT_ENVIRONMENT, deployment_environment.into()),
19+
]
20+
.into_iter()
21+
.map(|(key, value)| KeyValue::new(key, value)),
22+
)
23+
}

crates/synd_term/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ path = "src/main.rs"
1818
[dependencies]
1919
synd_authn = { path = "../synd_authn", version = "0.1.0" }
2020
synd_feed = { path = "../synd_feed", version = "0.1.0" }
21+
synd_o11y = { path = "../synd_o11y", version = "0.1.0" }
2122

2223
anyhow = { workspace = true }
2324
chrono = { workspace = true }
@@ -29,6 +30,7 @@ futures-util = "0.3.30"
2930
graphql_client = { workspace = true }
3031
html2text = { version = "0.12" }
3132
open = "5.0.1"
33+
rand = { workspace = true }
3234
ratatui = { git = "https://github.com/ratatui-org/ratatui.git", branch = "main" }
3335
reqwest = { workspace = true }
3436
serde = { workspace = true, features = ["derive"] }

crates/synd_term/src/client/mod.rs

+24-4
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ use anyhow::anyhow;
44
use graphql_client::{GraphQLQuery, Response};
55
use reqwest::header::{self, HeaderValue};
66
use serde::{de::DeserializeOwned, Serialize};
7-
use tracing::error;
7+
use synd_o11y::opentelemetry::{extension::*, KeyValue};
8+
use tracing::{error, Span};
89
use url::Url;
910

1011
use crate::{auth::Credential, config, types};
@@ -110,7 +111,7 @@ impl Client {
110111
Body: Serialize + ?Sized,
111112
ResponseData: DeserializeOwned + Debug,
112113
{
113-
let res: Response<ResponseData> = self
114+
let mut request = self
114115
.client
115116
.post(self.endpoint.clone())
116117
.header(
@@ -121,13 +122,26 @@ impl Client {
121122
.clone(),
122123
)
123124
.json(&body)
124-
.send()
125+
.build()?;
126+
127+
// TODO: use trace_id
128+
let request_id = Self::request_id();
129+
130+
synd_o11y::opentelemetry::http::inject_with_baggage(
131+
&Span::current().context(),
132+
request.headers_mut(),
133+
std::iter::once(KeyValue::new("request.id", request_id)),
134+
);
135+
136+
let response: Response<ResponseData> = self
137+
.client
138+
.execute(request)
125139
.await?
126140
.error_for_status()?
127141
.json()
128142
.await?;
129143

130-
match (res.data, res.errors) {
144+
match (response.data, response.errors) {
131145
(_, Some(errs)) if !errs.is_empty() => {
132146
for err in errs {
133147
error!("{err:?}");
@@ -138,4 +152,10 @@ impl Client {
138152
_ => Err(anyhow::anyhow!("unexpected response",)),
139153
}
140154
}
155+
156+
fn request_id() -> String {
157+
// https://stackoverflow.com/questions/54275459/how-do-i-create-a-random-string-by-sampling-from-alphanumeric-characters
158+
use rand::distributions::{Alphanumeric, DistString};
159+
Alphanumeric.sample_string(&mut rand::thread_rng(), 10)
160+
}
141161
}

0 commit comments

Comments
 (0)