Skip to content

Commit f0acdb7

Browse files
authored
feat: add log stuuport to fake-collector (#152)
* Add FakeLogsService; Split trace to new file Added a FakeLogsService that works in a similar manner to the FakeTraceService. I also split the tracing functionality out into separate file/module for better code organization now that there's more than a single fake service * Remove println! * Remove another println!
1 parent 55b0fdd commit f0acdb7

File tree

6 files changed

+307
-160
lines changed

6 files changed

+307
-160
lines changed

fake-opentelemetry-collector/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ license.workspace = true
1414
futures = "0.3"
1515
hex = "0.4"
1616
opentelemetry = { workspace = true }
17-
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace"] }
17+
opentelemetry-otlp = { workspace = true, features = ["grpc-tonic", "trace", "logs"] }
1818
opentelemetry-proto = { workspace = true, features = ["gen-tonic"] }
1919
# need tokio runtime to run smoke tests.
2020
opentelemetry_sdk = { workspace = true, features = [
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
use std::collections::BTreeMap;
2+
3+
pub(crate) fn cnv_attributes(
4+
attributes: &[opentelemetry_proto::tonic::common::v1::KeyValue],
5+
) -> BTreeMap<String, String> {
6+
attributes
7+
.iter()
8+
.map(|kv| (kv.key.to_string(), format!("{:?}", kv.value)))
9+
.collect::<BTreeMap<String, String>>()
10+
}
+73-159
Original file line numberDiff line numberDiff line change
@@ -1,163 +1,26 @@
1-
//! based on https://github.com/open-telemetry/opentelemetry-rust/blob/main/opentelemetry-otlp/tests/smoke.rs
1+
mod common;
2+
mod logs;
3+
mod trace;
4+
pub use logs::ExportedLog;
5+
pub use trace::ExportedSpan;
6+
7+
use logs::*;
8+
use trace::*;
9+
10+
use std::net::SocketAddr;
11+
212
use futures::StreamExt;
3-
use opentelemetry_proto::tonic::collector::trace::v1::{
4-
trace_service_server::{TraceService, TraceServiceServer},
5-
ExportTraceServiceRequest, ExportTraceServiceResponse,
6-
};
7-
use serde::Serialize;
8-
use std::collections::BTreeMap;
13+
use opentelemetry_otlp::WithExportConfig;
14+
use opentelemetry_proto::tonic::collector::logs::v1::logs_service_server::LogsServiceServer;
15+
use opentelemetry_proto::tonic::collector::trace::v1::trace_service_server::TraceServiceServer;
916
use std::sync::mpsc;
10-
use std::{net::SocketAddr, sync::Mutex};
1117
use tokio_stream::wrappers::TcpListenerStream;
1218
use tracing::debug;
1319

14-
//pub type ExportedSpan = opentelemetry_proto::tonic::trace::v1::Span;
15-
16-
/// opentelemetry_proto::tonic::trace::v1::Span is no compatible with serde::Serialize
17-
/// and to be able to test with insta,... it's needed (Debug is not enough to be able to filter unstable value,...)
18-
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
19-
pub struct ExportedSpan {
20-
pub trace_id: String,
21-
pub span_id: String,
22-
pub trace_state: String,
23-
pub parent_span_id: String,
24-
pub name: String,
25-
pub kind: String, //SpanKind,
26-
pub start_time_unix_nano: u64,
27-
pub end_time_unix_nano: u64,
28-
pub attributes: BTreeMap<String, String>,
29-
pub dropped_attributes_count: u32,
30-
pub events: Vec<Event>,
31-
pub dropped_events_count: u32,
32-
pub links: Vec<Link>,
33-
pub dropped_links_count: u32,
34-
pub status: Option<Status>,
35-
}
36-
37-
impl From<opentelemetry_proto::tonic::trace::v1::Span> for ExportedSpan {
38-
fn from(value: opentelemetry_proto::tonic::trace::v1::Span) -> Self {
39-
Self {
40-
trace_id: hex::encode(&value.trace_id),
41-
span_id: hex::encode(&value.span_id),
42-
trace_state: value.trace_state.clone(),
43-
parent_span_id: hex::encode(&value.parent_span_id),
44-
name: value.name.clone(),
45-
kind: value.kind().as_str_name().to_owned(),
46-
start_time_unix_nano: value.start_time_unix_nano,
47-
end_time_unix_nano: value.end_time_unix_nano,
48-
attributes: cnv_attributes(&value.attributes),
49-
dropped_attributes_count: value.dropped_attributes_count,
50-
events: value.events.iter().map(Event::from).collect(),
51-
dropped_events_count: value.dropped_events_count,
52-
links: value.links.iter().map(Link::from).collect(),
53-
dropped_links_count: value.dropped_links_count,
54-
status: value.status.map(Status::from),
55-
}
56-
}
57-
}
58-
59-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Serialize)]
60-
pub struct Status {
61-
message: String,
62-
code: String,
63-
}
64-
65-
impl From<opentelemetry_proto::tonic::trace::v1::Status> for Status {
66-
fn from(value: opentelemetry_proto::tonic::trace::v1::Status) -> Self {
67-
Self {
68-
message: value.message.clone(),
69-
code: value.code().as_str_name().to_string(),
70-
}
71-
}
72-
}
73-
74-
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
75-
pub struct Link {
76-
pub trace_id: String,
77-
pub span_id: String,
78-
pub trace_state: String,
79-
pub attributes: BTreeMap<String, String>,
80-
pub dropped_attributes_count: u32,
81-
}
82-
83-
impl From<&opentelemetry_proto::tonic::trace::v1::span::Link> for Link {
84-
fn from(value: &opentelemetry_proto::tonic::trace::v1::span::Link) -> Self {
85-
Self {
86-
trace_id: hex::encode(&value.trace_id),
87-
span_id: hex::encode(&value.span_id),
88-
trace_state: value.trace_state.clone(),
89-
attributes: cnv_attributes(&value.attributes),
90-
dropped_attributes_count: value.dropped_attributes_count,
91-
}
92-
}
93-
}
94-
95-
fn cnv_attributes(
96-
attributes: &[opentelemetry_proto::tonic::common::v1::KeyValue],
97-
) -> BTreeMap<String, String> {
98-
attributes
99-
.iter()
100-
.map(|kv| (kv.key.to_string(), format!("{:?}", kv.value)))
101-
.collect::<BTreeMap<String, String>>()
102-
// v.sort_by_key(|kv| kv.0.clone());
103-
// v
104-
}
105-
106-
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
107-
pub struct Event {
108-
time_unix_nano: u64,
109-
name: String,
110-
attributes: BTreeMap<String, String>,
111-
dropped_attributes_count: u32,
112-
}
113-
114-
impl From<&opentelemetry_proto::tonic::trace::v1::span::Event> for Event {
115-
fn from(value: &opentelemetry_proto::tonic::trace::v1::span::Event) -> Self {
116-
Self {
117-
time_unix_nano: value.time_unix_nano,
118-
name: value.name.clone(),
119-
attributes: cnv_attributes(&value.attributes),
120-
dropped_attributes_count: value.dropped_attributes_count,
121-
}
122-
}
123-
}
124-
125-
struct FakeTraceService {
126-
tx: Mutex<mpsc::SyncSender<ExportedSpan>>,
127-
}
128-
129-
impl FakeTraceService {
130-
pub fn new(tx: mpsc::SyncSender<ExportedSpan>) -> Self {
131-
Self { tx: Mutex::new(tx) }
132-
}
133-
}
134-
135-
#[tonic::async_trait]
136-
impl TraceService for FakeTraceService {
137-
async fn export(
138-
&self,
139-
request: tonic::Request<ExportTraceServiceRequest>,
140-
) -> Result<tonic::Response<ExportTraceServiceResponse>, tonic::Status> {
141-
debug!("Sending request into channel...");
142-
request
143-
.into_inner()
144-
.resource_spans
145-
.into_iter()
146-
.flat_map(|rs| rs.scope_spans)
147-
.flat_map(|ss| ss.spans)
148-
.map(ExportedSpan::from)
149-
.for_each(|es| {
150-
self.tx.lock().unwrap().send(es).expect("Channel full");
151-
});
152-
Ok(tonic::Response::new(ExportTraceServiceResponse {
153-
partial_success: None,
154-
}))
155-
}
156-
}
157-
15820
pub struct FakeCollectorServer {
15921
address: SocketAddr,
16022
req_rx: mpsc::Receiver<ExportedSpan>,
23+
log_rx: mpsc::Receiver<ExportedLog>,
16124
handle: tokio::task::JoinHandle<()>,
16225
}
16326

@@ -174,20 +37,23 @@ impl FakeCollectorServer {
17437
});
17538

