Skip to content

Commit 597a2af

Browse files
committed
more changes
1 parent 63fdf18 commit 597a2af

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

opentelemetry-otlp/src/exporter/http/logs.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ impl LogExporter for OtlpHttpClient {
1919
_ => Err(LogError::Other("exporter is already shut down".into())),
2020
})?;
2121

22-
let (body, content_type) = build_body(batch)?;
22+
let (body, content_type) = {
23+
let resource = self.resource.lock().unwrap();
24+
build_body(batch, &*resource)?
25+
};
2326
let mut request = http::Request::builder()
2427
.method(Method::POST)
2528
.uri(&self.collector_endpoint)
@@ -58,21 +61,29 @@ impl LogExporter for OtlpHttpClient {
5861
}
5962

6063
#[cfg(feature = "http-proto")]
61-
fn build_body(logs: Vec<LogEvent>) -> LogResult<(Vec<u8>, &'static str)> {
64+
fn build_body(
65+
logs: Vec<LogEvent>,
66+
resource: &opentelemetry_sdk::Resource,
67+
) -> LogResult<(Vec<u8>, &'static str)> {
6268
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
6369
use prost::Message;
70+
let resource_logs = logs
71+
.into_iter()
72+
.map(|log_event| (log_event, resource).into())
73+
.collect::<Vec<_>>();
6474

65-
let req = ExportLogsServiceRequest {
66-
resource_logs: logs.into_iter().map(Into::into).collect(),
67-
};
75+
let req = ExportLogsServiceRequest { resource_logs };
6876
let mut buf = vec![];
6977
req.encode(&mut buf).map_err(crate::Error::from)?;
7078

7179
Ok((buf, "application/x-protobuf"))
7280
}
7381

7482
#[cfg(not(feature = "http-proto"))]
75-
fn build_body(logs: Vec<LogData>) -> LogResult<(Vec<u8>, &'static str)> {
83+
fn build_body(
84+
logs: Vec<LogEvent>,
85+
resource: &opentelemetry_sdk::Resource,
86+
) -> LogResult<(Vec<u8>, &'static str)> {
7687
Err(LogsError::Other(
7788
"No http protocol configured. Enable one via `http-proto`".into(),
7889
))

opentelemetry-otlp/src/exporter/tonic/logs.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use core::fmt;
22

3+
use std::sync::{Arc, Mutex};
4+
35
use async_trait::async_trait;
46
use opentelemetry::logs::{LogError, LogResult};
57
use opentelemetry_proto::tonic::collector::logs::v1::{
@@ -12,6 +14,7 @@ use super::BoxInterceptor;
1214

1315
pub(crate) struct TonicLogsClient {
1416
inner: Option<ClientInner>,
17+
resource: Arc<Mutex<opentelemetry_sdk::Resource>>,
1518
}
1619

1720
struct ClientInner {
@@ -43,6 +46,7 @@ impl TonicLogsClient {
4346
client,
4447
interceptor,
4548
}),
49+
resource: Arc::new(Mutex::new(opentelemetry_sdk::Resource::default())),
4650
}
4751
}
4852
}
@@ -62,13 +66,20 @@ impl LogExporter for TonicLogsClient {
6266
None => return Err(LogError::Other("exporter is already shut down".into())),
6367
};
6468

69+
let resource_logs = {
70+
let resource = self.resource.lock().unwrap();
71+
batch
72+
.into_iter()
73+
.map(|log_event| (log_event, &*resource))
74+
.map(Into::into)
75+
.collect()
76+
};
77+
6578
client
6679
.export(Request::from_parts(
6780
metadata,
6881
extensions,
69-
ExportLogsServiceRequest {
70-
resource_logs: batch.into_iter().map(Into::into).collect(),
71-
},
82+
ExportLogsServiceRequest { resource_logs },
7283
))
7384
.await
7485
.map_err(crate::Error::from)?;

opentelemetry-proto/src/transform/logs.rs

+19-11
Original file line numberDiff line numberDiff line change
@@ -110,27 +110,35 @@ pub mod tonic {
110110
}
111111
}
112112

113-
impl From<opentelemetry_sdk::export::logs::LogData> for ResourceLogs {
114-
fn from(log_data: opentelemetry_sdk::export::logs::LogData) -> Self {
113+
impl
114+
From<(
115+
opentelemetry_sdk::export::logs::LogEvent,
116+
&opentelemetry_sdk::Resource,
117+
)> for ResourceLogs
118+
{
119+
fn from(
120+
data: (
121+
opentelemetry_sdk::export::logs::LogEvent,
122+
&opentelemetry_sdk::Resource,
123+
),
124+
) -> Self {
125+
let (log_event, resource) = data;
126+
115127
ResourceLogs {
116128
resource: Some(Resource {
117-
attributes: resource_attributes(&log_data.resource).0,
129+
attributes: resource_attributes(resource).0,
118130
dropped_attributes_count: 0,
119131
}),
120-
schema_url: log_data
121-
.resource
122-
.schema_url()
123-
.map(Into::into)
124-
.unwrap_or_default(),
132+
schema_url: resource.schema_url().map(Into::into).unwrap_or_default(),
125133
scope_logs: vec![ScopeLogs {
126-
schema_url: log_data
134+
schema_url: log_event
127135
.instrumentation
128136
.schema_url
129137
.clone()
130138
.map(Into::into)
131139
.unwrap_or_default(),
132-
scope: Some(log_data.instrumentation.into()),
133-
log_records: vec![log_data.record.into()],
140+
scope: Some(log_event.instrumentation.into()),
141+
log_records: vec![log_event.record.into()],
134142
}],
135143
}
136144
}

0 commit comments

Comments
 (0)