17639
let (req_tx, req_rx) = mpsc::sync_channel::<ExportedSpan>(1024);
177-
let service = TraceServiceServer::new(FakeTraceService::new(req_tx));
40+
let (log_tx, log_rx) = mpsc::sync_channel::<ExportedLog>(1024);
41+
let trace_service = TraceServiceServer::new(FakeTraceService::new(req_tx));
42+
let logs_service = LogsServiceServer::new(FakeLogsService::new(log_tx));
17843
let handle = tokio::task::spawn(async move {
17944
debug!("start FakeCollectorServer http://{addr}"); //Devskim: ignore DS137138)
18045
tonic::transport::Server::builder()
181-
.add_service(service)
46+
.add_service(trace_service)
47+
.add_service(logs_service)
18248
.serve_with_incoming(stream)
183-
// .serve(addr)
18449
.await
18550
.expect("Server failed");
18651
debug!("stop FakeCollectorServer");
18752
});
18853
Ok(Self {
18954
address: addr,
19055
req_rx,
56+
log_rx,
19157
handle,
19258
})
19359
}
@@ -204,13 +70,16 @@ impl FakeCollectorServer {
20470
std::iter::from_fn(|| self.req_rx.try_recv().ok()).collect::<Vec<_>>()
20571
}
20672

73+
pub fn exported_logs(&self) -> Vec<ExportedLog> {
74+
std::iter::from_fn(|| self.log_rx.try_recv().ok()).collect::<Vec<_>>()
75+
}
76+
20777
pub fn abort(self) {
20878
self.handle.abort()
20979
}
21080
}
21181

21282
pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sdk::trace::Tracer {
213-
use opentelemetry_otlp::WithExportConfig;
21483
// if the environment variable is set (in test or in caller), `with_endpoint` value is ignored
21584
std::env::remove_var("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT");
21685
opentelemetry_otlp::new_pipeline()
@@ -221,14 +90,29 @@ pub async fn setup_tracer(fake_server: &FakeCollectorServer) -> opentelemetry_sd
22190
.with_endpoint(fake_server.endpoint()),
22291
)
22392
.install_batch(opentelemetry_sdk::runtime::Tokio)
224-
.expect("failed to install")
93+
.expect("failed to install tracer")
94+
}
95+
96+
pub async fn setup_logger(
97+
fake_server: &FakeCollectorServer,
98+
) -> opentelemetry_sdk::logs::LoggerProvider {
99+
opentelemetry_otlp::new_pipeline()
100+
.logging()
101+
.with_exporter(
102+
opentelemetry_otlp::new_exporter()
103+
.tonic()
104+
.with_endpoint(fake_server.endpoint()),
105+
)
106+
.install_simple() //Install simple so we don't have to wait for batching in tests
107+
.expect("failed to install logging")
225108
}
226109

227110
#[cfg(test)]
228111
mod tests {
229112
use super::*;
230-
//use opentelemetry::{KeyValue, Value};
113+
231114
use opentelemetry::global::shutdown_tracer_provider;
115+
use opentelemetry::logs::{LogRecord, Logger, LoggerProvider, Severity};
232116
use opentelemetry::trace::{Span, SpanKind, Tracer};
233117

234118
#[tokio::test(flavor = "multi_thread")]
@@ -245,7 +129,6 @@ mod tests {
245129
.start(&tracer);
246130
span.add_event("my-test-event", vec![]);
247131
span.end();
248-
249132
shutdown_tracer_provider();
250133

251134
let otel_spans = fake_collector.exported_spans();
@@ -272,4 +155,35 @@ mod tests {
272155
}),
273156
});
274157
}
158+
159+
#[tokio::test(flavor = "multi_thread")]
160+
async fn test_fake_logger_and_collector() {
161+
let fake_collector = FakeCollectorServer::start()
162+
.await
163+
.expect("fake collector setup and started");
164+
165+
let logger_provider = setup_logger(&fake_collector).await;
166+
let logger = logger_provider.logger("test");
167+
let mut record = logger.create_log_record();
168+
record.set_body("This is information".into());
169+
record.set_severity_number(Severity::Info);
170+
record.set_severity_text("info".into());
171+
logger.emit(record);
172+
173+
let otel_logs = fake_collector.exported_logs();
174+
insta::assert_yaml_snapshot!(otel_logs, {
175+
"[].trace_id" => insta::dynamic_redaction(|value, _path| {
176+
assert2::let_assert!(Some(trace_id) = value.as_str());
177+
format!("[trace_id:lg{}]", trace_id.len())
178+
}),
179+
"[].span_id" => insta::dynamic_redaction(|value, _path| {
180+
assert2::let_assert!(Some(span_id) = value.as_str());
181+
format!("[span_id:lg{}]", span_id.len())
182+
}),
183+
"[].observed_time_unix_nano" => "[timestamp]",
184+
"[].severity_number" => 9,
185+
"[].severity_text" => "info",
186+
"[].body" => "AnyValue { value: Some(StringValue(\"This is information\")) }",
187+
});
188+
}
275189
}
+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use crate::common::cnv_attributes;
2+
use opentelemetry_proto::tonic::collector::logs::v1::{
3+
logs_service_server::LogsService, ExportLogsServiceRequest, ExportLogsServiceResponse,
4+
};
5+
use opentelemetry_proto::tonic::common::v1::AnyValue;
6+
use serde::Serialize;
7+
use std::collections::BTreeMap;
8+
use std::sync::{mpsc, Mutex};
9+
10+
/// This is created to flatten the log record to make it more compatible with insta for testing
11+
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
12+
pub struct ExportedLog {
13+
pub trace_id: String,
14+
pub span_id: String,
15+
pub observed_time_unix_nano: u64,
16+
pub severity_number: i32,
17+
pub severity_text: String,
18+
pub body: Option<String>,
19+
pub attributes: BTreeMap<String, String>,
20+
pub dropped_attributes_count: u32,
21+
pub flags: u32,
22+
}
23+
24+
impl From<opentelemetry_proto::tonic::logs::v1::LogRecord> for ExportedLog {
25+
fn from(value: opentelemetry_proto::tonic::logs::v1::LogRecord) -> Self {
26+
Self {
27+
trace_id: hex::encode(value.trace_id),
28+
span_id: hex::encode(value.span_id),
29+
observed_time_unix_nano: value.observed_time_unix_nano,
30+
severity_number: value.severity_number,
31+
severity_text: value.severity_text,
32+
body: value.body.map(|value| format!("{:?}", value)),
33+
attributes: cnv_attributes(&value.attributes),
34+
dropped_attributes_count: value.dropped_attributes_count,
35+
flags: value.flags,
36+
}
37+
}
38+
}
39+
40+
pub(crate) struct FakeLogsService {
41+
tx: Mutex<mpsc::SyncSender<ExportedLog>>,
42+
}
43+
44+
impl FakeLogsService {
45+
pub fn new(tx: mpsc::SyncSender<ExportedLog>) -> Self {
46+
Self { tx: Mutex::new(tx) }
47+
}
48+
}
49+
50+
#[tonic::async_trait]
51+
impl LogsService for FakeLogsService {
52+
async fn export(
53+
&self,
54+
request: tonic::Request<ExportLogsServiceRequest>,
55+
) -> Result<tonic::Response<ExportLogsServiceResponse>, tonic::Status> {
56+
request
57+
.into_inner()
58+
.resource_logs
59+
.into_iter()
60+
.flat_map(|rl| rl.scope_logs)
61+
.flat_map(|sl| sl.log_records)
62+
.map(ExportedLog::from)
63+
.for_each(|el| self.tx.lock().unwrap().send(el).unwrap());
64+
Ok(tonic::Response::new(ExportLogsServiceResponse {
65+
partial_success: None,
66+
}))
67+
}
68+
}

0 commit comments

Comments
 (0)